diff options
-rwxr-xr-x | please | 44 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/FatcatScorable.scala | 15 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala | 52 |
3 files changed, 111 insertions, 0 deletions
@@ -245,6 +245,40 @@ def run_groupworks(args): fatcat_release_input=args.fatcat_release_input) subprocess.call(cmd, shell=True) +def run_groupworkssubset(args): + if args.rebuild: + rebuild_scalding() + print("Starting groupworkssubset job...") + output = "{}/output-{}/{}-groupworkssubset".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + jobclass = "GroupFatcatWorksSubsetJob" + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool \ + -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -Dmapred.output.compress=false \ + -Dmapred.compress.map.output=true \ + -Dmapred.task.timeout=3600000 \ + sandcrawler.{jobclass} \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --fatcat-release-input {fatcat_release_input_left} \ + --fatcat-release-input-right {fatcat_release_input_right} \ + --output {output}""".format( + output=output, + jobclass=jobclass, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env, + reducers=args.reducers, + fatcat_release_input_left=args.fatcat_release_input_left, + fatcat_release_input_right=args.fatcat_release_input_right) + subprocess.call(cmd, shell=True) + def run_grobidscorabledump(args): if args.rebuild: rebuild_scalding() @@ -508,6 +542,16 @@ def main(): help="full HDFS path of fatcat release JSON dump") sub_groupworks.add_argument('--reducers', help="number of reducers to run", + type=int, default=400) + + sub_groupworkssubset = subparsers.add_parser('groupworkssubset-fatcat') + sub_groupworkssubset.set_defaults(func=run_groupworkssubset) + sub_groupworkssubset.add_argument('fatcat_release_input_left', + help="full HDFS path of fatcat release JSON dump (LHS of join)") + sub_groupworkssubset.add_argument('fatcat_release_input_right', + help="full HDFS path of fatcat release JSON dump (RHS of join)") + sub_groupworkssubset.add_argument('--reducers', + help="number of reducers to run", type=int, default=200) sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump') diff --git a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala index cffc2c0..2090e84 100644 --- a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala +++ b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala @@ -11,6 +11,21 @@ import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.hbase.HBasePipeConversions + +class FatcatScorableRight extends Scorable { + + def getSource(args : Args) : Source = { + TextLine(args("fatcat-release-input-right")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .filter { FatcatScorable.keepRecord(_) } + .map { FatcatScorable.jsonToMapFeatures(_) } + } +} + class FatcatScorable extends Scorable with HBasePipeConversions { def getSource(args : Args) : Source = { diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala new file mode 100644 index 0000000..ea5e26b --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala @@ -0,0 +1,52 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class GroupFatcatWorksSubsetJob(args: Args) extends JobBase(args) { + + val fatcatLhsRowCount = Stat("fatcat-rows-filtered-left", "sandcrawler") + val fatcatRhsRowCount = Stat("fatcat-rows-filtered-right", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val fatcatScorableLhs : Scorable = new FatcatScorable() + val fatcatPipeLhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableLhs + .getInputPipe(args) + .map { r => + fatcatLhsRowCount.inc + r + } + + val fatcatScorableRhs : Scorable = new FatcatScorableRight() + val fatcatPipeRhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableRhs + .getInputPipe(args) + .map { r => + fatcatRhsRowCount.inc + r + } + + val joinedPipe = fatcatPipeLhs + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(fatcatPipeRhs) + + // TypedTsv doesn't work over case classes. + joinedPipe + // filter out trivial self-matches (releases are identical) + .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) + } + .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + joinedRowCount.inc + new ReduceOutput( + slug, + Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), + fatcatFeaturesLeft.json, + fatcatFeaturesRight.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} |