aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala65
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala46
2 files changed, 68 insertions, 43 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 7b7deec..ac633e4 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -7,6 +7,8 @@ import scala.util.parsing.json.JSON
import cascading.tuple.Fields
import com.twitter.scalding._
+import com.twitter.scalding.typed.CoGrouped
+import com.twitter.scalding.typed.Grouped
import com.twitter.scalding.typed.TDsl._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
@@ -15,6 +17,7 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
+
class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
@@ -26,36 +29,56 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
.fromBytesWritable(new Fields("key", "tei_json"))
- .debug
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
HBaseCrossrefScore.grobidToSlug(json) match {
- case Some(slug) => (key, json, slug)
- case None => (key, json, NoTitle)
+ case Some(slug) => (slug, key, json)
+ case None => (NoTitle, key, json)
}
}
.filter { entry =>
- val (_, _, slug) = entry
- slug != NoTitle && slug.length > 0
+ val (slug, _, _) = entry
+ slug != NoTitle
}
- .write(TypedTsv[(String, String, String)](args("output")))
-/*
- .map('key -> 'sha1) { sha1 : String => sha1 }
+ val grobidGroup = grobidPipe
+ .groupBy { case (slug, key, json) => slug }
+// .debug
+
+
val crossrefSource = TextLine(args("crossref-input"))
- val crossrefPipe = crossrefSource
+ val crossrefPipe : TypedPipe[(String, String)] = crossrefSource
.read
- .map('line -> 'slug) {
- json : String => HBaseCrossrefScore.crossrefToSlug(json)}
- .debug
-
- val innerJoinPipe = grobidPipe.joinWithSmaller('slug -> 'slug, crossrefPipe)
- innerJoinPipe
- .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) {
- x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)}
- .write(TypedTsv[(String, String, String)](args("output")))
- */
+ .toTypedPipe[String]('line)
+ .map{ json : String =>
+// val (offset, json) = entry
+ HBaseCrossrefScore.crossrefToSlug(json) match {
+ case Some(slug) => (slug, json)
+ case None => (NoTitle, json)
+ }
+ }
+ .debug
+ .filter { entry =>
+ val (slug, json) = entry
+ slug != NoTitle
+ }
+ val crossrefGroup = crossrefPipe
+ .groupBy { case (slug, json) => slug }
+
+ // TODO: Figure out which is smaller.
+ val theJoin : CoGrouped[String, ((String, String, String), (String, String))] =
+ grobidGroup.join(crossrefGroup)
+
+ theJoin.map{ entry =>
+ val (slug : String,
+ ((slug0: String, sha1 : String, grobidJson : String),
+ (slug1 : String, crossrefJson : String))) = entry
+ // TODO: For now, output it all.
+ (slug, slug0, slug1, sha1, grobidJson, crossrefJson)}
+ .write(TypedTsv[(String, String, String, String, String, String)](args("output")))
+
+
}
object HBaseCrossrefScore {
@@ -74,7 +97,7 @@ object HBaseCrossrefScore {
val jsonObject = JSON.parseFull(json)
if (jsonObject == None) {
// Empty map for malformed JSON
- Map[String, Any]("foo" -> json)
+ Map[String, Any]("malformed json" -> json)
} else {
jsonObject.get.asInstanceOf[Map[String, Any]]
}
@@ -95,7 +118,7 @@ object HBaseCrossrefScore {
// TODO: Don't ignore titles after the first.
titleToSlug(map("title").asInstanceOf[List[String]](0))
} else {
- None
+ Some(map.keys.mkString(","))
}
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
index 9402c0a..dc96003 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
@@ -8,7 +8,7 @@ import org.apache.hadoop.hbase.util.Bytes
import org.scalatest._
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
+class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
val GrobidString = """
{
"title": "<<TITLE>>",
@@ -113,7 +113,9 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
val MalformedCrossrefString = CrossrefString.replace("}", "")
-/*
+
+ // Unit tests
+
"titleToSlug()" should "extract the parts of titles before a colon" in {
val slug = HBaseCrossrefScore.titleToSlug("HELLO:there")
slug should contain ("hello")
@@ -125,7 +127,7 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
}
"grobidToSlug()" should "get the right slug for a grobid json string" in {
- val slug = HBaseCrossrefScore.grobidToSlug(GrobidString)
+ val slug = HBaseCrossrefScore.grobidToSlug(GrobidStringWithTitle)
slug should contain ("dummy example file")
}
@@ -140,8 +142,8 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
}
"crossrefToSlug()" should "get the right slug for a crossref json string" in {
- val slug = HBaseCrossrefScore.crossrefToSlug(CrossrefString)
- slug should contain ("les ferments lactiques")
+ val slug = HBaseCrossrefScore.crossrefToSlug(CrossrefStringWithTitle)
+ slug should contain ("sometitle")
}
it should "return None if given json string without title" in {
@@ -153,8 +155,9 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
val slug = HBaseCrossrefScore.grobidToSlug(MalformedCrossrefString)
slug shouldBe None
}
- */
-
+
+ // Pipeline tests
+
val output = "/tmp/testOutput"
val input = "/tmp/testInput"
val (testTable, testHost) = ("test-table", "dummy-host:2181")
@@ -176,23 +179,22 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
.source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
.source(TextLine(input), List((
- "0" -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
- "1" -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))))
- .sink[(String, String, String)](TypedTsv[(String, String, String)](output)) {
+ CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
+ CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"),
+ CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"),
+ CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))))
+ .sink[(String, String, String, String, String,
+ String)](TypedTsv[(String, String, String, String, String, String)](output)) {
outputBuffer =>
- it("should return a 3-element list.") {
- assert(outputBuffer.size === 3)
- }
- it("should return the right first entry.") {
- val (sha1, json, slug0) = outputBuffer(0)
- assert(sha1 == new String(grobidSampleData(0)(0), "UTF-8"))
- assert(json == new String(grobidSampleData(0)(1), "UTF-8"))
- assert(slug0 == "title1")
- }
/*
- it("should return the right last slug.") {
- val (_, _, slug3) = outputBuffer(3)
- assert(slug3 == "foo")
+ it should "return a 3-element list" in {
+ outputBuffer should have length 3
+ }
+ it should "return the right first entry" in {
+ val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0)
+ slug shouldBe "title1"
+ sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8")
+ grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8")
}
*/
}