aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/ScoreJob.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala92
1 files changed, 48 insertions, 44 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 28e9132..107f504 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -2,59 +2,63 @@ package sandcrawler
import cascading.pipe.Pipe
import com.twitter.scalding.Args
+import com.twitter.scalding.Stat
import com.twitter.scalding.TypedPipe
import com.twitter.scalding.TypedTsv
import parallelai.spyglass.base.JobBase
class ScoreJob(args: Args) extends JobBase(args) {
- // TODO: Instantiate any subclass of Scorable specified in args.
- val sc1 : Scorable = new GrobidScorable()
- val sc2 : Scorable = new CrossrefScorable()
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
- pipe1
- .addTrap(TypedTsv(args("output") + ".trapped"))
- .join(pipe2)
- .map { entry =>
- val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
- new ReduceOutput(
- slug,
- Scorable.computeSimilarity(features1, features2),
- features1.json,
- features2.json)
- }
- //TypedTsv doesn't work over case classes.
- .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
- .write(TypedTsv[(String, Int, String, String)](args("output")))
-}
-
-/*
-// Ugly hack to get non-String information into ScoreJob above.
-object ScoreJob {
- var scorable1 : Option[Scorable] = None
- var scorable2 : Option[Scorable] = None
+ val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler")
+ val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+ /* TODO:
+ val uniqueDoiCount = Stat("unique-doi-count", "sandcrawler")
+ val uniqueSha1Count = Stat("unique-sha1-count", "sandcrawler")
+ */
- def setScorable1(s : Scorable) {
- scorable1 = Some(s)
- }
-
- def getScorable1() : Scorable = {
- scorable1 match {
- case Some(s) => s
- case None => null
+ val grobidScorable : Scorable = new GrobidScorable()
+ val crossrefScorable : Scorable = new CrossrefScorable()
+ val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable
+ .getInputPipe(args)
+ .map { r =>
+ grobidRowCount.inc
+ r
+ }
+ val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable
+ .getInputPipe(args)
+ .map { r =>
+ crossrefRowCount.inc
+ r
}
- }
- def setScorable2(s: Scorable) {
- scorable2 = Some(s)
- }
+ val joinedPipe = grobidPipe
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(crossrefPipe)
+
+ /* TODO:
+ // Reduces to count unique SHA1 and DOI
+ joinedPipe
+ .map { case (_, (grobidFeatures, _)) => grobidFeatures.sha }
+ .distinct
+ .map { _ => uniqueSha1Count.inc }
+ joinedPipe
+ .map { case (_, (_, crossrefFeatures)) => crossrefFeatures.doi }
+ .distinct
+ .map { _ => uniqueDoiCount.inc }
+ */
- def getScorable2() : Scorable = {
- scorable2 match {
- case Some(s) => s
- case None => null
+ // TypedTsv doesn't work over case classes.
+ joinedPipe
+ .map { case (slug, (grobidFeatures, crossrefFeatures)) =>
+ joinedRowCount.inc
+ //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(grobidFeatures, crossrefFeatures),
+ grobidFeatures.json,
+ crossrefFeatures.json)
}
- }
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
}
- */