author | Bryan Newbold <bnewbold@archive.org> | 2019-08-26 23:25:28 +0200 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2019-08-26 23:25:34 +0200 |
commit | d2d545675b3c85c3e1b41fd4bda23230d995bf47 (patch) | |
tree | e46a93f62de8924e00025d2470136f474792d5fe /please | |
parent | 5cf94a887d8edbeb811ea440c6b8eef90e34f7a2 (diff) | |
download | sandcrawler-d2d545675b3c85c3e1b41fd4bda23230d995bf47.tar.gz sandcrawler-d2d545675b3c85c3e1b41fd4bda23230d995bf47.zip |
GroupFatcatWorksSubsetJob
This is a hacky variant of GroupFatcatWorksJob which allows setting
different left and right sides of the join. The initial application is
to re-run work merging with only longtail-oa works on the "left", with
the goal of hard-merging those releases into existing releases that
have actual identifiers (instead of just grouping them into works).

As a refactor, the normal GroupFatcatWorksJob could become this job
with the same file passed as both left and right sides, though that
would require twice as much JSON parsing/filtering.
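
Conceptually, the subset job is an asymmetric self-join: the full release dump on the "right" is indexed by a normalized-title key, and only the subset on the "left" is streamed against it. Below is a minimal local Python sketch of that shape; the `slugify` key and the toy records are hypothetical stand-ins, since the real logic is the Scalding (Scala) job wired up in the diff below, with proper title scoring.

```python
import json
from collections import defaultdict

def slugify(title):
    # Hypothetical stand-in for the job's real normalized-title key.
    return "".join(c for c in title.lower() if c.isalnum())

def group_subset(left_lines, right_lines):
    """Asymmetric self-join: index the right side (full release dump),
    then probe it with the left side (e.g. longtail-oa releases only)."""
    right_index = defaultdict(list)
    for line in right_lines:
        release = json.loads(line)
        right_index[slugify(release["title"])].append(release)
    for line in left_lines:
        release = json.loads(line)
        for match in right_index.get(slugify(release["title"]), []):
            yield (release, match)

# Toy JSON lines standing in for the HDFS release dumps:
longtail = ['{"ident": "aaa", "title": "On Sandcrawlers"}']
full = ['{"ident": "bbb", "title": "On Sandcrawlers!"}']
for left, right in group_subset(longtail, full):
    print(left["ident"], "->", right["ident"])
```

Passing the same file as both sides degenerates into the normal self-grouping, which is exactly the refactor suggested above (at the cost of parsing the dump twice).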
Diffstat (limited to 'please')
-rwxr-xr-x | please | 44 |
1 file changed, 44 insertions, 0 deletions
```diff
@@ -245,6 +245,40 @@ def run_groupworks(args):
         fatcat_release_input=args.fatcat_release_input)
     subprocess.call(cmd, shell=True)
 
+def run_groupworkssubset(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting groupworkssubset job...")
+    output = "{}/output-{}/{}-groupworkssubset".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    jobclass = "GroupFatcatWorksSubsetJob"
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool \
+        -Dmapred.reduce.tasks={reducers} \
+        -Dcascading.spill.list.threshold=500000 \
+        -Dmapred.output.compress=false \
+        -Dmapred.compress.map.output=true \
+        -Dmapred.task.timeout=3600000 \
+        sandcrawler.{jobclass} \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --hbase-table wbgrp-journal-extract-0-{env} \
+        --zookeeper-hosts {zookeeper_hosts} \
+        --fatcat-release-input {fatcat_release_input_left} \
+        --fatcat-release-input-right {fatcat_release_input_right} \
+        --output {output}""".format(
+            output=output,
+            jobclass=jobclass,
+            zookeeper_hosts=ZOOKEEPER_HOSTS,
+            env=args.env,
+            reducers=args.reducers,
+            fatcat_release_input_left=args.fatcat_release_input_left,
+            fatcat_release_input_right=args.fatcat_release_input_right)
+    subprocess.call(cmd, shell=True)
+
 def run_grobidscorabledump(args):
     if args.rebuild:
         rebuild_scalding()
@@ -508,6 +542,16 @@ def main():
         help="full HDFS path of fatcat release JSON dump")
     sub_groupworks.add_argument('--reducers',
         help="number of reducers to run",
+        type=int, default=400)
+
+    sub_groupworkssubset = subparsers.add_parser('groupworkssubset-fatcat')
+    sub_groupworkssubset.set_defaults(func=run_groupworkssubset)
+    sub_groupworkssubset.add_argument('fatcat_release_input_left',
+        help="full HDFS path of fatcat release JSON dump (LHS of join)")
+    sub_groupworkssubset.add_argument('fatcat_release_input_right',
+        help="full HDFS path of fatcat release JSON dump (RHS of join)")
+    sub_groupworkssubset.add_argument('--reducers',
+        help="number of reducers to run",
         type=int, default=200)
 
     sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump')
```
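
Given the argparse wiring above, an invocation of the new subcommand would look roughly like the following; the HDFS dump paths are placeholders, and any pre-existing global flags of the `please` script (such as `--env` or `--rebuild`) are assumed to keep their defaults.

```sh
./please groupworkssubset-fatcat \
    /user/example/longtail_oa_releases.json \
    /user/example/full_release_export.json \
    --reducers 200
```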