6 files changed, 251 insertions, 102 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index ee4cc54..d5da845 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -11,6 +11,7 @@ import parallelai.spyglass.hbase.HBaseSource
 
 class CrossrefScorable extends Scorable {
   def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
+    // TODO: Generalize args so there can be multiple Grobid pipes in one job.
     TextLine(args("crossref-input"))
       .read
       .toTypedPipe[String](new Fields("line"))
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 95d6dae..4c67074 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -11,14 +11,9 @@ import parallelai.spyglass.hbase.HBaseSource
 
 class GrobidScorable extends Scorable with HBasePipeConversions {
   def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
-    // TODO: Clean up code after debugging.
-    val grobidSource = HBaseBuilder.build(
-      args("hbase-table"),
-      args("zookeeper-hosts"),
-      List("grobid0:tei_json"),
-      SourceMode.SCAN_ALL)
-
-    grobidSource.read
+    // TODO: Generalize args so there can be multiple grobid pipes in one job.
+    GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+      .read
       .fromBytesWritable(new Fields("key", "tei_json"))
       // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala)
       // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json)
@@ -34,6 +29,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
 }
 
 object GrobidScorable {
+  def getHBaseSource(table : String, host : String) : HBaseSource = {
+    HBaseBuilder.build(table, host, List("grobid0:tei_json"), SourceMode.SCAN_ALL)
+  }
+
   def grobidToSlug(json : String) : Option[String] = {
     Scorable.jsonToMap(json) match {
       case None => None
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 86336cb..cfdc192 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -9,7 +9,7 @@ import com.twitter.scalding.typed.TDsl._
 
 case class MapFeatures(slug : String, json : String)
 case class ReduceFeatures(json : String)
-case class ReduceOutput(val score : Int, json1 : String, json2 : String)
+case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
 
 abstract class Scorable {
   def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] =
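The Scorable.scala change above threads the join key (the slug) through to ReduceOutput. A minimal standalone sketch of what that buys; the case class is copied from the diff, while the demo values and the ReduceOutputDemo wrapper are invented for illustration:

case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)

object ReduceOutputDemo extends App {
  // Hypothetical values; in the real job these come from the slug-keyed join.
  val row = ReduceOutput("dummy example file", 100, """{"title": "..."}""", """{"title": "..."}""")
  // With the slug included, each scored pair can be traced back to the join
  // key that produced it when reading the TSV output.
  println(Seq(row.slug, row.score, row.json1, row.json2).mkString("\t"))
}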
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index e6a5dc1..aa20d0f 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,25 +1,53 @@
 package sandcrawler
 
-import java.text.Normalizer
-
-import scala.math
-import scala.util.parsing.json.JSON
-
 import cascading.flow.FlowDef
 import com.twitter.scalding._
 import com.twitter.scalding.typed.TDsl._
 import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBasePipeConversions
 
-class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions {
-  val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode)
-  val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode)
+class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with
+    HBasePipeConversions {
+  /*
+  val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode)
+  val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode)
 
   pipe1.join(pipe2).map { entry =>
     val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
-    new ReduceOutput(Scorable.computeSimilarity(features1, features2),
+    new ReduceOutput(
+      slug,
+      Scorable.computeSimilarity(features1, features2),
       features1.json, features2.json)
   }
     .write(TypedTsv[ReduceOutput](args("output")))
+  */
+}
+
+// Ugly hack to get non-String information into ScoreJob above.
+object ScoreJob {
+  var scorable1 : Option[Scorable] = None
+  var scorable2 : Option[Scorable] = None
+
+  def setScorable1(s : Scorable) {
+    scorable1 = Some(s)
+  }
+
+  def getScorable1() : Scorable = {
+    scorable1 match {
+      case Some(s) => s
+      case None => null
+    }
+  }
+
+  def setScorable2(s: Scorable) {
+    scorable2 = Some(s)
+  }
+
+  def getScorable2() : Scorable = {
+    scorable2 match {
+      case Some(s) => s
+      case None => null
+    }
+  }
 
 }
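The object ScoreJob registry above exists because Scalding constructs Job instances reflectively from string-only Args, which leaves no direct way to pass a Scorable into the constructor. A standalone sketch of the same pattern; the names and the simplified String stand-in type are mine, only the set/get shape comes from the diff:

object ScorableRegistry {
  private var scorable: Option[String] = None  // String stands in for Scorable

  def set(s: String): Unit = { scorable = Some(s) }

  // Unlike getScorable1 above, which returns null when nothing was set, this
  // variant fails fast with a descriptive error.
  def get(): String =
    scorable.getOrElse(throw new IllegalStateException("set() was never called"))
}

object RegistryDemo extends App {
  ScorableRegistry.set("CrossrefScorable")  // must run before the job is built
  println(ScorableRegistry.get())           // prints: CrossrefScorable
}

The cost of the pattern is shared mutable state: tests that register different scorables cannot safely run in parallel inside one JVM.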
diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
index 40801a0..2f80492 100644
--- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala
+++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
@@ -9,7 +9,7 @@ import org.scalatest._
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 
 class ScorableTest extends FlatSpec with Matchers {
-  val JsonString = """
+  val JsonString = """
 {
   "title": "<<TITLE>>",
   "authors": [
@@ -55,96 +55,40 @@ class ScorableTest extends FlatSpec with Matchers {
 }
 """
 
-  performUnitTests()
-  performPipelineTests()
-
-  def performUnitTests() {
-    "titleToSlug()" should "extract the parts of titles before a colon" in {
-      Scorable.titleToSlug("HELLO:there") shouldBe "hello"
-    }
-
-    it should "extract an entire colon-less string" in {
-      Scorable.titleToSlug("hello THERE") shouldBe "hello there"
-    }
-
-    it should "return Scorable.NoSlug if given empty string" in {
-      Scorable.titleToSlug("") shouldBe Scorable.NoSlug
-    }
-
-    it should "return Scorable.NoSlug if given null" in {
-      Scorable.titleToSlug(null) shouldBe Scorable.NoSlug
-    }
-
-    "titleToSlug()" should "strip punctuation" in {
-      Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello"
-      Scorable.titleToSlug("a:b:c") shouldBe "a"
-      Scorable.titleToSlug(
-        "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands"
-    }
+  "titleToSlug()" should "extract the parts of titles before a colon" in {
+    Scorable.titleToSlug("HELLO:there") shouldBe "hello"
+  }
 
-    "jsonToMap()" should "return a map, given a legal JSON string" in {
-      Scorable.jsonToMap(JsonString) should not be (None)
-    }
+  it should "extract an entire colon-less string" in {
+    Scorable.titleToSlug("hello THERE") shouldBe "hello there"
+  }
 
-    it should "return None, given illegal JSON" in {
-      Scorable.jsonToMap("illegal{,json{{") should be (None)
-    }
+  it should "return Scorable.NoSlug if given empty string" in {
+    Scorable.titleToSlug("") shouldBe Scorable.NoSlug
+  }
 
-    "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in {
-      val score = Scorable.computeSimilarity(
-        new ReduceFeatures(JsonString), new ReduceFeatures(JsonString))
-      score shouldBe Scorable.MaxScore
-    }
+  it should "return Scorable.NoSlug if given null" in {
+    Scorable.titleToSlug(null) shouldBe Scorable.NoSlug
   }
 
-  def performPipelineTests() {
-    /*
+  "titleToSlug()" should "strip punctuation" in {
+    Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello"
+    Scorable.titleToSlug("a:b:c") shouldBe "a"
+    Scorable.titleToSlug(
+      "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands"
+  }
 
-    val output = "/tmp/testOutput"
-    val input = "/tmp/testInput"
-    val (testTable, testHost) = ("test-table", "dummy-host:2181")
+  "jsonToMap()" should "return a map, given a legal JSON string" in {
+    Scorable.jsonToMap(JsonString) should not be (None)
+  }
 
-    val grobidSampleData = List(
-      List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"),
-        Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))),
-      List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
-        Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))),
-      List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"),
-        Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))),
-      List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"),
-        Bytes.toBytes(MalformedGrobidString)))
+  it should "return None, given illegal JSON" in {
+    Scorable.jsonToMap("illegal{,json{{") should be (None)
+  }
 
-    JobTest("sandcrawler.HBaseCrossrefScoreJob")
-      .arg("test", "")
-      .arg("app.conf.path", "app.conf")
-      .arg("output", output)
-      .arg("hbase-table", testTable)
-      .arg("zookeeper-hosts", testHost)
-      .arg("crossref-input", input)
-      .arg("debug", "true")
-      .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
-        grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
-      .source(TextLine(input), List(
-        0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
-        1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"),
-        2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
-        3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
-      .sink[(Int, String, String, String, String)](TypedTsv[(Int,
-        String, String, String, String)](output)) {
-        // Grobid titles:
-        //   "Title 1", "Title 2: TNG", "Title 3: The Sequel"
-        // crossref slugs:
-        //   "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted"
-        // Join should have 3 "Title 1" slugs and 1 "Title 2" slug
-        outputBuffer =>
-        "The pipeline" should "return a 4-element list" in {
-          outputBuffer should have length 4
-        }
-      }
-      .run
-      .finish
-}
-  */
+  "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in {
+    val score = Scorable.computeSimilarity(
+      new ReduceFeatures(JsonString), new ReduceFeatures(JsonString))
+    score shouldBe Scorable.MaxScore
   }
 }
-
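The unit tests above pin down titleToSlug's contract: lowercase, cut at the first colon, strip punctuation, and fall back to Scorable.NoSlug for null or empty input. One possible implementation consistent with those tests (the real one lives in Scorable.scala and may differ in detail; the NoSlug value here is a stand-in):

object SlugSketch {
  val NoSlug = "NO SLUG"  // stand-in; the real sentinel is Scorable.NoSlug

  def titleToSlug(title: String): String =
    Option(title)  // guard against null
      .map(_.split(":")(0).toLowerCase.replaceAll("[^a-z0-9 ]", "").trim)
      .filter(_.nonEmpty)  // empty (or all-punctuation) input has no slug
      .getOrElse(NoSlug)
}

object SlugDemo extends App {
  assert(SlugSketch.titleToSlug("HELLO:there") == "hello")
  assert(SlugSketch.titleToSlug("If you're happy and you know it, clap your hands!")
    == "if youre happy and you know it clap your hands")
}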
diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
new file mode 100644
index 0000000..22cbdb8
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
@@ -0,0 +1,177 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class ScoreJobTest extends FlatSpec with Matchers {
+  val GrobidString = """
+{
+  "title": "<<TITLE>>",
+  "authors": [
+    {"name": "Brewster Kahle"},
+    {"name": "J Doe"}
+  ],
+  "journal": {
+    "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+    "eissn": null,
+    "issn": null,
+    "issue": null,
+    "publisher": null,
+    "volume": null
+  },
+  "date": "2000",
+  "doi": null,
+  "citations": [
+    { "authors": [{"name": "A Seaperson"}],
+      "date": "2001",
+      "id": "b0",
+      "index": 0,
+      "issue": null,
+      "journal": "Letters in the Alphabet",
+      "publisher": null,
+      "title": "Everything is Wonderful",
+      "url": null,
+      "volume": "20"},
+    { "authors": [],
+      "date": "2011-03-28",
+      "id": "b1",
+      "index": 1,
+      "issue": null,
+      "journal": "The Dictionary",
+      "publisher": null,
+      "title": "All about Facts",
+      "url": null,
+      "volume": "14"}
+  ],
+  "abstract": "Everything you ever wanted to know about nothing",
+  "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+  "acknowledgement": null,
+  "annex": null
+}
+"""
+  val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File")
+  val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle")
+  val MalformedGrobidString = GrobidString.replace("}", "")
+
+  val CrossrefString =
+"""
+{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" },
+  "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ],
+    "date-time" : "2017-10-23T17:19:16Z",
+    "timestamp" : { "$numberLong" : "1508779156477" } },
+  "reference-count" : 0,
+  "publisher" : "Elsevier BV",
+  "issue" : "3",
+  "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/",
+    "start" : { "date-parts" : [ [ 1996, 1, 1 ] ],
+      "date-time" : "1996-01-01T00:00:00Z",
+      "timestamp" : { "$numberLong" : "820454400000" } },
+    "delay-in-days" : 0, "content-version" : "tdm" }],
+  "content-domain" : { "domain" : [], "crossmark-restriction" : false },
+  "published-print" : { "date-parts" : [ [ 1996 ] ] },
+  "DOI" : "<<DOI>>",
+  "type" : "journal-article",
+  "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
+    "date-time" : "2002-07-25T15:09:41Z",
+    "timestamp" : { "$numberLong" : "1027609781000" } },
+  "page" : "186-187",
+  "source" : "Crossref",
+  "is-referenced-by-count" : 0,
+  "title" : [ "<<TITLE>>" ],
+  "prefix" : "10.1016",
+  "volume" : "9",
+  "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
+  "member" : "78",
+  "container-title" : [ "Journal de Pédiatrie et de Puériculture" ],
+  "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml",
+    "content-type" : "text/xml",
+    "content-version" : "vor",
+    "intended-application" : "text-mining" },
+    { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain",
+      "content-type" : "text/plain",
+      "content-version" : "vor",
+      "intended-application" : "text-mining" } ],
+  "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ],
+    "date-time" : "2015-09-03T10:03:43Z",
+    "timestamp" : { "$numberLong" : "1441274623000" } },
+  "score" : 1,
+  "issued" : { "date-parts" : [ [ 1996 ] ] },
+  "references-count" : 0,
+  "alternative-id" : [ "0987-7983(96)87729-2" ],
+  "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2",
+  "ISSN" : [ "0987-7983" ],
+  "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ],
Health" ] +} +""" + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + + // Pipeline tests + val output = "/tmp/testOutput" + val input = "/tmp/testInput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val grobidSampleData = List( + List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))), + List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))), + List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))), + List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), + Bytes.toBytes(MalformedGrobidString))) + + // TODO: Make less yucky. + ScoreJob.setScorable1(new CrossrefScorable()) + ScoreJob.setScorable2(new GrobidScorable()) + + JobTest("sandcrawler.ScoreJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("crossref-input", input) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), + grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .source(TextLine(input), List( + 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), + 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"), + 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), + 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) + .sink[ReduceOutput](TypedTsv[ReduceOutput](output)) { + // Grobid titles: + // "Title 1", "Title 2: TNG", "Title 3: The Sequel" + // crossref slugs: + // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted" + // Join should have 3 "Title 1" slugs and 1 "Title 2" slug + outputBuffer => + "The pipeline" should "return a 4-element list" in { + outputBuffer should have length 4 + } + + /* + it should "return the right first entry" in { + outputBuffer(0) shouldBe ReduceOutput("slug", 50, "", + "") + val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) + slug shouldBe "title 1" + slug shouldBe slug0 + slug shouldBe slug1 + sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8") + grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8") + } + */ + } + .run + .finish +} |