From cb563e890a8544fef1367d7b0f556cda6f0daca0 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 24 Aug 2018 18:03:13 -0700 Subject: please support for DumpUnGrobidedJob --- please | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) (limited to 'please') diff --git a/please b/please index c888bbc..0c69bce 100755 --- a/please +++ b/please @@ -257,6 +257,27 @@ def run_keysmissingcol(args): env=args.env) subprocess.call(cmd, shell=True) +def run_dumpungrobided(args): + if args.rebuild: + rebuild_scalding() + print("Starting dumpungrobided job...") + output = "{}/output-{}/{}-dumpungrobided".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.DumpUnGrobidedJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + def main(): parser = argparse.ArgumentParser() @@ -320,6 +341,9 @@ def main(): sub_keysmissingcol.add_argument('column', help="column to SCAN for missing keys") + sub_dumpungrobided = subparsers.add_parser('dump-ungrobided') + sub_dumpungrobided.set_defaults(func=run_dumpungrobided) + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do! (try --help)") -- cgit v1.2.3 From c56efb06923752ff7425e27f31aaeb25f766a5ed Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sat, 25 Aug 2018 21:16:50 +0000 Subject: add extraction_ungrobided support to please --- please | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) (limited to 'please') diff --git a/please b/please index 0c69bce..81aad4d 100755 --- a/please +++ b/please @@ -74,6 +74,31 @@ def run_extract(args): grobid_uri=GROBID_URI) subprocess.call(cmd, shell=True) +def run_extract_ungrobided(args): + if args.rebuild: + rebuild_python() + print("Starting extractungrobided job...") + output = "{}/output-{}/{}-extract-ungrobided".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """cd python; + pipenv run ./extraction_ungrobided.py \ + --hbase-host {hbase_host} \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --grobid-uri {grobid_uri} \ + -r hadoop \ + -c mrjob.conf \ + --archive venv-current.tar.gz#venv \ + --jobconf mapred.line.input.format.linespermap=8000 \ + --jobconf mapreduce.job.queuename=extraction \ + --jobconf mapred.task.timeout=3600000 \ + {input_ungrobided} + """.format(hbase_host=HBASE_HOST, env=args.env, + input_ungrobided=args.input_ungrobided, + grobid_uri=GROBID_URI) + subprocess.call(cmd, shell=True) + def run_rowcount(args): if args.rebuild: rebuild_scalding() @@ -302,6 +327,11 @@ def main(): sub_extract.add_argument('input_cdx', help="full HDFS path of CDX file to extract") + sub_extractungrobided = subparsers.add_parser('extract-ungrobided') + sub_extractungrobided.set_defaults(func=run_extract_ungrobided) + sub_extractungrobided.add_argument('input_ungrobided', + help="full HDFS path of 'ungrobided' file to extract") + sub_rowcount = subparsers.add_parser('row-count') sub_rowcount.set_defaults(func=run_rowcount) -- cgit v1.2.3 From a71d556763b4031bfa0e56abc72348d7f1d3d966 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 26 Aug 2018 05:15:21 +0000 Subject: please: save extraction output --- please | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'please') diff --git a/please b/please index 81aad4d..a2658ab 100755 --- a/please +++ b/please @@ -64,12 +64,15 @@ def run_extract(args): --grobid-uri {grobid_uri} \ -r hadoop \ -c mrjob.conf \ + --output-dir {output} \ + --no-output \ --archive venv-current.tar.gz#venv \ --jobconf mapred.line.input.format.linespermap=8000 \ --jobconf mapreduce.job.queuename=extraction \ --jobconf mapred.task.timeout=3600000 \ {input_cdx} """.format(hbase_host=HBASE_HOST, env=args.env, + output=output, input_cdx=args.input_cdx, grobid_uri=GROBID_URI) subprocess.call(cmd, shell=True) @@ -89,6 +92,8 @@ def run_extract_ungrobided(args): --grobid-uri {grobid_uri} \ -r hadoop \ -c mrjob.conf \ + --output-dir {output} \ + --no-output \ --archive venv-current.tar.gz#venv \ --jobconf mapred.line.input.format.linespermap=8000 \ --jobconf mapreduce.job.queuename=extraction \ @@ -96,6 +101,7 @@ def run_extract_ungrobided(args): {input_ungrobided} """.format(hbase_host=HBASE_HOST, env=args.env, input_ungrobided=args.input_ungrobided, + output=output, grobid_uri=GROBID_URI) subprocess.call(cmd, shell=True) -- cgit v1.2.3