diff options
-rwxr-xr-x | please | 31 |
1 files changed, 31 insertions, 0 deletions
@@ -13,6 +13,7 @@ from datetime import datetime HDFS_DIR = "hdfs:///user/bnewbold/sandcrawler" HBASE_HOST = "wbgrp-svc263.us.archive.org" +GROBID_URI = "http://wbgrp-svc096.us.archive.org:8070" def rebuild_python(): print("Rebuilding python venv...") @@ -47,6 +48,31 @@ def run_backfill(args): input_cdx=args.input_cdx) subprocess.call(cmd, shell=True) +def run_extract(args): + if args.rebuild: + rebuild_python() + print("Starting extract job...") + output = "{}/output-{}/{}-extract".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """cd mapreduce; + pipenv run ./extraction_cdx_grobid.py \ + --hbase-host {hbase_host} \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --grobid-uri {grobid_uri} \ + -r hadoop \ + -c mrjob.conf \ + --archive venv-current.tar.gz#venv \ + --jobconf mapred.line.input.format.linespermap=8000 \ + --jobconf mapreduce.job.queuename=extraction \ + --jobconf mapred.task.timeout=3600000 \ + {input_cdx} + """.format(hbase_host=HBASE_HOST, env=args.env, + input_cdx=args.input_cdx, + grobid_uri=GROBID_URI) + subprocess.call(cmd, shell=True) + def run_rowcount(args): print("Starting rowcount job...") output = "{}/output-{}/{}-rowcount".format( @@ -80,6 +106,11 @@ def main(): sub_backfill.add_argument('input_cdx', help="full HDFS path of CDX file to backfill") + sub_extract = subparsers.add_parser('extract') + sub_extract.set_defaults(func=run_extract) + sub_extract.add_argument('input_cdx', + help="full HDFS path of CDX file to extract") + sub_rowcount = subparsers.add_parser('row-count') sub_rowcount.set_defaults(func=run_rowcount) |