aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 725474d..018a74b 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -19,6 +19,7 @@ import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
+import TDsl._
class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
@@ -30,13 +31,13 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
val temp : cascading.pipe.Pipe = grobidSource
.read
- .fromBytesWritable(new Fields("key", "tei_json"))
+
+ // Here I CAN call Pipe.toTypedPipe()
val grobidPipe : TypedPipe[(String, String, String)] = temp
- // .debug // Should be 4 tuples for mocked data
+ .fromBytesWritable(new Fields("key", "tei_json"))
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
- // TODO: Consider passing forward only a subset of JSON.
HBaseCrossrefScore.grobidToSlug(json) match {
case Some(slug) => (slug, key, json)
case None => (NoTitle, key, json)
@@ -46,7 +47,6 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
val (slug, _, _) = entry
slug != NoTitle
}
-// .debug // SHould be 3 tuples for mocked data
val grobidGroup = grobidPipe
.groupBy { case (slug, key, json) => slug }