aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala28
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala10
2 files changed, 21 insertions, 17 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 1360af0..56eb91e 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -1,5 +1,6 @@
package sandcrawler
+import java.util.Arrays
import java.util.Properties
import scala.util.parsing.json.JSON
@@ -20,19 +21,22 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
args("zookeeper-hosts"))
- val grobidPipe = grobidSource
+ val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
- .map('tei_json -> 'slug) {
- json : ImmutableBytesWritable => {
- HBaseCrossrefScore.grobidToSlug(json.toString) match {
- case Some(slug) => slug
- case None => "nothing"
- }
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ .debug
+ .toTypedPipe[(String, String)]('key, 'tei_json)
+ .map { entry =>
+ val (key, json) = (entry._1, entry._2)
+ HBaseCrossrefScore.grobidToSlug(json) match {
+ case Some(slug) => (key, json, slug)
+ case None => (key, json, "none")
}
}
- .debug
- .map('key -> 'sha1) { sha1 : String => sha1 }
+ .write(TypedTsv[(String, String, String)](args("output")))
+/*
+ .map('key -> 'sha1) { sha1 : String => sha1 }
val crossrefSource = TextLine(args("crossref-input"))
val crossrefPipe = crossrefSource
.read
@@ -45,6 +49,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
.mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) {
x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)}
.write(TypedTsv[(String, String, String)](args("output")))
+ */
}
object HBaseCrossrefScore {
@@ -70,7 +75,6 @@ object HBaseCrossrefScore {
}
def grobidToSlug(json : String) : Option[String] = {
- throw new Exception(json)
val map = jsonToMap(json)
if (map contains "title") {
titleToSlug(map("title").asInstanceOf[String])
@@ -90,15 +94,11 @@ object HBaseCrossrefScore {
}
def titleToSlug(title : String) : Option[String] = {
- Some(title)
- /*
val slug = title.split(":")(0).toLowerCase()
- println("title: " + title + ", slug: " + slug)
if (slug.isEmpty) {
None
} else {
Some(slug)
}
- */
}
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
index f52c5b4..0d681b9 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
@@ -178,10 +178,14 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
.source(TextLine(input), List((
"0" -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
"1" -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))))
- .sink[Tuple](TypedTsv[(String, String, String)](output)) {
+ .sink[(String, String, String)](TypedTsv[(String, String, String)](output)) {
outputBuffer =>
- it("should return a 2-element list.") {
- assert(outputBuffer.size === 2)
+ it("should return a 4-element list.") {
+ assert(outputBuffer.size === 4)
+ }
+ it("should return the right slugs.") {
+ val (sha1, json, slug) = outputBuffer(0)
+ assert(slug == "title1")
}
}
.run