From d2d545675b3c85c3e1b41fd4bda23230d995bf47 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 26 Aug 2019 23:25:28 +0200 Subject: GroupFatcatWorksSubsetJob This is a hack-y variant of GroupFatcatWorksSubsetJob which allows setting different left and right sides of the join. The initial application is to re-run work merging with only longtail-oa works on the "left", with the goal of hard-merging these releases into existing releases with actual identifiers (instead of just grouping into works). As a refactor, the normal GroupFatcatWorksJob could just be this with the same file passed as both left and right, though that requires twice as much JSON parsing/filtering. --- please | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) (limited to 'please') diff --git a/please b/please index cc2a6ad..0bfc0b2 100755 --- a/please +++ b/please @@ -245,6 +245,40 @@ def run_groupworks(args): fatcat_release_input=args.fatcat_release_input) subprocess.call(cmd, shell=True) +def run_groupworkssubset(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworkssubset job...") + output = "{}/output-{}/{}-groupworkssubset".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + jobclass = "GroupFatcatWorksSubsetJob" + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --fatcat-release-input {fatcat_release_input_left} \ + --fatcat-release-input-right {fatcat_release_input_right} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + fatcat_release_input_left=args.fatcat_release_input_left, + fatcat_release_input_right=args.fatcat_release_input_right) + subprocess.call(cmd, shell=True) + def run_grobidscorabledump(args): if args.rebuild: rebuild_scalding() @@ -507,6 +541,16 @@ def main(): sub_groupworks.add_argument('fatcat_release_input', help="full HDFS path of fatcat release JSON dump") sub_groupworks.add_argument('--reducers', + help="number of reducers to run", + type=int, default=400) + + sub_groupworkssubset = subparsers.add_parser('groupworkssubset-fatcat') + sub_groupworkssubset.set_defaults(func=run_groupworkssubset) + sub_groupworkssubset.add_argument('fatcat_release_input_left', + help="full HDFS path of fatcat release JSON dump (LHS of join)") + sub_groupworkssubset.add_argument('fatcat_release_input_right', + help="full HDFS path of fatcat release JSON dump (RHS of join)") + sub_groupworkssubset.add_argument('--reducers', help="number of reducers to run", type=int, default=200) -- cgit v1.2.3