aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala1
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala15
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala46
-rw-r--r--scalding/src/test/scala/sandcrawler/ScorableTest.scala112
-rw-r--r--scalding/src/test/scala/sandcrawler/ScoreJobTest.scala177
6 files changed, 251 insertions, 102 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index ee4cc54..d5da845 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -11,6 +11,7 @@ import parallelai.spyglass.hbase.HBaseSource
class CrossrefScorable extends Scorable {
def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
+ // TODO: Generalize args so there can be multiple Grobid pipes in one job.
TextLine(args("crossref-input"))
.read
.toTypedPipe[String](new Fields("line"))
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 95d6dae..4c67074 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -11,14 +11,9 @@ import parallelai.spyglass.hbase.HBaseSource
class GrobidScorable extends Scorable with HBasePipeConversions {
def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
- // TODO: Clean up code after debugging.
- val grobidSource = HBaseBuilder.build(
- args("hbase-table"),
- args("zookeeper-hosts"),
- List("grobid0:tei_json"),
- SourceMode.SCAN_ALL)
-
- grobidSource.read
+ // TODO: Generalize args so there can be multiple grobid pipes in one job.
+ GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ .read
.fromBytesWritable(new Fields("key", "tei_json"))
// TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala)
// didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json)
@@ -34,6 +29,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
}
object GrobidScorable {
+ def getHBaseSource(table : String, host : String) : HBaseSource = {
+ HBaseBuilder.build(table, host, List("grobid0:tei_json"), SourceMode.SCAN_ALL)
+ }
+
def grobidToSlug(json : String) : Option[String] = {
Scorable.jsonToMap(json) match {
case None => None
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 86336cb..cfdc192 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -9,7 +9,7 @@ import com.twitter.scalding.typed.TDsl._
case class MapFeatures(slug : String, json : String)
case class ReduceFeatures(json : String)
-case class ReduceOutput(val score : Int, json1 : String, json2 : String)
+case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] =
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index e6a5dc1..aa20d0f 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,25 +1,53 @@
package sandcrawler
-import java.text.Normalizer
-
-import scala.math
-import scala.util.parsing.json.JSON
-
import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
-class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions {
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode)
+class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with
+ HBasePipeConversions {
+ /*
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode)
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
- new ReduceOutput(Scorable.computeSimilarity(features1, features2),
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(features1, features2),
features1.json,
features2.json)
}
.write(TypedTsv[ReduceOutput](args("output")))
+ */
+}
+
+// Ugly hack to get non-String information into ScoreJob above.
+object ScoreJob {
+ var scorable1 : Option[Scorable] = None
+ var scorable2 : Option[Scorable] = None
+
+ def setScorable1(s : Scorable) {
+ scorable1 = Some(s)
+ }
+
+ def getScorable1() : Scorable = {
+ scorable1 match {
+ case Some(s) => s
+ case None => null
+ }
+ }
+
+ def setScorable2(s: Scorable) {
+ scorable2 = Some(s)
+ }
+
+ def getScorable2() : Scorable = {
+ scorable2 match {
+ case Some(s) => s
+ case None => null
+ }
+ }
}
diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
index 40801a0..2f80492 100644
--- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala
+++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
@@ -9,7 +9,7 @@ import org.scalatest._
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
class ScorableTest extends FlatSpec with Matchers {
- val JsonString = """
+ val JsonString = """
{
"title": "<<TITLE>>",
"authors": [
@@ -55,96 +55,40 @@ class ScorableTest extends FlatSpec with Matchers {
}
"""
- performUnitTests()
- performPipelineTests()
-
- def performUnitTests() {
- "titleToSlug()" should "extract the parts of titles before a colon" in {
- Scorable.titleToSlug("HELLO:there") shouldBe "hello"
- }
-
- it should "extract an entire colon-less string" in {
- Scorable.titleToSlug("hello THERE") shouldBe "hello there"
- }
-
- it should "return Scorable.NoSlug if given empty string" in {
- Scorable.titleToSlug("") shouldBe Scorable.NoSlug
- }
-
- it should "return Scorable.NoSlug if given null" in {
- Scorable.titleToSlug(null) shouldBe Scorable.NoSlug
- }
-
- "titleToSlug()" should "strip punctuation" in {
- Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello"
- Scorable.titleToSlug("a:b:c") shouldBe "a"
- Scorable.titleToSlug(
- "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands"
- }
+ "titleToSlug()" should "extract the parts of titles before a colon" in {
+ Scorable.titleToSlug("HELLO:there") shouldBe "hello"
+ }
- "jsonToMap()" should "return a map, given a legal JSON string" in {
- Scorable.jsonToMap(JsonString) should not be (None)
- }
+ it should "extract an entire colon-less string" in {
+ Scorable.titleToSlug("hello THERE") shouldBe "hello there"
+ }
- it should "return None, given illegal JSON" in {
- Scorable.jsonToMap("illegal{,json{{") should be (None)
- }
+ it should "return Scorable.NoSlug if given empty string" in {
+ Scorable.titleToSlug("") shouldBe Scorable.NoSlug
+ }
- "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in {
- val score = Scorable.computeSimilarity(
- new ReduceFeatures(JsonString), new ReduceFeatures(JsonString))
- score shouldBe Scorable.MaxScore
- }
+ it should "return Scorable.NoSlug if given null" in {
+ Scorable.titleToSlug(null) shouldBe Scorable.NoSlug
}
- def performPipelineTests() {
- /*
+ "titleToSlug()" should "strip punctuation" in {
+ Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello"
+ Scorable.titleToSlug("a:b:c") shouldBe "a"
+ Scorable.titleToSlug(
+ "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands"
+ }
- val output = "/tmp/testOutput"
- val input = "/tmp/testInput"
- val (testTable, testHost) = ("test-table", "dummy-host:2181")
+ "jsonToMap()" should "return a map, given a legal JSON string" in {
+ Scorable.jsonToMap(JsonString) should not be (None)
+ }
- val grobidSampleData = List(
- List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"),
- Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))),
- List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
- Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))),
- List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"),
- Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))),
- List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"),
- Bytes.toBytes(MalformedGrobidString)))
+ it should "return None, given illegal JSON" in {
+ Scorable.jsonToMap("illegal{,json{{") should be (None)
+ }
- JobTest("sandcrawler.HBaseCrossrefScoreJob")
- .arg("test", "")
- .arg("app.conf.path", "app.conf")
- .arg("output", output)
- .arg("hbase-table", testTable)
- .arg("zookeeper-hosts", testHost)
- .arg("crossref-input", input)
- .arg("debug", "true")
- .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
- grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .source(TextLine(input), List(
- 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
- 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"),
- 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
- 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
- .sink[(Int, String, String, String, String)](TypedTsv[(Int,
- String, String, String, String)](output)) {
- // Grobid titles:
- // "Title 1", "Title 2: TNG", "Title 3: The Sequel"
- // crossref slugs:
- // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted"
- // Join should have 3 "Title 1" slugs and 1 "Title 2" slug
- outputBuffer =>
- "The pipeline" should "return a 4-element list" in {
- outputBuffer should have length 4
- }
- }
- .run
- .finish
-}
- */
+ "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in {
+ val score = Scorable.computeSimilarity(
+ new ReduceFeatures(JsonString), new ReduceFeatures(JsonString))
+ score shouldBe Scorable.MaxScore
}
}
-
diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
new file mode 100644
index 0000000..22cbdb8
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
@@ -0,0 +1,177 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class ScoreJobTest extends FlatSpec with Matchers {
+ val GrobidString = """
+{
+ "title": "<<TITLE>>",
+ "authors": [
+ {"name": "Brewster Kahle"},
+ {"name": "J Doe"}
+ ],
+ "journal": {
+ "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+ "eissn": null,
+ "issn": null,
+ "issue": null,
+ "publisher": null,
+ "volume": null
+ },
+ "date": "2000",
+ "doi": null,
+ "citations": [
+ { "authors": [{"name": "A Seaperson"}],
+ "date": "2001",
+ "id": "b0",
+ "index": 0,
+ "issue": null,
+ "journal": "Letters in the Alphabet",
+ "publisher": null,
+ "title": "Everything is Wonderful",
+ "url": null,
+ "volume": "20"},
+ { "authors": [],
+ "date": "2011-03-28",
+ "id": "b1",
+ "index": 1,
+ "issue": null,
+ "journal": "The Dictionary",
+ "publisher": null,
+ "title": "All about Facts",
+ "url": null,
+ "volume": "14"}
+ ],
+ "abstract": "Everything you ever wanted to know about nothing",
+ "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+ "acknowledgement": null,
+ "annex": null
+}
+"""
+ val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File")
+ val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle")
+ val MalformedGrobidString = GrobidString.replace("}", "")
+
+ val CrossrefString =
+"""
+{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" },
+ "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ],
+ "date-time" : "2017-10-23T17:19:16Z",
+ "timestamp" : { "$numberLong" : "1508779156477" } },
+ "reference-count" : 0,
+ "publisher" : "Elsevier BV",
+ "issue" : "3",
+ "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/",
+ "start" : { "date-parts" : [ [ 1996, 1, 1 ] ],
+ "date-time" : "1996-01-01T00:00:00Z",
+ "timestamp" : { "$numberLong" : "820454400000" } },
+ "delay-in-days" : 0, "content-version" : "tdm" }],
+ "content-domain" : { "domain" : [], "crossmark-restriction" : false },
+ "published-print" : { "date-parts" : [ [ 1996 ] ] },
+ "DOI" : "<<DOI>>",
+ "type" : "journal-article",
+ "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
+ "date-time" : "2002-07-25T15:09:41Z",
+ "timestamp" : { "$numberLong" : "1027609781000" } },
+ "page" : "186-187",
+ "source" : "Crossref",
+ "is-referenced-by-count" : 0,
+ "title" : [ "<<TITLE>>" ],
+ "prefix" : "10.1016",
+ "volume" : "9",
+ "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
+ "member" : "78",
+ "container-title" : [ "Journal de Pédiatrie et de Puériculture" ],
+ "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml",
+ "content-type" : "text/xml",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" },
+ { "URL" :
+ "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain",
+ "content-type" : "text/plain",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" } ],
+ "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ],
+ "date-time" : "2015-09-03T10:03:43Z",
+ "timestamp" : { "$numberLong" : "1441274623000" } },
+ "score" : 1,
+ "issued" : { "date-parts" : [ [ 1996 ] ] },
+ "references-count" : 0,
+ "alternative-id" : [ "0987-7983(96)87729-2" ],
+ "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2",
+ "ISSN" : [ "0987-7983" ],
+ "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ],
+ "subject" : [ "Pediatrics, Perinatology, and Child Health" ]
+}
+"""
+ val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
+ val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
+ val MalformedCrossrefString = CrossrefString.replace("}", "")
+
+ // Pipeline tests
+ val output = "/tmp/testOutput"
+ val input = "/tmp/testInput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val grobidSampleData = List(
+ List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"),
+ Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))),
+ List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
+ Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))),
+ List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"),
+ Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))),
+ List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"),
+ Bytes.toBytes(MalformedGrobidString)))
+
+ // TODO: Make less yucky.
+ ScoreJob.setScorable1(new CrossrefScorable())
+ ScoreJob.setScorable2(new GrobidScorable())
+
+ JobTest("sandcrawler.ScoreJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("crossref-input", input)
+ .arg("debug", "true")
+ .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost),
+ grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .source(TextLine(input), List(
+ 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
+ 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"),
+ 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
+ 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
+ .sink[ReduceOutput](TypedTsv[ReduceOutput](output)) {
+ // Grobid titles:
+ // "Title 1", "Title 2: TNG", "Title 3: The Sequel"
+ // crossref slugs:
+ // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted"
+ // Join should have 3 "Title 1" slugs and 1 "Title 2" slug
+ outputBuffer =>
+ "The pipeline" should "return a 4-element list" in {
+ outputBuffer should have length 4
+ }
+
+ /*
+ it should "return the right first entry" in {
+ outputBuffer(0) shouldBe ReduceOutput("slug", 50, "",
+ "")
+ val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0)
+ slug shouldBe "title 1"
+ slug shouldBe slug0
+ slug shouldBe slug1
+ sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8")
+ grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8")
+ }
+ */
+ }
+ .run
+ .finish
+}