diff options
Diffstat (limited to 'please')
-rwxr-xr-x | please | 44 |
1 files changed, 44 insertions, 0 deletions
@@ -245,6 +245,40 @@ def run_groupworks(args): fatcat_release_input=args.fatcat_release_input) subprocess.call(cmd, shell=True) +def run_groupworkssubset(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworkssubset job...") + output = "{}/output-{}/{}-groupworkssubset".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + jobclass = "GroupFatcatWorksSubsetJob" + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --fatcat-release-input {fatcat_release_input_left} \ + --fatcat-release-input-right {fatcat_release_input_right} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + fatcat_release_input_left=args.fatcat_release_input_left, + fatcat_release_input_right=args.fatcat_release_input_right) + subprocess.call(cmd, shell=True) + def run_grobidscorabledump(args): if args.rebuild: rebuild_scalding() @@ -508,6 +542,16 @@ def main(): help="full HDFS path of fatcat release JSON dump") sub_groupworks.add_argument('--reducers', help="number of reducers to run", + type=int, default=400) + + sub_groupworkssubset = subparsers.add_parser('groupworkssubset-fatcat') + sub_groupworkssubset.set_defaults(func=run_groupworkssubset) + sub_groupworkssubset.add_argument('fatcat_release_input_left', + help="full HDFS path of fatcat release JSON dump (LHS of join)") + sub_groupworkssubset.add_argument('fatcat_release_input_right', + help="full HDFS path of fatcat release JSON dump (RHS of join)") + sub_groupworkssubset.add_argument('--reducers', + help="number of reducers to run", type=int, default=200) sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump') |