diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2018-09-12 15:27:08 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2018-09-12 15:27:11 -0700 | 
| commit | e5db9bc783c1157984c160d155c44d38b84b57ce (patch) | |
| tree | 2bae1f8a17e9a667ec5df8d468decb2b5b10915e /scalding/src/main | |
| parent | 9d21a922386624b39b730849ab56c00a061f255f (diff) | |
| download | sandcrawler-e5db9bc783c1157984c160d155c44d38b84b57ce.tar.gz sandcrawler-e5db9bc783c1157984c160d155c44d38b84b57ce.zip | |
hack scorejob variant with extra context joined in
This is to allow pushes to fatcat. There is probably a better way...
Diffstat (limited to 'scalding/src/main')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreInsertable.scala | 86 | 
1 files changed, 86 insertions, 0 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala new file mode 100644 index 0000000..58007fa --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala @@ -0,0 +1,86 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class ScoreInsertableJob(args: Args) extends JobBase(args) { + +  val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") +  val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") +  val cdxRowCount = Stat("cdx-rows", "sandcrawler") +  val scoredRowCount = Stat("scored-rows", "sandcrawler") +  val joinedRowCount = Stat("joined-rows", "sandcrawler") + +  val grobidScorable : Scorable = new GrobidScorable() +  val crossrefScorable : Scorable = new CrossrefScorable() + +  val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable +    .getInputPipe(args) +    .map { r => +      grobidRowCount.inc +      r +    } +  val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable +    .getInputPipe(args) +    .map { r => +      crossrefRowCount.inc +      r +    } +  val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts")) +    .read +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) +    .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } +    .map { case (key, cdx, mime, size) => +      (Bytes.toString(key.copyBytes()), +       Bytes.toString(cdx.copyBytes()), +       Bytes.toString(mime.copyBytes()), +       Bytes.toLong(size.copyBytes())) +    } +    .map { r => +      cdxRowCount.inc +      r +    } + +  val scoredPipe = grobidPipe +    .addTrap(TypedTsv(args("output") + ".trapped")) +    .join(crossrefPipe) +    .map { case (slug, (grobidFeatures, crossrefFeatures)) => +      scoredRowCount.inc +      //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry +      // Not ever Empty, I promise +      val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull +      (key, new ReduceOutput( +        slug, +        Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), +        grobidFeatures.json, +        crossrefFeatures.json)) +    } +    .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) } +    .groupBy { case (key, _, _, _, _) => key } + +  // TypedTsv doesn't work over case classes. +  val joinedPipe = scoredPipe +    .join(cdxPipe.groupBy { case (key, _, _, _) => key }) +    .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) } +    .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output"))) +} + +object ScoreInsertableJob { + +  // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" +  def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { +    HBaseBuilder.build( +      hbaseTable, +      zookeeperHosts, +      List("file:cdx", "file:mime", "file:size"), +      SourceMode.SCAN_ALL) +  } +} | 
