aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main')
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreInsertable.scala86
1 files changed, 86 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala
new file mode 100644
index 0000000..58007fa
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala
@@ -0,0 +1,86 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.pipe.Pipe
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class ScoreInsertableJob(args: Args) extends JobBase(args) {
+
+ val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler")
+ val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler")
+ val cdxRowCount = Stat("cdx-rows", "sandcrawler")
+ val scoredRowCount = Stat("scored-rows", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+
+ val grobidScorable : Scorable = new GrobidScorable()
+ val crossrefScorable : Scorable = new CrossrefScorable()
+
+ val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable
+ .getInputPipe(args)
+ .map { r =>
+ grobidRowCount.inc
+ r
+ }
+ val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable
+ .getInputPipe(args)
+ .map { r =>
+ crossrefRowCount.inc
+ r
+ }
+ val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts"))
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size"))
+ .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null }
+ .map { case (key, cdx, mime, size) =>
+ (Bytes.toString(key.copyBytes()),
+ Bytes.toString(cdx.copyBytes()),
+ Bytes.toString(mime.copyBytes()),
+ Bytes.toLong(size.copyBytes()))
+ }
+ .map { r =>
+ cdxRowCount.inc
+ r
+ }
+
+ val scoredPipe = grobidPipe
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(crossrefPipe)
+ .map { case (slug, (grobidFeatures, crossrefFeatures)) =>
+ scoredRowCount.inc
+ //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry
+ // Not ever Empty, I promise
+ val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull
+ (key, new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(grobidFeatures, crossrefFeatures),
+ grobidFeatures.json,
+ crossrefFeatures.json))
+ }
+ .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) }
+ .groupBy { case (key, _, _, _, _) => key }
+
+ // TypedTsv doesn't work over case classes.
+ val joinedPipe = scoredPipe
+ .join(cdxPipe.groupBy { case (key, _, _, _) => key })
+ .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) }
+ .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output")))
+}
+
+object ScoreInsertableJob {
+
+ // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181"
+ def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List("file:cdx", "file:mime", "file:size"),
+ SourceMode.SCAN_ALL)
+ }
+}