aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
author Ellen Spertus <ellen.spertus@gmail.com> 2018-07-25 20:45:42 -0700
committer Ellen Spertus <ellen.spertus@gmail.com> 2018-07-25 20:45:42 -0700
commit 15ae7006cd8238bb9453f27be6aa5388a6002ce8 (patch)
tree 441049a09c0118cb564749ffd3099a898a165d94 /scalding
parent 0f0152189cf6df0f4b56d92149a60e902eb20be6 (diff)
download sandcrawler-15ae7006cd8238bb9453f27be6aa5388a6002ce8.tar.gz
sandcrawler-15ae7006cd8238bb9453f27be6aa5388a6002ce8.zip
Made progress on crossrefPipe.
Diffstat (limited to 'scalding')
-rw-r--r-- scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala 19
-rw-r--r-- scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala 12
2 files changed, 19 insertions, 12 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index bcb6156..7e10c43 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -21,6 +21,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
+ /*
// key is SHA1
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
@@ -28,7 +29,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
.fromBytesWritable(new Fields("key", "tei_json"))
- .debug
+ .debug // Should be 4 tuples for mocked data
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
@@ -41,18 +42,19 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val (slug, _, _) = entry
slug != NoTitle
}
- .debug
- .write(TypedTsv[(String, String, String)](args("output")))
-
- /*
+ .debug // Should be 3 tuples for mocked data
val grobidGroup = grobidPipe
.groupBy { case (slug, key, json) => slug }
+ */
val crossrefSource = TextLine(args("crossref-input"))
- val crossrefPipe : TypedPipe[(String, String)] = crossrefSource
+ val crossrefPipe : TypedPipe[String] = crossrefSource
.read
+ .debug // Should be 4 tuples for mocked data
.toTypedPipe[String]('line)
+ /*
+ .map{line : String => (line, "foo")}
.map{ json : String =>
HBaseCrossrefScore.crossrefToSlug(json) match {
case Some(slug) => (slug, json)
@@ -63,6 +65,11 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val (slug, json) = entry
slug != NoTitle
}
+ */
+ .write(TypedTsv[String](args("output")))
+
+
+ /*
val crossrefGroup = crossrefPipe
.groupBy { case (slug, json) => slug }
diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
index 96c7770..bd9dcd3 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
@@ -179,13 +179,13 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
.source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
.source(TextLine(input), List(
- CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
- CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"),
- CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"),
- CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
- .sink[(String, String, String)](TypedTsv[(String, String, String)](output)) {
+ 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
+ 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"),
+ 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"),
+ 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
+ .sink[String](TypedTsv[String](output)) {
outputBuffer =>
- it should "return a 3-element list" in {
+ it should "return a 4-element list" in {
outputBuffer should have length 3
}
/*