aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala58
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala49
2 files changed, 84 insertions, 23 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 12660e8..1360af0 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -8,75 +8,97 @@ import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
// key is SHA1
- val grobidSource = HBaseBuilder.build(
- args("grobid-table"),
- args("zookeeper-hosts"),
- List("grobid0:tei_json"),
- sourceMode = SourceMode.SCAN_ALL)
-
+ val grobidSource = HBaseCrossrefScore.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
val grobidPipe = grobidSource
.read
.map('tei_json -> 'slug) {
- json : String => HBaseCrossrefScore.grobidToSlug(json)}
+ json : ImmutableBytesWritable => {
+ HBaseCrossrefScore.grobidToSlug(json.toString) match {
+ case Some(slug) => slug
+ case None => "nothing"
+ }
+ }
+ }
+ .debug
+ .map('key -> 'sha1) { sha1 : String => sha1 }
- val crossrefSource = TextLine(args("input"))
+ val crossrefSource = TextLine(args("crossref-input"))
val crossrefPipe = crossrefSource
.read
.map('line -> 'slug) {
json : String => HBaseCrossrefScore.crossrefToSlug(json)}
-
-/*
- statusPipe.groupBy { identity }
- .size
.debug
- .write(TypedTsv[(Long,Long)](args("output")))
- */
+
+ val innerJoinPipe = grobidPipe.joinWithSmaller('slug -> 'slug, crossrefPipe)
+ innerJoinPipe
+ .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) {
+ x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)}
+ .write(TypedTsv[(String, String, String)](args("output")))
}
object HBaseCrossrefScore {
+ def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build(
+ hbaseTable, // HBase Table Name
+ zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?)
+ List("grobid0:tei_json"),
+ SourceMode.SCAN_ALL)
+
+ def performJoin(grobidJson : String, crossRefJson : String, sha1 : String) : (String, String, String) = {
+ (sha1, "1.2.3.4", "100")
+ }
+
def jsonToMap(json : String) : Map[String, Any] = {
// https://stackoverflow.com/a/32717262/631051
val jsonObject = JSON.parseFull(json)
if (jsonObject == None) {
// Empty map for malformed JSON
- Map[String, Any]()
+ Map[String, Any]("foo" -> json)
} else {
jsonObject.get.asInstanceOf[Map[String, Any]]
}
}
-
def grobidToSlug(json : String) : Option[String] = {
+ throw new Exception(json)
val map = jsonToMap(json)
if (map contains "title") {
titleToSlug(map("title").asInstanceOf[String])
} else {
- None
+ Some("grobidToSlug None: " + map("foo"))
}
}
def crossrefToSlug(json : String) : Option[String] = {
val map = jsonToMap(json)
if (map contains "title") {
+ // TODO: Don't ignore titles after the first.
titleToSlug(map("title").asInstanceOf[List[String]](0))
} else {
- None
+ Some("crossRefToSlug None")
}
}
def titleToSlug(title : String) : Option[String] = {
+ Some(title)
+ /*
val slug = title.split(":")(0).toLowerCase()
+ println("title: " + title + ", slug: " + slug)
if (slug.isEmpty) {
None
} else {
Some(slug)
}
+ */
}
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
index a59b278..f52c5b4 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
@@ -1,13 +1,17 @@
package sandcrawler
import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
import org.scalatest._
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
+class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {
val GrobidString = """
{
- "title": "Dummy Example File",
+ "title": "<<TITLE>>",
"authors": [
{"name": "Brewster Kahle"},
{"name": "J Doe"}
@@ -50,6 +54,7 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
"annex": null
}
"""
+ val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File")
val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle")
val MalformedGrobidString = GrobidString.replace("}", "")
@@ -69,7 +74,7 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
"delay-in-days" : 0, "content-version" : "tdm" }],
"content-domain" : { "domain" : [], "crossmark-restriction" : false },
"published-print" : { "date-parts" : [ [ 1996 ] ] },
- "DOI" : "10.1016/0987-7983(96)87729-2",
+ "DOI" : "<<DOI>>",
"type" : "journal-article",
"created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
"date-time" : "2002-07-25T15:09:41Z",
@@ -77,7 +82,7 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
"page" : "186-187",
"source" : "Crossref",
"is-referenced-by-count" : 0,
- "title" : [ "les ferments lactiques: classification, propriétés, utilisations agroalimentaires" ],
+ "title" : [ "<<TITLE>>" ],
"prefix" : "10.1016",
"volume" : "9",
"author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
@@ -105,9 +110,10 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
"subject" : [ "Pediatrics, Perinatology, and Child Health" ]
}
"""
+ val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
val MalformedCrossrefString = CrossrefString.replace("}", "")
-
+/*
"titleToSlug()" should "extract the parts of titles before a colon" in {
val slug = HBaseCrossrefScore.titleToSlug("HELLO:there")
slug should contain ("hello")
@@ -147,4 +153,37 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
val slug = HBaseCrossrefScore.grobidToSlug(MalformedCrossrefString)
slug shouldBe None
}
+ */
+
+ val output = "/tmp/testOutput"
+ val input = "/tmp/testInput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val grobidSampleData = List(
+ List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title1"))),
+ List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title2: TNG"))),
+ List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title3: The Sequel"))),
+ List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title4"))))
+
+ JobTest("sandcrawler.HBaseCrossrefScoreJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("crossref-input", input)
+ .arg("debug", "true")
+ .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
+ grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .source(TextLine(input), List((
+ "0" -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
+ "1" -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))))
+ .sink[Tuple](TypedTsv[(String, String, String)](output)) {
+ outputBuffer =>
+ it("should return a 2-element list.") {
+ assert(outputBuffer.size === 2)
+ }
+ }
+ .run
+ .finish
}