diff options
Diffstat (limited to 'scalding/src/main')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreInsertable.scala | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala new file mode 100644 index 0000000..58007fa --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala @@ -0,0 +1,86 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class ScoreInsertableJob(args: Args) extends JobBase(args) { + + val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") + val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") + val cdxRowCount = Stat("cdx-rows", "sandcrawler") + val scoredRowCount = Stat("scored-rows", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val grobidScorable : Scorable = new GrobidScorable() + val crossrefScorable : Scorable = new CrossrefScorable() + + val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable + .getInputPipe(args) + .map { r => + grobidRowCount.inc + r + } + val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable + .getInputPipe(args) + .map { r => + crossrefRowCount.inc + r + } + val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts")) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) + .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } + .map { case (key, cdx, mime, size) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes())) + } + .map { r => + cdxRowCount.inc + r + } + + val scoredPipe = grobidPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(crossrefPipe) + .map { case (slug, (grobidFeatures, crossrefFeatures)) => + scoredRowCount.inc + //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry + // Not ever Empty, I promise + val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull + (key, new ReduceOutput( + slug, + Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), + grobidFeatures.json, + crossrefFeatures.json)) + } + .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) } + .groupBy { case (key, _, _, _, _) => key } + + // TypedTsv doesn't work over case classes. + val joinedPipe = scoredPipe + .join(cdxPipe.groupBy { case (key, _, _, _) => key }) + .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) } + .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output"))) +} + +object ScoreInsertableJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("file:cdx", "file:mime", "file:size"), + SourceMode.SCAN_ALL) + } +} |