aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala19
1 files changed, 13 insertions, 6 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index bcb6156..7e10c43 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -21,6 +21,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
+ /*
// key is SHA1
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
@@ -28,7 +29,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
.fromBytesWritable(new Fields("key", "tei_json"))
- .debug
+ .debug // Should be 4 tuples for mocked data
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
@@ -41,18 +42,19 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val (slug, _, _) = entry
slug != NoTitle
}
- .debug
- .write(TypedTsv[(String, String, String)](args("output")))
-
- /*
+ .debug // SHould be 3 tuples for mocked data
val grobidGroup = grobidPipe
.groupBy { case (slug, key, json) => slug }
+ */
val crossrefSource = TextLine(args("crossref-input"))
- val crossrefPipe : TypedPipe[(String, String)] = crossrefSource
+ val crossrefPipe : TypedPipe[String] = crossrefSource
.read
+ .debug // Should be 4 tuples for mocked data
.toTypedPipe[String]('line)
+ /*
+ .map{line : String => (line, "foo")}
.map{ json : String =>
HBaseCrossrefScore.crossrefToSlug(json) match {
case Some(slug) => (slug, json)
@@ -63,6 +65,11 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val (slug, json) = entry
slug != NoTitle
}
+ */
+ .write(TypedTsv[String](args("output")))
+
+
+ /*
val crossrefGroup = crossrefPipe
.groupBy { case (slug, json) => slug }