aboutsummaryrefslogtreecommitdiffstats
path: root/please
diff options
context:
space:
mode:
Diffstat (limited to 'please')
-rwxr-xr-xplease60
1 files changed, 60 insertions, 0 deletions
diff --git a/please b/please
index c888bbc..a2658ab 100755
--- a/please
+++ b/please
@@ -64,16 +64,47 @@ def run_extract(args):
--grobid-uri {grobid_uri} \
-r hadoop \
-c mrjob.conf \
+ --output-dir {output} \
+ --no-output \
--archive venv-current.tar.gz#venv \
--jobconf mapred.line.input.format.linespermap=8000 \
--jobconf mapreduce.job.queuename=extraction \
--jobconf mapred.task.timeout=3600000 \
{input_cdx}
""".format(hbase_host=HBASE_HOST, env=args.env,
+ output=output,
input_cdx=args.input_cdx,
grobid_uri=GROBID_URI)
subprocess.call(cmd, shell=True)
+def run_extract_ungrobided(args):
+ if args.rebuild:
+ rebuild_python()
+ print("Starting extractungrobided job...")
+ output = "{}/output-{}/{}-extract-ungrobided".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """cd python;
+ pipenv run ./extraction_ungrobided.py \
+ --hbase-host {hbase_host} \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --grobid-uri {grobid_uri} \
+ -r hadoop \
+ -c mrjob.conf \
+ --output-dir {output} \
+ --no-output \
+ --archive venv-current.tar.gz#venv \
+ --jobconf mapred.line.input.format.linespermap=8000 \
+ --jobconf mapreduce.job.queuename=extraction \
+ --jobconf mapred.task.timeout=3600000 \
+ {input_ungrobided}
+ """.format(hbase_host=HBASE_HOST, env=args.env,
+ input_ungrobided=args.input_ungrobided,
+ output=output,
+ grobid_uri=GROBID_URI)
+ subprocess.call(cmd, shell=True)
+
def run_rowcount(args):
if args.rebuild:
rebuild_scalding()
@@ -257,6 +288,27 @@ def run_keysmissingcol(args):
env=args.env)
subprocess.call(cmd, shell=True)
+def run_dumpungrobided(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting dumpungrobided job...")
+ output = "{}/output-{}/{}-dumpungrobided".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.DumpUnGrobidedJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def main():
parser = argparse.ArgumentParser()
@@ -281,6 +333,11 @@ def main():
sub_extract.add_argument('input_cdx',
help="full HDFS path of CDX file to extract")
+ sub_extractungrobided = subparsers.add_parser('extract-ungrobided')
+ sub_extractungrobided.set_defaults(func=run_extract_ungrobided)
+ sub_extractungrobided.add_argument('input_ungrobided',
+ help="full HDFS path of 'ungrobided' file to extract")
+
sub_rowcount = subparsers.add_parser('row-count')
sub_rowcount.set_defaults(func=run_rowcount)
@@ -320,6 +377,9 @@ def main():
sub_keysmissingcol.add_argument('column',
help="column to SCAN for missing keys")
+ sub_dumpungrobided = subparsers.add_parser('dump-ungrobided')
+ sub_dumpungrobided.set_defaults(func=run_dumpungrobided)
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do! (try --help)")