Diffstat (limited to 'please')
-rwxr-xr-x  please  152
1 file changed, 152 insertions, 0 deletions
diff --git a/please b/please
new file mode 100755
index 0000000..2d4cae8
--- /dev/null
+++ b/please
@@ -0,0 +1,152 @@
+#!/usr/bin/env python3
+"""
+Helper script for running Sandcrawler (journal pipeline) tasks in production.
+
+This is basically a Makefile. Be sure to only use python3 standard library
+modules, so there are no dependencies.
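+
+Example invocations (the CDX paths below are illustrative, not real inputs):
+
+    ./please --qa backfill hdfs:///user/bnewbold/sandcrawler/example.cdx
+    ./please --prod --rebuild extract hdfs:///user/bnewbold/sandcrawler/example.cdx
+    ./please --qa row-count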
+"""
+
+import sys
+import argparse
+import subprocess
+from datetime import datetime
+
+HDFS_DIR = "hdfs:///user/bnewbold/sandcrawler"
+HBASE_HOST = "wbgrp-svc263.us.archive.org"
+GROBID_URI = "http://wbgrp-svc096.us.archive.org:8070"
+
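+# Package the mapreduce pipenv virtualenv as venv-current.tar.gz so it can be
+# shipped to Hadoop workers via mrjob's --archive flag.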
+def rebuild_python():
+ print("Rebuilding python venv...")
+ cmd = """cd mapreduce;
+ export PIPENV_VENV_IN_PROJECT=1;
+ pipenv install --deploy
+ tar -czf venv-current.tar.gz -C .venv ."""
+ subprocess.call(cmd, shell=True)
+
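+# Build the scalding assembly ("fat") jar used by the hadoop jar commands below.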
+def rebuild_scalding():
+ print("Rebuilding scalding jar...")
+ cmd = """cd scalding; sbt assembly"""
+ subprocess.call(cmd, shell=True)
+
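+# Backfill the per-environment HBase table (wbgrp-journal-extract-0-{env}) from
+# a CDX file on HDFS, via the backfill_hbase_from_cdx.py mrjob.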
+def run_backfill(args):
+ if args.rebuild:
+ rebuild_python()
+ print("Starting backfill job...")
+ output = "{}/output-{}/{}-backfill".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
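+    # NOTE: this output path is computed but not passed to the job below; the
+    # backfill writes directly into the HBase table.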
+ cmd = """cd mapreduce;
+ pipenv run ./backfill_hbase_from_cdx.py \
+ --hbase-host {hbase_host} \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ -r hadoop \
+ -c mrjob.conf \
+ --archive venv-current.tar.gz#venv \
+ {input_cdx}
+ """.format(hbase_host=HBASE_HOST, env=args.env,
+ input_cdx=args.input_cdx)
+ subprocess.call(cmd, shell=True)
+
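+# Run GROBID extraction (extraction_cdx_grobid.py) over a CDX file on HDFS,
+# writing results into the per-environment HBase table.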
+def run_extract(args):
+ if args.rebuild:
+ rebuild_python()
+ print("Starting extract job...")
+ output = "{}/output-{}/{}-extract".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """cd mapreduce;
+ pipenv run ./extraction_cdx_grobid.py \
+ --hbase-host {hbase_host} \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --grobid-uri {grobid_uri} \
+ -r hadoop \
+ -c mrjob.conf \
+ --archive venv-current.tar.gz#venv \
+ --jobconf mapred.line.input.format.linespermap=8000 \
+ --jobconf mapreduce.job.queuename=extraction \
+ --jobconf mapred.task.timeout=3600000 \
+ {input_cdx}
+ """.format(hbase_host=HBASE_HOST, env=args.env,
+ input_cdx=args.input_cdx,
+ grobid_uri=GROBID_URI)
+ subprocess.call(cmd, shell=True)
+
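+# Launch the scalding HBaseRowCountJob on the cluster, writing the count to a
+# timestamped HDFS output directory.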
+def run_rowcount(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting rowcount job...")
+ output = "{}/output-{}/{}-rowcount".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.HBaseRowCountJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --output {}""".format(output)
+ subprocess.call(cmd, shell=True)
+
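+# Launch the scalding HBaseStatusCountJob, writing status counts to a
+# timestamped HDFS output directory.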
+def run_statuscount(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting statuscount job...")
+ output = "{}/output-{}/{}-statuscount".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.HBaseStatusCountJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --output {}""".format(output)
+ subprocess.call(cmd, shell=True)
+
+def main():
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument('--prod',
+ help="run against prod HBase table",
+ action='store_true')
+ parser.add_argument('--qa',
+ help="run against qa HBase table",
+ action='store_true')
+ parser.add_argument('--rebuild',
+ help="rebuild whatever artifact gets sent",
+ action='store_true')
+ subparsers = parser.add_subparsers()
+
+ sub_backfill = subparsers.add_parser('backfill')
+ sub_backfill.set_defaults(func=run_backfill)
+ sub_backfill.add_argument('input_cdx',
+ help="full HDFS path of CDX file to backfill")
+
+ sub_extract = subparsers.add_parser('extract')
+ sub_extract.set_defaults(func=run_extract)
+ sub_extract.add_argument('input_cdx',
+ help="full HDFS path of CDX file to extract")
+
+ sub_rowcount = subparsers.add_parser('row-count')
+ sub_rowcount.set_defaults(func=run_rowcount)
+
+ sub_statuscount = subparsers.add_parser('status-count')
+ sub_statuscount.set_defaults(func=run_statuscount)
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do! (try --help)")
+ sys.exit(-1)
+    if not (args.prod or args.qa) or (args.prod and args.qa):
+        print("must pass exactly one of --prod or --qa")
+        sys.exit(-1)
+ if args.prod:
+ args.env = "prod"
+ if args.qa:
+ args.env = "qa"
+
+ args.func(args)
+
+if __name__ == '__main__':
+ main()