Diffstat (limited to 'please')
-rwxr-xr-x	please	153
1 file changed, 153 insertions, 0 deletions
@@ -0,0 +1,153 @@
+#!/usr/bin/env python3
+"""
+Helper script for running Sandcrawler (journal pipeline) tasks in production.
+
+This is basically a Makefile. Be sure to only use python3 standard library
+modules, so there are no dependencies.
+"""
+
+import sys
+import argparse
+import subprocess
+from datetime import datetime
+
+HDFS_DIR = "hdfs:///user/bnewbold/sandcrawler"
+HBASE_HOST = "wbgrp-svc263.us.archive.org"
+GROBID_URI = "http://wbgrp-svc096.us.archive.org:8070"
+
+def rebuild_python():
+    print("Rebuilding python venv...")
+    cmd = """cd mapreduce;
+        export PIPENV_VENV_IN_PROJECT=1;
+        pipenv install --deploy
+        tar -czf venv-current.tar.gz -C .venv ."""
+    subprocess.call(cmd, shell=True)
+
+def rebuild_scalding():
+    print("Rebuilding scalding jar...")
+    cmd = """cd scalding; sbt assembly"""
+    subprocess.call(cmd, shell=True)
+
+def run_backfill(args):
+    if args.rebuild:
+        rebuild_python()
+    print("Starting backfill job...")
+    output = "{}/output-{}/{}-backfill".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    cmd = """cd mapreduce;
+        pipenv run ./backfill_hbase_from_cdx.py \
+            --hbase-host {hbase_host} \
+            --hbase-table wbgrp-journal-extract-0-{env} \
+            -r hadoop \
+            -c mrjob.conf \
+            --archive venv-current.tar.gz#venv \
+            {input_cdx}
+        """.format(hbase_host=HBASE_HOST, env=args.env,
+            input_cdx=args.input_cdx)
+    subprocess.call(cmd, shell=True)
+
+def run_extract(args):
+    if args.rebuild:
+        rebuild_python()
+    print("Starting extract job...")
+    output = "{}/output-{}/{}-extract".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    cmd = """cd mapreduce;
+        pipenv run ./extraction_cdx_grobid.py \
+            --hbase-host {hbase_host} \
+            --hbase-table wbgrp-journal-extract-0-{env} \
+            --grobid-uri {grobid_uri} \
+            -r hadoop \
+            -c mrjob.conf \
+            --archive venv-current.tar.gz#venv \
+            --jobconf mapred.line.input.format.linespermap=8000 \
+            --jobconf mapreduce.job.queuename=extraction \
+            --jobconf mapred.task.timeout=3600000 \
+            {input_cdx}
+        """.format(hbase_host=HBASE_HOST, env=args.env,
+            input_cdx=args.input_cdx,
+            grobid_uri=GROBID_URI)
+    subprocess.call(cmd, shell=True)
+
+def run_rowcount(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting rowcount job...")
+    output = "{}/output-{}/{}-rowcount".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool sandcrawler.HBaseRowCountJob \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --output {}""".format(output)
+    subprocess.call(cmd, shell=True)
+
+def run_statuscount(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting statuscount job...")
+    output = "{}/output-{}/{}-statuscount".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool sandcrawler.HBaseStatusCountJob \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --output {}""".format(output)
+    subprocess.call(cmd, shell=True)
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument('--prod',
+        help="run against prod HBase table",
+        action='store_true')
+    parser.add_argument('--qa',
+        help="run against qa HBase table",
+        action='store_true')
+    parser.add_argument('--rebuild',
+        help="rebuild whatever artifact gets sent",
+        action='store_true')
+    subparsers = parser.add_subparsers()
+
+    sub_backfill = subparsers.add_parser('backfill')
+    sub_backfill.set_defaults(func=run_backfill)
+    sub_backfill.add_argument('input_cdx',
+        help="full HDFS path of CDX file to backfill")
+
+    sub_extract = subparsers.add_parser('extract')
+    sub_extract.set_defaults(func=run_extract)
+    sub_extract.add_argument('input_cdx',
+        help="full HDFS path of CDX file to extract")
+
+    sub_rowcount = subparsers.add_parser('row-count')
+    sub_rowcount.set_defaults(func=run_rowcount)
+
+    sub_statuscount = subparsers.add_parser('status-count')
+    sub_statuscount.set_defaults(func=run_statuscount)
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("tell me what to do! (try --help)")
+        sys.exit(-1)
+    if not (args.prod or args.qa) or (args.prod and args.qa):
+        print("must pass one of --prod or --qa")
+        sys.exit(-1)
+    if args.prod:
+        args.env = "prod"
+    if args.qa:
+        args.env = "qa"
+
+    args.func(args)
+
+if __name__ == '__main__':
+    main()
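A note on the timestamped output paths: each job computes an HDFS output directory under HDFS_DIR, namespaced by environment. A minimal sketch of what the helper produces, using the equivalent but more idiomatic datetime.now().strftime(...) call (the timestamp and directory shown are illustrative, not from the commit):

```python
from datetime import datetime

HDFS_DIR = "hdfs:///user/bnewbold/sandcrawler"

# Same result as datetime.strftime(datetime.now(), ...) in the script; yields e.g.
# "hdfs:///user/bnewbold/sandcrawler/output-qa/2018-05-28-2149.52-rowcount"
output = "{}/output-{}/{}-rowcount".format(
    HDFS_DIR, "qa",
    datetime.now().strftime("%Y-%m-%d-%H%M.%S"))
```

Only the scalding jobs actually consume this path (via --output); run_backfill and run_extract compute an output path but never pass it to the mrjob command line.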
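One sharp edge: subprocess.call returns the child's exit status rather than raising on failure, so a broken sbt assembly or pipenv install would not stop a subsequent job submission. A hardening sketch, assuming the same shell-string convention as the script (the run_or_die helper is hypothetical, not part of this commit):

```python
import subprocess
import sys

def run_or_die(cmd):
    # subprocess.call() returns the shell's exit status; treat non-zero
    # as fatal so a failed rebuild never submits a stale artifact.
    status = subprocess.call(cmd, shell=True)
    if status != 0:
        print("command failed (exit {}): {}".format(status, cmd))
        sys.exit(status)
```

The standard library's subprocess.check_call(cmd, shell=True) does the same job, raising CalledProcessError instead of exiting. Note that for the multi-line shell strings used here, only the last command's status is reported; prefixing the string with "set -e;" makes earlier failures abort the whole sequence.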
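The --prod/--qa validation at the bottom of main() could also live in the parser itself: argparse's mutually exclusive groups reject a missing or conflicting flag pair before any job logic runs. A sketch of that alternative (not what this commit does):

```python
import argparse

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--prod', action='store_true',
                   help="run against prod HBase table")
group.add_argument('--qa', action='store_true',
                   help="run against qa HBase table")

args = parser.parse_args()
# required=True guarantees exactly one of the two flags is set here.
args.env = "prod" if args.prod else "qa"
```

Invocation is the same either way, e.g. ./please --qa row-count, or ./please --prod backfill <input_cdx> with <input_cdx> standing in for a real HDFS CDX path.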