Diffstat (limited to 'scalding/src')
6 files changed, 251 insertions, 102 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index ee4cc54..d5da845 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -11,6 +11,7 @@ import parallelai.spyglass.hbase.HBaseSource
 
 class CrossrefScorable extends Scorable {
   def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
+    // TODO: Generalize args so there can be multiple Grobid pipes in one job.
     TextLine(args("crossref-input"))
       .read
       .toTypedPipe[String](new Fields("line"))
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 95d6dae..4c67074 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -11,14 +11,9 @@ import parallelai.spyglass.hbase.HBaseSource
 
 class GrobidScorable extends Scorable with HBasePipeConversions {
   def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
-    // TODO: Clean up code after debugging.
-    val grobidSource = HBaseBuilder.build(
-      args("hbase-table"),
-      args("zookeeper-hosts"),
-      List("grobid0:tei_json"),
-      SourceMode.SCAN_ALL)
-
-    grobidSource.read
+    // TODO: Generalize args so there can be multiple grobid pipes in one job.
+    GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+      .read
       .fromBytesWritable(new Fields("key", "tei_json"))
     // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala)
     // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json)
@@ -34,6 +29,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
 }
 
 object GrobidScorable {
+  def getHBaseSource(table : String, host : String) : HBaseSource = {
+    HBaseBuilder.build(table, host, List("grobid0:tei_json"), SourceMode.SCAN_ALL)
+  }
+
   def grobidToSlug(json : String) : Option[String] = {
     Scorable.jsonToMap(json) match {
       case None => None
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 86336cb..cfdc192 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -9,7 +9,7 @@ import com.twitter.scalding.typed.TDsl._
 
 case class MapFeatures(slug : String, json : String)
 case class ReduceFeatures(json : String)
-case class ReduceOutput(val score : Int, json1 : String, json2 : String)
+case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
 
 abstract class Scorable {
   def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] =
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index e6a5dc1..aa20d0f 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,25 +1,53 @@
 package sandcrawler
 
-import java.text.Normalizer
-
-import scala.math
-import scala.util.parsing.json.JSON
-
 import cascading.flow.FlowDef
 import com.twitter.scalding._
 import com.twitter.scalding.typed.TDsl._
 import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBasePipeConversions
 
-class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions {
-  val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode)
-  val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode)
+class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with
+    HBasePipeConversions {
+  /*
+  val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode)
+  val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode)
 
   pipe1.join(pipe2).map { entry =>
     val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
-    new ReduceOutput(Scorable.computeSimilarity(features1, features2),
+    new ReduceOutput(
+      slug,
+      Scorable.computeSimilarity(features1, features2),
       features1.json,
       features2.json)
   }
     .write(TypedTsv[ReduceOutput](args("output")))
+   */
+}
+
+// Ugly hack to get non-String information into ScoreJob above.
+object ScoreJob {
+  var scorable1 : Option[Scorable] = None
+  var scorable2 : Option[Scorable] = None
+
+  def setScorable1(s : Scorable) {
+    scorable1 = Some(s)
+  }
+
+  def getScorable1() : Scorable = {
+    scorable1 match {
+      case Some(s) => s
+      case None => null
+    }
+  }
+
+  def setScorable2(s: Scorable) {
+    scorable2 = Some(s)
+  }
+
+  def getScorable2() : Scorable = {
+    scorable2 match {
+      case Some(s) => s
+      case None => null
+    }
+  }
 }
diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
index 40801a0..2f80492 100644
--- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala
+++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
@@ -9,7 +9,7 @@ import org.scalatest._
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 
 class ScorableTest extends FlatSpec with Matchers {
-      val JsonString = """
+  val JsonString = """
 {
   "title": "<<TITLE>>",
   "authors": [
@@ -55,96 +55,40 @@ class ScorableTest extends FlatSpec with Matchers {
 }
 """
 
-  performUnitTests()
-  performPipelineTests()
-
-  def performUnitTests() {
-    "titleToSlug()" should "extract the parts of titles before a colon" in {
-      Scorable.titleToSlug("HELLO:there") shouldBe "hello"
-    }
-
-    it should "extract an entire colon-less string" in {
-      Scorable.titleToSlug("hello THERE") shouldBe "hello there"
-    }
-
-    it should "return Scorable.NoSlug if given empty string" in {
-      Scorable.titleToSlug("") shouldBe Scorable.NoSlug
-    }
-
-    it should "return Scorable.NoSlug if given null" in {
-      Scorable.titleToSlug(null) shouldBe Scorable.NoSlug
-    }
-
-    "titleToSlug()" should "strip punctuation" in {
-      Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello"
-      Scorable.titleToSlug("a:b:c") shouldBe "a"
-      Scorable.titleToSlug(
-        "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands"
-    }
+  "titleToSlug()" should "extract the parts of titles before a colon" in {
+    Scorable.titleToSlug("HELLO:there") shouldBe "hello"
+  }
 
-    "jsonToMap()" should "return a map, given a legal JSON string" in {
-      Scorable.jsonToMap(JsonString) should not be (None)
-    }
+  it should "extract an entire colon-less string" in {
+    Scorable.titleToSlug("hello THERE") shouldBe "hello there"
+  }
 
-    it should "return None, given illegal JSON" in {
-      Scorable.jsonToMap("illegal{,json{{") should be (None)
-    }
+  it should "return Scorable.NoSlug if given empty string" in {
+    Scorable.titleToSlug("") shouldBe Scorable.NoSlug
+  }
 
-    "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in {
-      val score = Scorable.computeSimilarity(
-        new ReduceFeatures(JsonString), new ReduceFeatures(JsonString))
-      score shouldBe Scorable.MaxScore
-    }
+  it should "return Scorable.NoSlug if given null" in {
+    Scorable.titleToSlug(null) shouldBe Scorable.NoSlug
   }
 
-  def performPipelineTests() {
-      /*
+  "titleToSlug()" should "strip punctuation" in {
+    Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello"
+    Scorable.titleToSlug("a:b:c") shouldBe "a"
+    Scorable.titleToSlug(
+      "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands"
+  }
 
-    val output = "/tmp/testOutput"
-    val input = "/tmp/testInput"
-    val (testTable, testHost) = ("test-table", "dummy-host:2181")
+  "jsonToMap()" should "return a map, given a legal JSON string" in {
+    Scorable.jsonToMap(JsonString) should not be (None)
+  }
 
-  val grobidSampleData = List(
-    List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"),
-      Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))),
-    List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
-      Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))),
-    List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"),
-      Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))),
-    List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"),
-      Bytes.toBytes(MalformedGrobidString)))
+  it should "return None, given illegal JSON" in {
+    Scorable.jsonToMap("illegal{,json{{") should be (None)
+  }
 
-  JobTest("sandcrawler.HBaseCrossrefScoreJob")
-    .arg("test", "")
-    .arg("app.conf.path", "app.conf")
-    .arg("output", output)
-    .arg("hbase-table", testTable)
-    .arg("zookeeper-hosts", testHost)
-    .arg("crossref-input", input)
-    .arg("debug", "true")
-    .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
-      grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
-    .source(TextLine(input), List(
-      0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
-      1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"),
-      2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
-      3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
-    .sink[(Int, String, String, String, String)](TypedTsv[(Int,
-    String, String, String, String)](output)) {
-      // Grobid titles:
-      //   "Title 1", "Title 2: TNG", "Title 3: The Sequel"
-      // crossref slugs:
-      //   "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted"
-      // Join should have 3 "Title 1" slugs and 1 "Title 2" slug
-      outputBuffer =>
-      "The pipeline" should "return a 4-element list" in {
-        outputBuffer should have length 4
-      }
-    }
-    .run
-    .finish
-}
-      */
+  "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in {
+    val score = Scorable.computeSimilarity(
+      new ReduceFeatures(JsonString), new ReduceFeatures(JsonString))
+    score shouldBe Scorable.MaxScore
   }
 }
-
diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
new file mode 100644
index 0000000..22cbdb8
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
@@ -0,0 +1,177 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class ScoreJobTest extends FlatSpec with Matchers {
+  val GrobidString = """
+{
+  "title": "<<TITLE>>",
+  "authors": [
+    {"name": "Brewster Kahle"},
+    {"name": "J Doe"}
+  ],
+  "journal": {
+    "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+    "eissn": null,
+    "issn": null,
+    "issue": null,
+    "publisher": null,
+    "volume": null
+  },
+  "date": "2000",
+  "doi": null,
+  "citations": [
+    { "authors": [{"name": "A Seaperson"}],
+      "date": "2001",
+      "id": "b0",
+      "index": 0,
+      "issue": null,
+      "journal": "Letters in the Alphabet",
+      "publisher": null,
+      "title": "Everything is Wonderful",
+      "url": null,
+      "volume": "20"},
+    { "authors": [],
+      "date": "2011-03-28",
+      "id": "b1",
+      "index": 1,
+      "issue": null,
+      "journal": "The Dictionary",
+      "publisher": null,
+      "title": "All about Facts",
+      "url": null,
+      "volume": "14"}
+  ],
+  "abstract": "Everything you ever wanted to know about nothing",
+  "body": "Introduction \nEverything starts somewhere, as somebody [1]  once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+  "acknowledgement": null,
+  "annex": null
+}
+"""
+  val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File")
+  val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle")
+  val MalformedGrobidString = GrobidString.replace("}", "")
+
+  val CrossrefString =
+"""
+{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" },
+  "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ],
+    "date-time" : "2017-10-23T17:19:16Z",
+    "timestamp" : { "$numberLong" : "1508779156477" } },
+  "reference-count" : 0,
+  "publisher" : "Elsevier BV",
+  "issue" : "3",
+  "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/",
+                    "start" : { "date-parts" : [ [ 1996, 1, 1 ] ],
+                                "date-time" : "1996-01-01T00:00:00Z",
+                                "timestamp" : { "$numberLong" : "820454400000" } },
+                                "delay-in-days" : 0, "content-version" : "tdm" }],
+  "content-domain" : { "domain" : [], "crossmark-restriction" : false },
+  "published-print" : { "date-parts" : [ [ 1996 ] ] },
+  "DOI" : "<<DOI>>",
+  "type" : "journal-article",
+  "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
+    "date-time" : "2002-07-25T15:09:41Z",
+    "timestamp" : { "$numberLong" : "1027609781000" } },
+  "page" : "186-187",
+  "source" : "Crossref",
+  "is-referenced-by-count" : 0,
+  "title" : [ "<<TITLE>>" ],
+  "prefix" : "10.1016",
+  "volume" : "9",
+  "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
+  "member" : "78",
+  "container-title" : [ "Journal de Pédiatrie et de Puériculture" ],
+  "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml",
+               "content-type" : "text/xml",
+                 "content-version" : "vor",
+                 "intended-application" : "text-mining" },
+               { "URL" :
+  "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain",
+                 "content-type" : "text/plain",
+                 "content-version" : "vor",
+                 "intended-application" : "text-mining" } ],
+  "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ],
+                  "date-time" : "2015-09-03T10:03:43Z",
+                  "timestamp" : { "$numberLong" : "1441274623000" } },
+  "score" : 1,
+  "issued" : { "date-parts" : [ [ 1996 ] ] },
+  "references-count" : 0,
+  "alternative-id" : [ "0987-7983(96)87729-2" ],
+  "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2",
+  "ISSN" : [ "0987-7983" ],
+  "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ],
+  "subject" : [ "Pediatrics, Perinatology, and Child Health" ]
+}
+"""
+  val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
+  val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
+  val MalformedCrossrefString = CrossrefString.replace("}", "")
+
+  //  Pipeline tests
+  val output = "/tmp/testOutput"
+  val input = "/tmp/testInput"
+  val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+  val grobidSampleData = List(
+    List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"),
+      Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))),
+    List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
+      Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))),
+    List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"),
+      Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))),
+    List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"),
+      Bytes.toBytes(MalformedGrobidString)))
+
+  // TODO: Make less yucky.
+  ScoreJob.setScorable1(new CrossrefScorable())
+  ScoreJob.setScorable2(new GrobidScorable())
+
+  JobTest("sandcrawler.ScoreJob")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("output", output)
+    .arg("hbase-table", testTable)
+    .arg("zookeeper-hosts", testHost)
+    .arg("crossref-input", input)
+    .arg("debug", "true")
+    .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost),
+      grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+    .source(TextLine(input), List(
+      0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
+      1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"),
+      2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
+      3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
+    .sink[ReduceOutput](TypedTsv[ReduceOutput](output)) {
+      // Grobid titles:
+      //   "Title 1", "Title 2: TNG", "Title 3: The Sequel"
+      // crossref slugs:
+      //   "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted"
+      // Join should have 3 "Title 1" slugs and 1 "Title 2" slug
+      outputBuffer =>
+      "The pipeline" should "return a 4-element list" in {
+        outputBuffer should have length 4
+      }
+
+      /*
+      it should "return the right first entry" in {
+        outputBuffer(0) shouldBe ReduceOutput("slug", 50, "",
+          "")
+        val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0)
+        slug shouldBe "title 1"
+        slug shouldBe slug0
+        slug shouldBe slug1
+        sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8")
+        grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8")
+      }
+      */
+    }
+    .run
+    .finish
+}
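Note on the "ugly hack" in ScoreJob.scala: Scalding's JobTest and Args only carry String values into a job, so this commit parks the two Scorable instances in a mutable companion object before the job is constructed. A minimal sketch of a string-only alternative, assuming the Scorable subclasses stay zero-argument as in this commit; ScorableFactory and the "scorable1"/"scorable2" argument names are hypothetical, not part of the codebase:

package sandcrawler

// Hypothetical sketch: resolve a Scorable from a plain string argument,
// which JobTest *can* pass via .arg(...), instead of mutating a singleton.
object ScorableFactory {
  def fromName(name : String) : Scorable = name match {
    case "crossref" => new CrossrefScorable()
    case "grobid" => new GrobidScorable()
    case _ => throw new IllegalArgumentException("Unknown scorable: " + name)
  }
}

// Inside a job, the commented-out pipes could then be built as:
//   val pipe1 = ScorableFactory.fromName(args("scorable1")).getInputPipe(args, flowDef, mode)

This would also serve the "Generalize args" TODOs, since any number of input pipes could be requested by name.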
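The expected sink length in ScoreJobTest follows from inner-join cardinality on slugs: the Grobid side contributes "title 1", "title 2", "title 3" and the Crossref side "title 1" (three times) plus "title 2", so the join yields 1*3 + 1*1 = 4 rows. A self-contained sketch of that arithmetic in plain Scala collections (illustrative only, not part of the commit):

// An inner join pairs every left occurrence of a key with every right occurrence.
object SlugJoinSketch extends App {
  val grobidSlugs   = List("title 1", "title 2", "title 3")
  val crossrefSlugs = List("title 1", "title 1", "title 1", "title 2")

  val joined = for {
    g <- grobidSlugs
    c <- crossrefSlugs
    if g == c
  } yield (g, c)

  // "title 1": 1 * 3 pairs; "title 2": 1 * 1 pair; "title 3": no match.
  assert(joined.length == 4) // matches "The pipeline should return a 4-element list"
  println(joined)
}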
