diff options
Diffstat (limited to 'scalding/src/main')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreJob.scala | 60 | 
1 files changed, 47 insertions, 13 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 02714ab..107f504 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -2,29 +2,63 @@ package sandcrawler  import cascading.pipe.Pipe  import com.twitter.scalding.Args +import com.twitter.scalding.Stat  import com.twitter.scalding.TypedPipe  import com.twitter.scalding.TypedTsv  import parallelai.spyglass.base.JobBase  class ScoreJob(args: Args) extends JobBase(args) { -  // TODO: Instantiate any subclass of Scorable specified in args. -  val sc1 : Scorable = new GrobidScorable() -  val sc2 : Scorable = new CrossrefScorable() -  val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) -  val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) -  pipe1 +  val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") +  val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") +  val joinedRowCount = Stat("joined-rows", "sandcrawler") +  /* TODO: +  val uniqueDoiCount = Stat("unique-doi-count", "sandcrawler") +  val uniqueSha1Count = Stat("unique-sha1-count", "sandcrawler") +  */ + +  val grobidScorable : Scorable = new GrobidScorable() +  val crossrefScorable : Scorable = new CrossrefScorable() +  val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable +    .getInputPipe(args) +    .map { r => +      grobidRowCount.inc +      r +    } +  val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable +    .getInputPipe(args) +    .map { r => +      crossrefRowCount.inc +      r +    } + +  val joinedPipe = grobidPipe      .addTrap(TypedTsv(args("output") + ".trapped")) -    .join(pipe2) -    .map { entry => -      val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry +    .join(crossrefPipe) + +  /* TODO: +  // Reduces to count unique SHA1 and DOI +  joinedPipe +    .map { case (_, (grobidFeatures, _)) => grobidFeatures.sha } +    .distinct +    .map { _ => uniqueSha1Count.inc } +  joinedPipe +    .map { case (_, (_, crossrefFeatures)) => crossrefFeatures.doi } +    .distinct +    .map { _ => uniqueDoiCount.inc } +  */ + +  // TypedTsv doesn't work over case classes. +  joinedPipe +    .map { case (slug, (grobidFeatures, crossrefFeatures)) => +      joinedRowCount.inc +      //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry        new ReduceOutput(          slug, -        Scorable.computeSimilarity(features1, features2), -        features1.json, -        features2.json) +        Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), +        grobidFeatures.json, +        crossrefFeatures.json)      } -    // TypedTsv doesn't work over case classes.      .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }      .write(TypedTsv[(String, Int, String, String)](args("output")))  } | 
