aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala60
1 files changed, 47 insertions, 13 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 02714ab..107f504 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -2,29 +2,63 @@ package sandcrawler
import cascading.pipe.Pipe
import com.twitter.scalding.Args
+import com.twitter.scalding.Stat
import com.twitter.scalding.TypedPipe
import com.twitter.scalding.TypedTsv
import parallelai.spyglass.base.JobBase
class ScoreJob(args: Args) extends JobBase(args) {
- // TODO: Instantiate any subclass of Scorable specified in args.
- val sc1 : Scorable = new GrobidScorable()
- val sc2 : Scorable = new CrossrefScorable()
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
- pipe1
+ val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler")
+ val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+ /* TODO:
+ val uniqueDoiCount = Stat("unique-doi-count", "sandcrawler")
+ val uniqueSha1Count = Stat("unique-sha1-count", "sandcrawler")
+ */
+
+ val grobidScorable : Scorable = new GrobidScorable()
+ val crossrefScorable : Scorable = new CrossrefScorable()
+ val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable
+ .getInputPipe(args)
+ .map { r =>
+ grobidRowCount.inc
+ r
+ }
+ val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable
+ .getInputPipe(args)
+ .map { r =>
+ crossrefRowCount.inc
+ r
+ }
+
+ val joinedPipe = grobidPipe
.addTrap(TypedTsv(args("output") + ".trapped"))
- .join(pipe2)
- .map { entry =>
- val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ .join(crossrefPipe)
+
+ /* TODO:
+ // Reduces to count unique SHA1 and DOI
+ joinedPipe
+ .map { case (_, (grobidFeatures, _)) => grobidFeatures.sha }
+ .distinct
+ .map { _ => uniqueSha1Count.inc }
+ joinedPipe
+ .map { case (_, (_, crossrefFeatures)) => crossrefFeatures.doi }
+ .distinct
+ .map { _ => uniqueDoiCount.inc }
+ */
+
+ // TypedTsv doesn't work over case classes.
+ joinedPipe
+ .map { case (slug, (grobidFeatures, crossrefFeatures)) =>
+ joinedRowCount.inc
+ //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry
new ReduceOutput(
slug,
- Scorable.computeSimilarity(features1, features2),
- features1.json,
- features2.json)
+ Scorable.computeSimilarity(grobidFeatures, crossrefFeatures),
+ grobidFeatures.json,
+ crossrefFeatures.json)
}
- // TypedTsv doesn't work over case classes.
.map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
.write(TypedTsv[(String, Int, String, String)](args("output")))
}