aboutsummaryrefslogtreecommitdiffstats
path: root/please
diff options
context:
space:
mode:
Diffstat (limited to 'please')
-rwxr-xr-xplease63
1 files changed, 63 insertions, 0 deletions
diff --git a/please b/please
index bf621bf..cc2a6ad 100755
--- a/please
+++ b/please
@@ -213,6 +213,38 @@ def run_matchcrossref(args):
crossref_input=args.crossref_input)
subprocess.call(cmd, shell=True)
+def run_groupworks(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting groupworks job...")
+ output = "{}/output-{}/{}-groupworks".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ jobclass = "GroupFatcatWorksJob"
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool \
+ -Dmapred.reduce.tasks={reducers} \
+ -Dcascading.spill.list.threshold=500000 \
+ -Dmapred.output.compress=false \
+ -Dmapred.compress.map.output=true \
+ -Dmapred.task.timeout=3600000 \
+ sandcrawler.{jobclass} \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --fatcat-release-input {fatcat_release_input} \
+ --output {output}""".format(
+ output=output,
+ jobclass=jobclass,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env,
+ reducers=args.reducers,
+ fatcat_release_input=args.fatcat_release_input)
+ subprocess.call(cmd, shell=True)
+
def run_grobidscorabledump(args):
if args.rebuild:
rebuild_scalding()
@@ -360,6 +392,22 @@ def run_matchbenchmark(args):
right_bibjson=args.right_bibjson)
subprocess.call(cmd, shell=True)
+def run_groupworksbenchmark(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting groupworksbenchmark job...")
+ cmd = """./pig/deps/hadoop/bin/hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool \
+ sandcrawler.GroupFatcatWorksJob \
+ --local \
+ --app.conf.path scalding/ia_cluster.conf \
+ --fatcat-release-input {fatcat_releases} \
+ --output {output}""".format(
+ output=args.output,
+ fatcat_releases=args.fatcat_releases)
+ subprocess.call(cmd, shell=True)
+
def run_keysmissingcol(args):
if args.rebuild:
rebuild_scalding()
@@ -454,6 +502,14 @@ def main():
help="whether to include CDX and other metadata in output",
action='store_true')
+ sub_groupworks = subparsers.add_parser('groupworks-fatcat')
+ sub_groupworks.set_defaults(func=run_groupworks)
+ sub_groupworks.add_argument('fatcat_release_input',
+ help="full HDFS path of fatcat release JSON dump")
+ sub_groupworks.add_argument('--reducers',
+ help="number of reducers to run",
+ type=int, default=200)
+
sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump')
sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump)
@@ -483,6 +539,13 @@ def main():
sub_matchbenchmark.add_argument('output',
help="where to write output")
+ sub_groupworksbenchmark = subparsers.add_parser('groupworks-benchmark')
+ sub_groupworksbenchmark.set_defaults(func=run_groupworksbenchmark)
+ sub_groupworksbenchmark.add_argument('fatcat_releases',
+ help="fatcat releases json file")
+ sub_groupworksbenchmark.add_argument('output',
+ help="where to write output")
+
sub_keysmissingcol = subparsers.add_parser('keys-missing-col')
sub_keysmissingcol.set_defaults(func=run_keysmissingcol)
sub_keysmissingcol.add_argument('column',