From 952457f9fae1cc25cdeeefc00e19ae20cf86c659 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sat, 10 Aug 2019 19:50:18 -0700 Subject: please command for groupworksfatcat --- please | 63 ++++++++++++++++++++++ .../scala/sandcrawler/GroupFatcatWorksJob.scala | 43 +++++++++++++++ .../main/scala/sandcrawler/ScoreSelfFatcat.scala | 43 --------------- 3 files changed, 106 insertions(+), 43 deletions(-) create mode 100644 scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala delete mode 100644 scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala diff --git a/please b/please index bf621bf..cc2a6ad 100755 --- a/please +++ b/please @@ -213,6 +213,38 @@ def run_matchcrossref(args): crossref_input=args.crossref_input) subprocess.call(cmd, shell=True) +def run_groupworks(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworks job...") + output = "{}/output-{}/{}-groupworks".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + jobclass = "GroupFatcatWorksJob" + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --fatcat-release-input {fatcat_release_input} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + fatcat_release_input=args.fatcat_release_input) + subprocess.call(cmd, shell=True) + def run_grobidscorabledump(args): if args.rebuild: rebuild_scalding() @@ -360,6 +392,22 @@ def run_matchbenchmark(args): right_bibjson=args.right_bibjson) subprocess.call(cmd, shell=True) +def run_groupworksbenchmark(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworksbenchmark job...") + cmd = """./pig/deps/hadoop/bin/hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + sandcrawler.GroupFatcatWorksJob \ + --local \ + --app.conf.path scalding/ia_cluster.conf \ + --fatcat-release-input {fatcat_releases} \ + --output {output}""".format( + output=args.output, + fatcat_releases=args.fatcat_releases) + subprocess.call(cmd, shell=True) + def run_keysmissingcol(args): if args.rebuild: rebuild_scalding() @@ -454,6 +502,14 @@ def main(): help="whether to include CDX and other metadata in output", action='store_true') + sub_groupworks = subparsers.add_parser('groupworks-fatcat') + sub_groupworks.set_defaults(func=run_groupworks) + sub_groupworks.add_argument('fatcat_release_input', + help="full HDFS path of fatcat release JSON dump") + sub_groupworks.add_argument('--reducers', + help="number of reducers to run", + type=int, default=200) + sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump') sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump) @@ -483,6 +539,13 @@ def main(): sub_matchbenchmark.add_argument('output', help="where to write output") + sub_groupworksbenchmark = subparsers.add_parser('groupworks-benchmark') + sub_groupworksbenchmark.set_defaults(func=run_groupworksbenchmark) + sub_groupworksbenchmark.add_argument('fatcat_releases', + help="fatcat releases json file") + sub_groupworksbenchmark.add_argument('output', + help="where to write output") + sub_keysmissingcol = subparsers.add_parser('keys-missing-col') sub_keysmissingcol.set_defaults(func=run_keysmissingcol) sub_keysmissingcol.add_argument('column', diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala new file mode 100644 index 0000000..46d2038 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala @@ -0,0 +1,43 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class GroupFatcatWorksJob(args: Args) extends JobBase(args) { + + val fatcatRowCount = Stat("fatcat-rows-filtered", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val fatcatScorable : Scorable = new FatcatScorable() + val fatcatPipe : TypedPipe[(String, ReduceFeatures)] = fatcatScorable + .getInputPipe(args) + .map { r => + fatcatRowCount.inc + r + } + + val joinedPipe = fatcatPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(fatcatPipe) + + // TypedTsv doesn't work over case classes. + joinedPipe + // filter out trivial self-matches (releases are identical) + .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) + } + .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + joinedRowCount.inc + new ReduceOutput( + slug, + Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), + fatcatFeaturesLeft.json, + fatcatFeaturesRight.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala b/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala deleted file mode 100644 index d1a94fe..0000000 --- a/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala +++ /dev/null @@ -1,43 +0,0 @@ -package sandcrawler - -import cascading.pipe.Pipe -import com.twitter.scalding.Args -import com.twitter.scalding.Stat -import com.twitter.scalding.TypedPipe -import com.twitter.scalding.TypedTsv -import parallelai.spyglass.base.JobBase - -class ScoreSelfFatcatJob(args: Args) extends JobBase(args) { - - val fatcatRowCount = Stat("fatcat-rows-filtered", "sandcrawler") - val joinedRowCount = Stat("joined-rows", "sandcrawler") - - val fatcatScorable : Scorable = new FatcatScorable() - val fatcatPipe : TypedPipe[(String, ReduceFeatures)] = fatcatScorable - .getInputPipe(args) - .map { r => - fatcatRowCount.inc - r - } - - val joinedPipe = fatcatPipe - .addTrap(TypedTsv(args("output") + ".trapped")) - .join(fatcatPipe) - - // TypedTsv doesn't work over case classes. - joinedPipe - // filter out trivial self-matches (releases are identical) - .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => - Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) - } - .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => - joinedRowCount.inc - new ReduceOutput( - slug, - Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), - fatcatFeaturesLeft.json, - fatcatFeaturesRight.json) - } - .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } - .write(TypedTsv[(String, Int, String, String)](args("output"))) -} -- cgit v1.2.3