#!/usr/bin/env python3
"""
Helper script for running Sandcrawler (journal pipeline) tasks in production.

This is basically a Makefile. If you edit this file, be sure to only use
python3 standard library modules, so there are no dependencies.

Every ``run_*`` / ``rebuild_*`` function returns the exit status of the shell
command it launched (0 on success), and ``main`` exits with that status, so
callers such as cron jobs or shell scripts can detect failed jobs.
"""

import sys
import argparse
import subprocess
from datetime import datetime

HDFS_DIR = "hdfs:///user/bnewbold/sandcrawler"
HBASE_HOST = "wbgrp-svc263.us.archive.org"
ZOOKEEPER_HOSTS = "mtrcs-zk1.us.archive.org:2181"
GROBID_URI = "http://wbgrp-svc096.us.archive.org:8070"

# Staging config
#HBASE_HOST = "wbgrp-svc312.us.archive.org"
#ZOOKEEPER_HOSTS = "wbgrp-svc312.us.archive.org:2181"

# Pieces shared by every scalding job command line.
SCALDING_JAR = "scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar"
SCALDING_CONF = "scalding/ia_cluster.conf"
LOCAL_HADOOP_BIN = "./pig/deps/hadoop/bin/hadoop"


def run_shell(cmd):
    """Run `cmd` through the shell and return its exit status.

    NOTE: command strings in this file are built by plain string
    interpolation of command-line arguments (no quoting), so arguments
    containing shell metacharacters are interpreted by the shell.
    """
    return subprocess.call(cmd, shell=True)


def hbase_table(env):
    """Return the HBase table name for environment `env` ("prod" or "qa")."""
    return "wbgrp-journal-extract-0-{}".format(env)


def output_path(env, job_name, now=None):
    """Return the timestamped HDFS output directory for a job.

    `now` defaults to the current local time; it is a parameter only so the
    result can be made deterministic.
    """
    if now is None:
        now = datetime.now()
    return "{}/output-{}/{}-{}".format(
        HDFS_DIR, env, now.strftime("%Y-%m-%d-%H%M.%S"), job_name)


def reducer_hadoop_opts(reducers):
    """Return the Hadoop -D options used by the reducer-heavy scoring jobs."""
    # Compression: changed due to errors in production
    # https://stackoverflow.com/a/11336820/4682349
    return [
        "-Dmapred.reduce.tasks={}".format(reducers),
        "-Dcascading.spill.list.threshold=500000",
        "-Dmapred.output.compress=false",
        "-Dmapred.compress.map.output=true",
        "-Dmapred.task.timeout=3600000",
    ]


def scalding_command(job_class, output, env=None, job_args=(), hadoop_opts=(),
                     local=False):
    """Build the shell command line for one scalding job.

    job_class:   class name inside the `sandcrawler` package
    output:      value passed as --output
    env:         "prod" or "qa"; required unless `local` is true
    job_args:    extra job arguments, placed just before --output
    hadoop_opts: -D options for Hadoop
    local:       run with --local using the bundled hadoop binary, without
                 any HBase/zookeeper arguments

    Raises ValueError if `env` is missing for a non-local job.
    """
    if not local and env is None:
        raise ValueError("env is required for non-local scalding jobs")
    parts = [LOCAL_HADOOP_BIN if local else "hadoop", "jar", SCALDING_JAR,
             "com.twitter.scalding.Tool"]
    # Notes: -D options must come after Tool but before class name
    # https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-pass-parameters-to-my-hadoop-job-number-of-reducers--memory-options--etc-
    parts.extend(hadoop_opts)
    parts.append("sandcrawler.{}".format(job_class))
    parts.append("--local" if local else "--hdfs")
    parts.extend(["--app.conf.path", SCALDING_CONF])
    if not local:
        parts.extend(["--hbase-table", hbase_table(env),
                      "--zookeeper-hosts", ZOOKEEPER_HOSTS])
    parts.extend(job_args)
    parts.extend(["--output", output])
    return " ".join(str(part) for part in parts)


def rebuild_python():
    """Rebuild the pipenv virtualenv tarball shipped with mrjob jobs.

    Returns the shell exit status; steps are chained with `&&` so the first
    failing step stops the rebuild and is reflected in the status.
    """
    print("Rebuilding python venv...")
    cmd = " && ".join([
        "cd python_hadoop",
        "export PIPENV_VENV_IN_PROJECT=1",
        "pipenv install --deploy",
        "tar -czf venv-current.tar.gz -C .venv .",
    ])
    return run_shell(cmd)


