author     Bryan Newbold <bnewbold@archive.org>    2019-08-26 23:25:28 +0200
committer  Bryan Newbold <bnewbold@archive.org>    2019-08-26 23:25:34 +0200
commit     d2d545675b3c85c3e1b41fd4bda23230d995bf47 (patch)
tree       e46a93f62de8924e00025d2470136f474792d5fe
parent     5cf94a887d8edbeb811ea440c6b8eef90e34f7a2 (diff)
download   sandcrawler-d2d545675b3c85c3e1b41fd4bda23230d995bf47.tar.gz
           sandcrawler-d2d545675b3c85c3e1b41fd4bda23230d995bf47.zip
GroupFatcatWorksSubsetJob
This is a hack-y variant of GroupFatcatWorksJob that allows setting different left and right sides of the join. The initial application is to re-run work merging with only longtail-oa works on the "left", with the goal of hard-merging these releases into existing releases that have actual identifiers (instead of just grouping them into works). As a refactor, the normal GroupFatcatWorksJob could just be this job with the same file passed as both left and right, though that would require twice as much JSON parsing/filtering.
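For example, the longtail-oa merge described above might be invoked like this (hypothetical HDFS paths; the subcommand and arguments are the ones added to the please script below):

    ./please groupworkssubset-fatcat \
        /user/example/longtail_oa_releases.json \
        /user/example/all_releases.json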
-rwxr-xr-x  please                                                                 44
-rw-r--r--  scalding/src/main/scala/sandcrawler/FatcatScorable.scala               15
-rw-r--r--  scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala    52
3 files changed, 111 insertions, 0 deletions
diff --git a/please b/please
index cc2a6ad..0bfc0b2 100755
--- a/please
+++ b/please
@@ -245,6 +245,40 @@ def run_groupworks(args):
         fatcat_release_input=args.fatcat_release_input)
     subprocess.call(cmd, shell=True)
 
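+# Like run_groupworks above, but with separate left ("subset") and right
+# release dumps, so the two sides of the join can differ.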
+def run_groupworkssubset(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting groupworkssubset job...")
+    output = "{}/output-{}/{}-groupworkssubset".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    jobclass = "GroupFatcatWorksSubsetJob"
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool \
+        -Dmapred.reduce.tasks={reducers} \
+        -Dcascading.spill.list.threshold=500000 \
+        -Dmapred.output.compress=false \
+        -Dmapred.compress.map.output=true \
+        -Dmapred.task.timeout=3600000 \
+        sandcrawler.{jobclass} \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --hbase-table wbgrp-journal-extract-0-{env} \
+        --zookeeper-hosts {zookeeper_hosts} \
+        --fatcat-release-input {fatcat_release_input_left} \
+        --fatcat-release-input-right {fatcat_release_input_right} \
+        --output {output}""".format(
+        output=output,
+        jobclass=jobclass,
+        zookeeper_hosts=ZOOKEEPER_HOSTS,
+        env=args.env,
+        reducers=args.reducers,
+        fatcat_release_input_left=args.fatcat_release_input_left,
+        fatcat_release_input_right=args.fatcat_release_input_right)
+    subprocess.call(cmd, shell=True)
+
 def run_grobidscorabledump(args):
     if args.rebuild:
         rebuild_scalding()
@@ -508,6 +542,16 @@ def main():
help="full HDFS path of fatcat release JSON dump")
sub_groupworks.add_argument('--reducers',
help="number of reducers to run",
+ type=int, default=400)
+
+ sub_groupworkssubset = subparsers.add_parser('groupworkssubset-fatcat')
+ sub_groupworkssubset.set_defaults(func=run_groupworkssubset)
+ sub_groupworkssubset.add_argument('fatcat_release_input_left',
+ help="full HDFS path of fatcat release JSON dump (LHS of join)")
+ sub_groupworkssubset.add_argument('fatcat_release_input_right',
+ help="full HDFS path of fatcat release JSON dump (RHS of join)")
+ sub_groupworkssubset.add_argument('--reducers',
+ help="number of reducers to run",
type=int, default=200)
sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump')
diff --git a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala
index cffc2c0..2090e84 100644
--- a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala
@@ -11,6 +11,21 @@ import com.twitter.scalding._
 import com.twitter.scalding.typed.TDsl._
 import parallelai.spyglass.hbase.HBasePipeConversions
 
+
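+// Variant of FatcatScorable that reads the right-hand side of the join
+// from the separate "fatcat-release-input-right" argument, so that
+// GroupFatcatWorksSubsetJob can consume two different release dumps.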
+class FatcatScorableRight extends Scorable {
+
+  def getSource(args : Args) : Source = {
+    TextLine(args("fatcat-release-input-right"))
+  }
+
+  def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
+    getSource(args).read
+      .toTypedPipe[String](new Fields("line"))
+      .filter { FatcatScorable.keepRecord(_) }
+      .map { FatcatScorable.jsonToMapFeatures(_) }
+  }
+}
+
 class FatcatScorable extends Scorable with HBasePipeConversions {
 
   def getSource(args : Args) : Source = {
diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala
new file mode 100644
index 0000000..ea5e26b
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala
@@ -0,0 +1,52 @@
+package sandcrawler
+
+import cascading.pipe.Pipe
+import com.twitter.scalding.Args
+import com.twitter.scalding.Stat
+import com.twitter.scalding.TypedPipe
+import com.twitter.scalding.TypedTsv
+import parallelai.spyglass.base.JobBase
+
+class GroupFatcatWorksSubsetJob(args: Args) extends JobBase(args) {
+
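+  // Hadoop counters: rows flowing out of each input pipe, plus rows
+  // produced by the join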
+  val fatcatLhsRowCount = Stat("fatcat-rows-filtered-left", "sandcrawler")
+  val fatcatRhsRowCount = Stat("fatcat-rows-filtered-right", "sandcrawler")
+  val joinedRowCount = Stat("joined-rows", "sandcrawler")
+
+  val fatcatScorableLhs : Scorable = new FatcatScorable()
+  val fatcatPipeLhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableLhs
+    .getInputPipe(args)
+    .map { r =>
+      fatcatLhsRowCount.inc
+      r
+    }
+
+  val fatcatScorableRhs : Scorable = new FatcatScorableRight()
+  val fatcatPipeRhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableRhs
+    .getInputPipe(args)
+    .map { r =>
+      fatcatRhsRowCount.inc
+      r
+    }
+
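+  // Inner join on slug; addTrap diverts records that fail during
+  // downstream processing to a ".trapped" side output instead of
+  // failing the whole job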
+  val joinedPipe = fatcatPipeLhs
+    .addTrap(TypedTsv(args("output") + ".trapped"))
+    .join(fatcatPipeRhs)
+
+  // TypedTsv doesn't work over case classes.
+  joinedPipe
+    // filter out trivial self-matches (releases are identical)
+    .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) =>
+      Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight)
+    }
+    .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) =>
+      joinedRowCount.inc
+      new ReduceOutput(
+        slug,
+        Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight),
+        fatcatFeaturesLeft.json,
+        fatcatFeaturesRight.json)
+    }
+    .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+    .write(TypedTsv[(String, Int, String, String)](args("output")))
+}
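As the commit message notes, the plain work-grouping behavior could be recovered by passing the same dump on both sides (hypothetical path; this costs a second round of JSON parsing/filtering):

    ./please groupworkssubset-fatcat \
        /user/example/release_export.json \
        /user/example/release_export.json

The selfMatchable filter above would then drop the trivial matches where left and right are the identical release.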