diff options
-rwxr-xr-x | please | 63 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala (renamed from scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala) | 2 |
2 files changed, 64 insertions, 1 deletions
@@ -213,6 +213,38 @@ def run_matchcrossref(args): crossref_input=args.crossref_input) subprocess.call(cmd, shell=True) +def run_groupworks(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworks job...") + output = "{}/output-{}/{}-groupworks".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + jobclass = "GroupFatcatWorksJob" + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --fatcat-release-input {fatcat_release_input} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + fatcat_release_input=args.fatcat_release_input) + subprocess.call(cmd, shell=True) + def run_grobidscorabledump(args): if args.rebuild: rebuild_scalding() @@ -360,6 +392,22 @@ def run_matchbenchmark(args): right_bibjson=args.right_bibjson) subprocess.call(cmd, shell=True) +def run_groupworksbenchmark(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworksbenchmark job...") + cmd = """./pig/deps/hadoop/bin/hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + sandcrawler.GroupFatcatWorksJob \ + --local \ + --app.conf.path scalding/ia_cluster.conf \ + --fatcat-release-input {fatcat_releases} \ + --output {output}""".format( + output=args.output, + fatcat_releases=args.fatcat_releases) + subprocess.call(cmd, shell=True) + def run_keysmissingcol(args): if args.rebuild: rebuild_scalding() @@ -454,6 +502,14 @@ def main(): help="whether to include CDX and other metadata in output", action='store_true') + sub_groupworks = subparsers.add_parser('groupworks-fatcat') + sub_groupworks.set_defaults(func=run_groupworks) + sub_groupworks.add_argument('fatcat_release_input', + help="full HDFS path of fatcat release JSON dump") + sub_groupworks.add_argument('--reducers', + help="number of reducers to run", + type=int, default=200) + sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump') sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump) @@ -483,6 +539,13 @@ def main(): sub_matchbenchmark.add_argument('output', help="where to write output") + sub_groupworksbenchmark = subparsers.add_parser('groupworks-benchmark') + sub_groupworksbenchmark.set_defaults(func=run_groupworksbenchmark) + sub_groupworksbenchmark.add_argument('fatcat_releases', + help="fatcat releases json file") + sub_groupworksbenchmark.add_argument('output', + help="where to write output") + sub_keysmissingcol = subparsers.add_parser('keys-missing-col') sub_keysmissingcol.set_defaults(func=run_keysmissingcol) sub_keysmissingcol.add_argument('column', diff --git a/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala index d1a94fe..46d2038 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala +++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala @@ -7,7 +7,7 @@ import com.twitter.scalding.TypedPipe import com.twitter.scalding.TypedTsv import parallelai.spyglass.base.JobBase -class ScoreSelfFatcatJob(args: Args) extends JobBase(args) { +class GroupFatcatWorksJob(args: Args) extends JobBase(args) { val fatcatRowCount = Stat("fatcat-rows-filtered", "sandcrawler") val joinedRowCount = Stat("joined-rows", "sandcrawler") |