def rebuild_scalding():
    """Rebuild the scalding assembly jar; returns the shell exit status."""
    print("Rebuilding scalding jar...")
    return run_shell("cd scalding && sbt assembly")


def maybe_rebuild(args, rebuild):
    """Call `rebuild` if --rebuild was passed; return its status (0 if skipped)."""
    if args.rebuild:
        return rebuild()
    return 0


def run_hdfs_scalding_job(args, label, job_class, output_name=None,
                          job_args=(), hadoop_opts=()):
    """Optionally rebuild the jar, then run a scalding job against HBase/HDFS.

    `label` is used in the progress message; `output_name` (defaults to
    `label`) is the suffix of the timestamped HDFS output directory.
    Returns the exit status of the rebuild if it failed, else of the job.
    """
    status = maybe_rebuild(args, rebuild_scalding)
    if status != 0:
        return status
    print("Starting {} job...".format(label))
    # Computed after any rebuild so the timestamp reflects the job start.
    output = output_path(args.env, output_name or label)
    return run_shell(scalding_command(
        job_class, output, env=args.env, job_args=job_args,
        hadoop_opts=hadoop_opts))


def run_local_scalding_job(args, label, job_class, job_args):
    """Optionally rebuild the jar, then run a scalding job in --local mode.

    Output goes to `args.output`. Returns the shell exit status.
    """
    status = maybe_rebuild(args, rebuild_scalding)
    if status != 0:
        return status
    print("Starting {} job...".format(label))
    return run_shell(scalding_command(
        job_class, args.output, job_args=job_args, local=True))


def run_extraction_job(args, label, output_name, script, input_path):
    """Optionally rebuild the venv, then run a GROBID extraction mrjob script.

    `script` is the file name under python_hadoop/; `input_path` is passed
    as the job's input. Returns the shell exit status.
    """
    status = maybe_rebuild(args, rebuild_python)
    if status != 0:
        return status
    print("Starting {} job...".format(label))
    output = output_path(args.env, output_name)
    cmd = " ".join([
        "cd python_hadoop &&",
        "pipenv run ./{}".format(script),
        "--hbase-host", HBASE_HOST,
        "--hbase-table", hbase_table(args.env),
        "--grobid-uri", GROBID_URI,
        "-r hadoop",
        "-c mrjob.conf",
        "--output-dir", output,
        "--no-output",
        "--archive venv-current.tar.gz#venv",
        "--jobconf mapred.line.input.format.linespermap=8000",
        "--jobconf mapreduce.job.queuename=extraction",
        "--jobconf mapred.task.timeout=3600000",
        input_path,
    ])
    return run_shell(cmd)


def run_backfill(args):
    """Backfill the HBase table from the CDX file `args.input_cdx`."""
    status = maybe_rebuild(args, rebuild_python)
    if status != 0:
        return status
    print("Starting backfill job...")
    cmd = " ".join([
        "cd python_hadoop &&",
        "pipenv run ./backfill_hbase_from_cdx.py",
        "--hbase-host", HBASE_HOST,
        "--hbase-table", hbase_table(args.env),
        "-r hadoop",
        "-c mrjob.conf",
        "--archive venv-current.tar.gz#venv",
        args.input_cdx,
    ])
    return run_shell(cmd)


def run_extract(args):
    """Run GROBID extraction over the CDX file `args.input_cdx`."""
    return run_extraction_job(
        args, "extract", "extract", "extraction_cdx_grobid.py",
        args.input_cdx)


def run_extract_ungrobided(args):
    """Run GROBID extraction over the 'ungrobided' file `args.input_ungrobided`."""
    return run_extraction_job(
        args, "extractungrobided", "extract-ungrobided",
        "extraction_ungrobided.py", args.input_ungrobided)


def run_rowcount(args):
    """Run the HBaseRowCountJob scalding job."""
    return run_hdfs_scalding_job(args, "rowcount", "HBaseRowCountJob")


def run_statuscodecount(args):
    """Run the HBaseStatusCodeCountJob scalding job."""
    return run_hdfs_scalding_job(
        args, "statuscodecount", "HBaseStatusCodeCountJob")


def run_statuscount(args):
    """Run the HBaseStatusCountJob scalding job (with a long task timeout)."""
    return run_hdfs_scalding_job(
        args, "statuscount", "HBaseStatusCountJob",
        hadoop_opts=["-Dmapred.task.timeout=3600000"])


