From 344531eb6a5cdd4ea15e4d82050368c5af0eafee Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 24 Aug 2018 11:16:35 -0700 Subject: add counters to ScoreJob --- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 60 +++++++++++++++++----- 1 file changed, 47 insertions(+), 13 deletions(-) (limited to 'scalding') diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 02714ab..107f504 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -2,29 +2,63 @@ package sandcrawler import cascading.pipe.Pipe import com.twitter.scalding.Args +import com.twitter.scalding.Stat import com.twitter.scalding.TypedPipe import com.twitter.scalding.TypedTsv import parallelai.spyglass.base.JobBase class ScoreJob(args: Args) extends JobBase(args) { - // TODO: Instantiate any subclass of Scorable specified in args. - val sc1 : Scorable = new GrobidScorable() - val sc2 : Scorable = new CrossrefScorable() - val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) - pipe1 + val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") + val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + /* TODO: + val uniqueDoiCount = Stat("unique-doi-count", "sandcrawler") + val uniqueSha1Count = Stat("unique-sha1-count", "sandcrawler") + */ + + val grobidScorable : Scorable = new GrobidScorable() + val crossrefScorable : Scorable = new CrossrefScorable() + val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable + .getInputPipe(args) + .map { r => + grobidRowCount.inc + r + } + val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable + .getInputPipe(args) + .map { r => + crossrefRowCount.inc + r + } + + val joinedPipe = grobidPipe .addTrap(TypedTsv(args("output") + ".trapped")) - .join(pipe2) - .map { entry => - val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry + .join(crossrefPipe) + + /* TODO: + // Reduces to count unique SHA1 and DOI + joinedPipe + .map { case (_, (grobidFeatures, _)) => grobidFeatures.sha } + .distinct + .map { _ => uniqueSha1Count.inc } + joinedPipe + .map { case (_, (_, crossrefFeatures)) => crossrefFeatures.doi } + .distinct + .map { _ => uniqueDoiCount.inc } + */ + + // TypedTsv doesn't work over case classes. + joinedPipe + .map { case (slug, (grobidFeatures, crossrefFeatures)) => + joinedRowCount.inc + //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry new ReduceOutput( slug, - Scorable.computeSimilarity(features1, features2), - features1.json, - features2.json) + Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), + grobidFeatures.json, + crossrefFeatures.json) } - // TypedTsv doesn't work over case classes. .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } .write(TypedTsv[(String, Int, String, String)](args("output"))) } -- cgit v1.2.3