aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala12
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala9
2 files changed, 10 insertions, 11 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index ac633e4..bcb6156 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -17,7 +17,6 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
-
class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
@@ -29,6 +28,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
.fromBytesWritable(new Fields("key", "tei_json"))
+ .debug
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
@@ -41,24 +41,24 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val (slug, _, _) = entry
slug != NoTitle
}
+ .debug
+ .write(TypedTsv[(String, String, String)](args("output")))
+
+ /*
val grobidGroup = grobidPipe
.groupBy { case (slug, key, json) => slug }
-// .debug
-
val crossrefSource = TextLine(args("crossref-input"))
val crossrefPipe : TypedPipe[(String, String)] = crossrefSource
.read
.toTypedPipe[String]('line)
.map{ json : String =>
-// val (offset, json) = entry
HBaseCrossrefScore.crossrefToSlug(json) match {
case Some(slug) => (slug, json)
case None => (NoTitle, json)
}
}
- .debug
.filter { entry =>
val (slug, json) = entry
slug != NoTitle
@@ -77,7 +77,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
// TODO: For now, output it all.
(slug, slug0, slug1, sha1, grobidJson, crossrefJson)}
.write(TypedTsv[(String, String, String, String, String, String)](args("output")))
-
+ */
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
index dc96003..96c7770 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
@@ -178,18 +178,17 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
.arg("debug", "true")
.source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .source(TextLine(input), List((
+ .source(TextLine(input), List(
CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"),
CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"),
- CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))))
- .sink[(String, String, String, String, String,
- String)](TypedTsv[(String, String, String, String, String, String)](output)) {
+ CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
+ .sink[(String, String, String)](TypedTsv[(String, String, String)](output)) {
outputBuffer =>
- /*
it should "return a 3-element list" in {
outputBuffer should have length 3
}
+ /*
it should "return the right first entry" in {
val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0)
slug shouldBe "title1"