def run_matchcrossref(args):
    """Run the Crossref scoring job.

    Uses ScoreInsertableJob when --fatcat-insertable was passed, otherwise
    ScoreJob.
    """
    jobclass = "ScoreInsertableJob" if args.fatcat_insertable else "ScoreJob"
    return run_hdfs_scalding_job(
        args, "matchcrossref", jobclass,
        job_args=["--crossref-input", args.crossref_input],
        hadoop_opts=reducer_hadoop_opts(args.reducers))


def run_groupworks(args):
    """Run GroupFatcatWorksJob over a fatcat release dump."""
    return run_hdfs_scalding_job(
        args, "groupworks", "GroupFatcatWorksJob",
        job_args=["--fatcat-release-input", args.fatcat_release_input],
        hadoop_opts=reducer_hadoop_opts(args.reducers))


def run_groupworkssubset(args):
    """Run GroupFatcatWorksSubsetJob joining two fatcat release dumps."""
    return run_hdfs_scalding_job(
        args, "groupworkssubset", "GroupFatcatWorksSubsetJob",
        job_args=[
            "--fatcat-release-input", args.fatcat_release_input_left,
            "--fatcat-release-input-right", args.fatcat_release_input_right,
        ],
        hadoop_opts=reducer_hadoop_opts(args.reducers))


def run_grobidscorabledump(args):
    """Run the GrobidScorableDumpJob scalding job."""
    return run_hdfs_scalding_job(
        args, "grobid-scorable-dump", "GrobidScorableDumpJob",
        output_name="grobidscorabledump")


def run_dumpfilemeta(args):
    """Run the DumpFileMetaJob scalding job."""
    return run_hdfs_scalding_job(args, "dumpfilemeta", "DumpFileMetaJob")


def run_dumpgrobidstatuscode(args):
    """Run the DumpGrobidStatusCodeJob scalding job."""
    return run_hdfs_scalding_job(
        args, "dumpgrobidstatuscode", "DumpGrobidStatusCodeJob")


def run_dumpgrobidmetainsertable(args):
    """Run the DumpGrobidMetaInsertableJob scalding job."""
    return run_hdfs_scalding_job(
        args, "dumpgrobidmetainsertable", "DumpGrobidMetaInsertableJob")


def run_dumpgrobidxml(args):
    """Run the DumpGrobidXmlJob scalding job."""
    return run_hdfs_scalding_job(args, "dumpgrobidxml", "DumpGrobidXmlJob")


def run_colcount(args):
    """Run HBaseColCountJob for the column `args.column`."""
    # ':' in the column name is replaced so it can appear in the output path.
    return run_hdfs_scalding_job(
        args, "colcount", "HBaseColCountJob",
        output_name="colcount-{}".format(args.column.replace(":", "_")),
        job_args=["--column", args.column])


def run_matchbenchmark(args):
    """Run MatchBenchmarkJob locally on two bibjson files."""
    return run_local_scalding_job(
        args, "matchbenchmark", "MatchBenchmarkJob",
        ["--left-bibjson", args.left_bibjson,
         "--right-bibjson", args.right_bibjson])


def run_groupworksbenchmark(args):
    """Run GroupFatcatWorksJob locally on a fatcat releases file."""
    return run_local_scalding_job(
        args, "groupworksbenchmark", "GroupFatcatWorksJob",
        ["--fatcat-release-input", args.fatcat_releases])


def run_keysmissingcol(args):
    """Run MissingColumnDumpJob for the column `args.column`."""
    return run_hdfs_scalding_job(
        args, "keysmissingcol", "MissingColumnDumpJob",
        output_name="keysmissingcol-{}".format(args.column.replace(":", "_")),
        job_args=["--column", args.column])


def run_dumpungrobided(args):
    """Run the DumpUnGrobidedJob scalding job."""
    return run_hdfs_scalding_job(args, "dumpungrobided", "DumpUnGrobidedJob")


