diff options
Diffstat (limited to 'please')
-rwxr-xr-x | please | 467 |
1 files changed, 460 insertions, 7 deletions
@@ -2,8 +2,8 @@ """ Helper script for running Sandcrawler (journal pipeline) tasks in production. -This is basically a Makefile. Be sure to only use python3 standard library -modules, so there are no dependencies. +This is basically a Makefile. If you edit this file, be sure to only use +python3 standard library modules, so there are no dependencies. """ import sys @@ -16,9 +16,13 @@ HBASE_HOST = "wbgrp-svc263.us.archive.org" ZOOKEEPER_HOSTS = "mtrcs-zk1.us.archive.org:2181" GROBID_URI = "http://wbgrp-svc096.us.archive.org:8070" +# Staging config +#HBASE_HOST = "wbgrp-svc312.us.archive.org" +#ZOOKEEPER_HOSTS = "wbgrp-svc312.us.archive.org:2181" + def rebuild_python(): print("Rebuilding python venv...") - cmd = """cd mapreduce; + cmd = """cd python_hadoop; export PIPENV_VENV_IN_PROJECT=1; pipenv install --deploy tar -czf venv-current.tar.gz -C .venv .""" @@ -37,7 +41,7 @@ def run_backfill(args): HDFS_DIR, args.env, datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) - cmd = """cd mapreduce; + cmd = """cd python_hadoop; pipenv run ./backfill_hbase_from_cdx.py \ --hbase-host {hbase_host} \ --hbase-table wbgrp-journal-extract-0-{env} \ @@ -57,23 +61,54 @@ def run_extract(args): HDFS_DIR, args.env, datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) - cmd = """cd mapreduce; + cmd = """cd python_hadoop; pipenv run ./extraction_cdx_grobid.py \ --hbase-host {hbase_host} \ --hbase-table wbgrp-journal-extract-0-{env} \ --grobid-uri {grobid_uri} \ -r hadoop \ -c mrjob.conf \ + --output-dir {output} \ + --no-output \ --archive venv-current.tar.gz#venv \ --jobconf mapred.line.input.format.linespermap=8000 \ --jobconf mapreduce.job.queuename=extraction \ --jobconf mapred.task.timeout=3600000 \ {input_cdx} """.format(hbase_host=HBASE_HOST, env=args.env, + output=output, input_cdx=args.input_cdx, grobid_uri=GROBID_URI) subprocess.call(cmd, shell=True) +def run_extract_ungrobided(args): + if args.rebuild: + rebuild_python() + print("Starting extractungrobided job...") + output = "{}/output-{}/{}-extract-ungrobided".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """cd python_hadoop; + pipenv run ./extraction_ungrobided.py \ + --hbase-host {hbase_host} \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --grobid-uri {grobid_uri} \ + -r hadoop \ + -c mrjob.conf \ + --output-dir {output} \ + --no-output \ + --archive venv-current.tar.gz#venv \ + --jobconf mapred.line.input.format.linespermap=8000 \ + --jobconf mapreduce.job.queuename=extraction \ + --jobconf mapred.task.timeout=3600000 \ + {input_ungrobided} + """.format(hbase_host=HBASE_HOST, env=args.env, + input_ungrobided=args.input_ungrobided, + output=output, + grobid_uri=GROBID_URI) + subprocess.call(cmd, shell=True) + def run_rowcount(args): if args.rebuild: rebuild_scalding() @@ -95,6 +130,27 @@ def run_rowcount(args): env=args.env) subprocess.call(cmd, shell=True) +def run_statuscodecount(args): + if args.rebuild: + rebuild_scalding() + print("Starting statuscodecount job...") + output = "{}/output-{}/{}-statuscodecount".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.HBaseStatusCodeCountJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + def run_statuscount(args): if args.rebuild: rebuild_scalding() @@ -105,7 +161,322 @@ def run_statuscount(args): datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) cmd = """hadoop jar \ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ - com.twitter.scalding.Tool sandcrawler.HBaseStatusCountJob \ + com.twitter.scalding.Tool \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.HBaseStatusCountJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_matchcrossref(args): + if args.rebuild: + rebuild_scalding() + print("Starting matchcrossref job...") + output = "{}/output-{}/{}-matchcrossref".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + if args.fatcat_insertable: + jobclass = "ScoreInsertableJob" + else: + jobclass = "ScoreJob" + # Notes: -D options must come after Tool but before class name + # https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-pass-parameters-to-my-hadoop-job-number-of-reducers--memory-options--etc- + # Compression: changed due to errors in production + # https://stackoverflow.com/a/11336820/4682349 + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --crossref-input {crossref_input} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + crossref_input=args.crossref_input) + subprocess.call(cmd, shell=True) + +def run_groupworks(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworks job...") + output = "{}/output-{}/{}-groupworks".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + jobclass = "GroupFatcatWorksJob" + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --fatcat-release-input {fatcat_release_input} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + fatcat_release_input=args.fatcat_release_input) + subprocess.call(cmd, shell=True) + +def run_groupworkssubset(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworkssubset job...") + output = "{}/output-{}/{}-groupworkssubset".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + jobclass = "GroupFatcatWorksSubsetJob" + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --fatcat-release-input {fatcat_release_input_left} \ + --fatcat-release-input-right {fatcat_release_input_right} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + fatcat_release_input_left=args.fatcat_release_input_left, + fatcat_release_input_right=args.fatcat_release_input_right) + subprocess.call(cmd, shell=True) + +def run_grobidscorabledump(args): + if args.rebuild: + rebuild_scalding() + print("Starting grobid-scorable-dump job...") + output = "{}/output-{}/{}-grobidscorabledump".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.GrobidScorableDumpJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_dumpfilemeta(args): + if args.rebuild: + rebuild_scalding() + print("Starting dumpfilemeta job...") + output = "{}/output-{}/{}-dumpfilemeta".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.DumpFileMetaJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_dumpgrobidstatuscode(args): + if args.rebuild: + rebuild_scalding() + print("Starting dumpgrobidstatuscode job...") + output = "{}/output-{}/{}-dumpgrobidstatuscode".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.DumpGrobidStatusCodeJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_dumpgrobidmetainsertable(args): + if args.rebuild: + rebuild_scalding() + print("Starting dumpgrobidmetainsertable job...") + output = "{}/output-{}/{}-dumpgrobidmetainsertable".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.DumpGrobidMetaInsertableJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_dumpgrobidxml(args): + if args.rebuild: + rebuild_scalding() + print("Starting dumpgrobidxml job...") + output = "{}/output-{}/{}-dumpgrobidxml".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.DumpGrobidXmlJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_colcount(args): + if args.rebuild: + rebuild_scalding() + print("Starting colcount job...") + output = "{}/output-{}/{}-colcount-{}".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"), + args.column.replace(':', '_')) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.HBaseColCountJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --column {column} \ + --output {output}""".format( + column=args.column, + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_matchbenchmark(args): + if args.rebuild: + rebuild_scalding() + print("Starting matchbenchmark job...") + cmd = """./pig/deps/hadoop/bin/hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + sandcrawler.MatchBenchmarkJob \ + --local \ + --app.conf.path scalding/ia_cluster.conf \ + --left-bibjson {left_bibjson} \ + --right-bibjson {right_bibjson} \ + --output {output}""".format( + output=args.output, + left_bibjson=args.left_bibjson, + right_bibjson=args.right_bibjson) + subprocess.call(cmd, shell=True) + +def run_groupworksbenchmark(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworksbenchmark job...") + cmd = """./pig/deps/hadoop/bin/hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + sandcrawler.GroupFatcatWorksJob \ + --local \ + --app.conf.path scalding/ia_cluster.conf \ + --fatcat-release-input {fatcat_releases} \ + --output {output}""".format( + output=args.output, + fatcat_releases=args.fatcat_releases) + subprocess.call(cmd, shell=True) + +def run_keysmissingcol(args): + if args.rebuild: + rebuild_scalding() + print("Starting keysmissingcol job...") + output = "{}/output-{}/{}-keysmissingcol-{}".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"), + args.column.replace(":", "_")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.MissingColumnDumpJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --column {column} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + column=args.column, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_dumpungrobided(args): + if args.rebuild: + rebuild_scalding() + print("Starting dumpungrobided job...") + output = "{}/output-{}/{}-dumpungrobided".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.DumpUnGrobidedJob \ --hdfs \ --app.conf.path scalding/ia_cluster.conf \ --hbase-table wbgrp-journal-extract-0-{env} \ @@ -162,18 +533,100 @@ def main(): sub_extract.add_argument('input_cdx', help="full HDFS path of CDX file to extract") + sub_extractungrobided = subparsers.add_parser('extract-ungrobided') + sub_extractungrobided.set_defaults(func=run_extract_ungrobided) + sub_extractungrobided.add_argument('input_ungrobided', + help="full HDFS path of 'ungrobided' file to extract") + sub_rowcount = subparsers.add_parser('row-count') sub_rowcount.set_defaults(func=run_rowcount) sub_statuscount = subparsers.add_parser('status-count') sub_statuscount.set_defaults(func=run_statuscount) + sub_statuscodecount = subparsers.add_parser('status-code-count') + sub_statuscodecount.set_defaults(func=run_statuscodecount) + + sub_matchcrossref = subparsers.add_parser('match-crossref') + sub_matchcrossref.set_defaults(func=run_matchcrossref) + sub_matchcrossref.add_argument('crossref_input', + help="full HDFS path of Crossref JSON dump") + sub_matchcrossref.add_argument('--reducers', + help="number of reducers to run", + type=int, default=200) + sub_matchcrossref.add_argument('--fatcat-insertable', + help="whether to include CDX and other metadata in output", + action='store_true') + + sub_groupworks = subparsers.add_parser('groupworks-fatcat') + sub_groupworks.set_defaults(func=run_groupworks) + sub_groupworks.add_argument('fatcat_release_input', + help="full HDFS path of fatcat release JSON dump") + sub_groupworks.add_argument('--reducers', + help="number of reducers to run", + type=int, default=400) + + sub_groupworkssubset = subparsers.add_parser('groupworkssubset-fatcat') + sub_groupworkssubset.set_defaults(func=run_groupworkssubset) + sub_groupworkssubset.add_argument('fatcat_release_input_left', + help="full HDFS path of fatcat release JSON dump (LHS of join)") + sub_groupworkssubset.add_argument('fatcat_release_input_right', + help="full HDFS path of fatcat release JSON dump (RHS of join)") + sub_groupworkssubset.add_argument('--reducers', + help="number of reducers to run", + type=int, default=200) + + sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump') + sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump) + + sub_dumpfilemeta = subparsers.add_parser('dump-file-meta') + sub_dumpfilemeta.set_defaults(func=run_dumpfilemeta) + + sub_dumpgrobidstatuscode = subparsers.add_parser('dump-grobid-status-code') + sub_dumpgrobidstatuscode.set_defaults(func=run_dumpgrobidstatuscode) + + sub_dumpgrobidmetainsertable = subparsers.add_parser('dump-grobid-meta-insertable') + sub_dumpgrobidmetainsertable.set_defaults(func=run_dumpgrobidmetainsertable) + + sub_dumpgrobidxml = subparsers.add_parser('dump-grobid-xml') + sub_dumpgrobidxml.set_defaults(func=run_dumpgrobidxml) + + sub_colcount = subparsers.add_parser('col-count') + sub_colcount.set_defaults(func=run_colcount) + sub_colcount.add_argument('column', + help="column name to use in count") + + sub_matchbenchmark = subparsers.add_parser('match-benchmark') + sub_matchbenchmark.set_defaults(func=run_matchbenchmark) + sub_matchbenchmark.add_argument('left_bibjson', + help="First bibjson file") + sub_matchbenchmark.add_argument('right_bibjson', + help="Second bibjson file") + sub_matchbenchmark.add_argument('output', + help="where to write output") + + sub_groupworksbenchmark = subparsers.add_parser('groupworks-benchmark') + sub_groupworksbenchmark.set_defaults(func=run_groupworksbenchmark) + sub_groupworksbenchmark.add_argument('fatcat_releases', + help="fatcat releases json file") + sub_groupworksbenchmark.add_argument('output', + help="where to write output") + + sub_keysmissingcol = subparsers.add_parser('keys-missing-col') + sub_keysmissingcol.set_defaults(func=run_keysmissingcol) + sub_keysmissingcol.add_argument('column', + help="column to SCAN for missing keys") + + sub_dumpungrobided = subparsers.add_parser('dump-ungrobided') + sub_dumpungrobided.set_defaults(func=run_dumpungrobided) + args = parser.parse_args() if not args.__dict__.get("func"): - print("tell me what to do! (try --help)") + parser.print_help(file=sys.stderr) sys.exit(-1) if not (args.prod or args.qa) or (args.prod and args.qa): print("must pass one of --prod or --qa") + sys.exit(-1) if args.prod: args.env = "prod" if args.qa: |