def main(argv=None):
    """Parse the command line, run the selected job, exit with its status.

    `argv` defaults to sys.argv[1:]. Exits with -1 when no sub-command is
    given or when not exactly one of --prod / --qa is passed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--prod',
        help="run against prod HBase table",
        action='store_true')
    parser.add_argument('--qa',
        help="run against qa HBase table",
        action='store_true')
    parser.add_argument('--rebuild',
        help="rebuild whatever artifact gets sent",
        action='store_true')
    subparsers = parser.add_subparsers()

    sub_backfill = subparsers.add_parser('backfill')
    sub_backfill.set_defaults(func=run_backfill)
    sub_backfill.add_argument('input_cdx',
        help="full HDFS path of CDX file to backfill")

    sub_extract = subparsers.add_parser('extract')
    sub_extract.set_defaults(func=run_extract)
    sub_extract.add_argument('input_cdx',
        help="full HDFS path of CDX file to extract")

    sub_extractungrobided = subparsers.add_parser('extract-ungrobided')
    sub_extractungrobided.set_defaults(func=run_extract_ungrobided)
    sub_extractungrobided.add_argument('input_ungrobided',
        help="full HDFS path of 'ungrobided' file to extract")

    subparsers.add_parser('row-count').set_defaults(func=run_rowcount)
    subparsers.add_parser('status-count').set_defaults(func=run_statuscount)
    subparsers.add_parser('status-code-count').set_defaults(
        func=run_statuscodecount)

    sub_matchcrossref = subparsers.add_parser('match-crossref')
    sub_matchcrossref.set_defaults(func=run_matchcrossref)
    sub_matchcrossref.add_argument('crossref_input',
        help="full HDFS path of Crossref JSON dump")
    sub_matchcrossref.add_argument('--reducers',
        help="number of reducers to run",
        type=int, default=200)
    sub_matchcrossref.add_argument('--fatcat-insertable',
        help="whether to include CDX and other metadata in output",
        action='store_true')

    sub_groupworks = subparsers.add_parser('groupworks-fatcat')
    sub_groupworks.set_defaults(func=run_groupworks)
    sub_groupworks.add_argument('fatcat_release_input',
        help="full HDFS path of fatcat release JSON dump")
    sub_groupworks.add_argument('--reducers',
        help="number of reducers to run",
        type=int, default=400)

    sub_groupworkssubset = subparsers.add_parser('groupworkssubset-fatcat')
    sub_groupworkssubset.set_defaults(func=run_groupworkssubset)
    sub_groupworkssubset.add_argument('fatcat_release_input_left',
        help="full HDFS path of fatcat release JSON dump (LHS of join)")
    sub_groupworkssubset.add_argument('fatcat_release_input_right',
        help="full HDFS path of fatcat release JSON dump (RHS of join)")
    sub_groupworkssubset.add_argument('--reducers',
        help="number of reducers to run",
        type=int, default=200)

    subparsers.add_parser('grobid-scorable-dump').set_defaults(
        func=run_grobidscorabledump)
    subparsers.add_parser('dump-file-meta').set_defaults(
        func=run_dumpfilemeta)
    subparsers.add_parser('dump-grobid-status-code').set_defaults(
        func=run_dumpgrobidstatuscode)
    subparsers.add_parser('dump-grobid-meta-insertable').set_defaults(
        func=run_dumpgrobidmetainsertable)
    subparsers.add_parser('dump-grobid-xml').set_defaults(
        func=run_dumpgrobidxml)

    sub_colcount = subparsers.add_parser('col-count')
    sub_colcount.set_defaults(func=run_colcount)
    sub_colcount.add_argument('column',
        help="column name to use in count")

    sub_matchbenchmark = subparsers.add_parser('match-benchmark')
    sub_matchbenchmark.set_defaults(func=run_matchbenchmark)
    sub_matchbenchmark.add_argument('left_bibjson',
        help="First bibjson file")
    sub_matchbenchmark.add_argument('right_bibjson',
        help="Second bibjson file")
    sub_matchbenchmark.add_argument('output',
        help="where to write output")

    sub_groupworksbenchmark = subparsers.add_parser('groupworks-benchmark')
    sub_groupworksbenchmark.set_defaults(func=run_groupworksbenchmark)
    sub_groupworksbenchmark.add_argument('fatcat_releases',
        help="fatcat releases json file")
    sub_groupworksbenchmark.add_argument('output',
        help="where to write output")

    sub_keysmissingcol = subparsers.add_parser('keys-missing-col')
    sub_keysmissingcol.set_defaults(func=run_keysmissingcol)
    sub_keysmissingcol.add_argument('column',
        help="column to SCAN for missing keys")

    subparsers.add_parser('dump-ungrobided').set_defaults(
        func=run_dumpungrobided)

    args = parser.parse_args(argv)
    if not getattr(args, "func", None):
        print("tell me what to do! (try --help)")
        sys.exit(-1)
    # Exactly one of --prod / --qa must be given.
    if args.prod == args.qa:
        print("must pass one of --prod or --qa")
        sys.exit(-1)
    args.env = "prod" if args.prod else "qa"
    # Propagate the job's exit status so failures are visible to the caller.
    sys.exit(args.func(args))


if __name__ == '__main__':
    main()