diff options
Diffstat (limited to 'scalding/src')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreInsertable.scala | 86 | ||||
| -rw-r--r-- | scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala | 262 | 
2 files changed, 348 insertions, 0 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala new file mode 100644 index 0000000..58007fa --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala @@ -0,0 +1,86 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class ScoreInsertableJob(args: Args) extends JobBase(args) { + +  val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") +  val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") +  val cdxRowCount = Stat("cdx-rows", "sandcrawler") +  val scoredRowCount = Stat("scored-rows", "sandcrawler") +  val joinedRowCount = Stat("joined-rows", "sandcrawler") + +  val grobidScorable : Scorable = new GrobidScorable() +  val crossrefScorable : Scorable = new CrossrefScorable() + +  val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable +    .getInputPipe(args) +    .map { r => +      grobidRowCount.inc +      r +    } +  val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable +    .getInputPipe(args) +    .map { r => +      crossrefRowCount.inc +      r +    } +  val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts")) +    .read +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) +    .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } +    .map { case (key, cdx, mime, size) => +      (Bytes.toString(key.copyBytes()), +       Bytes.toString(cdx.copyBytes()), +       Bytes.toString(mime.copyBytes()), +       Bytes.toLong(size.copyBytes())) +    } +    .map { r => +      cdxRowCount.inc +      r +    } + +  val scoredPipe = grobidPipe +    .addTrap(TypedTsv(args("output") + ".trapped")) +    .join(crossrefPipe) +    .map { case (slug, (grobidFeatures, crossrefFeatures)) => +      scoredRowCount.inc +      //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry +      // Not ever Empty, I promise +      val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull +      (key, new ReduceOutput( +        slug, +        Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), +        grobidFeatures.json, +        crossrefFeatures.json)) +    } +    .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) } +    .groupBy { case (key, _, _, _, _) => key } + +  // TypedTsv doesn't work over case classes. +  val joinedPipe = scoredPipe +    .join(cdxPipe.groupBy { case (key, _, _, _) => key }) +    .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) } +    .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output"))) +} + +object ScoreInsertableJob { + +  // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" +  def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { +    HBaseBuilder.build( +      hbaseTable, +      zookeeperHosts, +      List("file:cdx", "file:mime", "file:size"), +      SourceMode.SCAN_ALL) +  } +} diff --git a/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala new file mode 100644 index 0000000..5393f10 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala @@ -0,0 +1,262 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScoreInsertableJobTest extends FlatSpec with Matchers { +  //scalastyle:off +  val JsonString = """ +{ +  "title": "<<TITLE>>", +  "authors": [ +    {"name": "Brewster Kahle"}, +    {"name": "J Doe"} +  ], +  "journal": { +    "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", +    "eissn": null, +    "issn": null, +    "issue": null, +    "publisher": null, +    "volume": null +  }, +  "date": "2000", +  "doi": null, +  "citations": [ +    { "authors": [{"name": "A Seaperson"}], +      "date": "2001", +      "id": "b0", +      "index": 0, +      "issue": null, +      "journal": "Letters in the Alphabet", +      "publisher": null, +      "title": "Everything is Wonderful", +      "url": null, +      "volume": "20"}, +    { "authors": [], +      "date": "2011-03-28", +      "id": "b1", +      "index": 1, +      "issue": null, +      "journal": "The Dictionary", +      "publisher": null, +      "title": "All about Facts", +      "url": null, +      "volume": "14"} +  ], +  "abstract": "Everything you ever wanted to know about nothing", +  "body": "Introduction \nEverything starts somewhere, as somebody [1]  once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", +  "acknowledgement": null, +  "annex": null +} +""" +  // scalastyle:on +  val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") +  val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") +  val MalformedJsonString = JsonString.replace("}", "") + +  // scalastyle:off +  val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, +  "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], +    "date-time" : "2017-10-23T17:19:16Z", +    "timestamp" : { "$numberLong" : "1508779156477" } }, +  "reference-count" : 0, +  "publisher" : "Elsevier BV", +  "issue" : "3", +  "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", +                    "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], +                                "date-time" : "1996-01-01T00:00:00Z", +                                "timestamp" : { "$numberLong" : "820454400000" } }, +                                "delay-in-days" : 0, "content-version" : "tdm" }], +  "content-domain" : { "domain" : [], "crossmark-restriction" : false }, +  "published-print" : { "date-parts" : [ [ 1996 ] ] }, +  "DOI" : "<<DOI>>", +  "type" : "journal-article", +  "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], +    "date-time" : "2002-07-25T15:09:41Z", +    "timestamp" : { "$numberLong" : "1027609781000" } }, +  "page" : "186-187", +  "source" : "Crossref", +  "is-referenced-by-count" : 0, +  "title" : [ "<<TITLE>>" ], +  "prefix" : "10.1016", +  "volume" : "9", +  "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], +  "member" : "78", +  "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], +  "link" : [ { "URL" :  "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", +               "content-type" : "text/xml", +               "content-version" : "vor", +               "intended-application" : "text-mining" }, +               { "URL" : +  "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", +                 "content-type" : "text/plain", +                 "content-version" : "vor", +                 "intended-application" : "text-mining" } ], +  "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], +                  "date-time" : "2015-09-03T10:03:43Z", +                  "timestamp" : { "$numberLong" : "1441274623000" } }, +  "score" : 1, +  "issued" : { "date-parts" : [ [ 1996 ] ] }, +  "references-count" : 0, +  "alternative-id" : [ "0987-7983(96)87729-2" ], +  "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", +  "ISSN" : [ "0987-7983" ], +  "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], +  "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""" +  // scalastyle:on +  val TooLongOfTitle = "X" * Scorable.MaxTitleLength + "Y"  // arbitrary long string +  val TooShortOfTitle = "X" * (ScorableFeatures.MinSlugLength - 1) +  val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") +  val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") +  val MalformedCrossrefString = CrossrefString.replace("}", "") +  val CrossrefStrings = List( +    CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"), +    CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"), +    CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), +    CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"), +    CrossrefString.replace("<<TITLE>>", TooLongOfTitle).replace("<<DOI>>", "DOI-1"), +    CrossrefString.replace("<<TITLE>>", TooShortOfTitle).replace("<<DOI>>", "DOI-1")) + +  //  Pipeline tests +  val output = "/tmp/testOutput" +  val input = "/tmp/testInput" +  val (testTable, testHost) = ("test-table", "dummy-host:2181") + +  val Sha1Strings : List[String] = List( +    "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", +    "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", +    "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", +    "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", +    "sha1:93187A85273589347598473894839443", +    "sha1:024937534094897039547e9824382943", +    "sha1:93229759932857982837892347893892", +    "sha1:83229759932857982837892347893892") + +  val JsonStrings : List[String] = List( +    JsonString.replace("<<TITLE>>", "Title 1: The Original"), +    JsonString.replace("<<TITLE>>", "Title 2: TNG"), +    JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), +    // This will have bad status. +    JsonString.replace("<<TITLE>>", "Title 1: The Original"), +    MalformedJsonString, +    // This will have bad status. +    JsonString.replace("<<TITLE>>", "Title 2: Not TNG"), +    // These are in both sources but have bad titles +    JsonString.replace("<<TITLE>>", TooLongOfTitle), +    JsonString.replace("<<TITLE>>", TooShortOfTitle) +  ) + +  // bnewbold: status codes aren't strings, they are uint64 +  val Ok : Long = 200 +  val Bad : Long = 400 +  val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad, Ok, Ok) + +  val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) +    .zipped +    .toList +    .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } +    .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + +  // scalastyle:off null +  // Add example of lines without GROBID data +  // scalastyle:off null +  val SampleData = SampleDataHead :+ new Tuple( +    new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) +  // scalastyle:on null + +  val CdxList: List[String] = List("{}", "{}",  "{}",  "{}",  "{}",  "{}",  "{}",  "{}" ) +  val MimeList: List[String] = List("application/pdf", "application/pdf", "application/pdf", +    "application/pdf", "application/pdf", "application/pdf", "application/pdf", +    "application/pdf") +  val SizeList: List[Long] = List(1,2,3,4,5,6,7,8) + +  // Can zip 3 lists, but not 4... so we recursively zip +  val SampleCdxData : List[Tuple] = ((Sha1Strings, CdxList).zipped.toList, (MimeList, SizeList).zipped.toList) +    .zipped +    .toList +    .map { case ((sha: String, cdx: String), (mime: String, size: Long)) => List(Bytes.toBytes(sha), Bytes.toBytes(cdx), Bytes.toBytes(mime), Bytes.toBytes(size)) } +    .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + +  JobTest("sandcrawler.ScoreInsertableJob") +    .arg("test", "") +    .arg("app.conf.path", "app.conf") +    .arg("output", output) +    .arg("hbase-table", testTable) +    .arg("zookeeper-hosts", testHost) +    .arg("crossref-input", input) +    .arg("debug", "true") +    .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) +    .source[Tuple](ScoreInsertableJob.getHBaseCdxSource(testTable, testHost), SampleCdxData) +    .source(TextLine(input), List( +      0 -> CrossrefStrings(0), +      1 -> CrossrefStrings(1), +      2 -> CrossrefStrings(2), +      3 -> CrossrefStrings(3), +      4 -> CrossrefStrings(4), +      4 -> CrossrefStrings(5))) +    .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) { _ => () } +    .sink[(String, String, Int, String, String, String, String, Long)](TypedTsv[(String, String, Int, String, String, String, String, Long)](output)) { +      // Grobid titles and slugs (in parentheses): +      //   Title 1                       (title1) +      //   Title 2: TNG                  (title2tng) +      //   Title 3: The Sequel           (title3thesequel) +      //   <too long of a title> +      //   <too short of a title> +      // crossref titles and slugs (in parentheses): +      //   Title 2: TNG                  (title2tng) +      //   Title 1: TNG 2A               (title1tng2a) +      //   Title 1: TNG 3                (title1tng3) +      //   Title 2: Rebooted             (title2rebooted) +      //   <too long of a title> +      //   <too short of a title> +      // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug +      outputBuffer => +      "The pipeline" should "return a 1-element list" in { +        outputBuffer should have length 1 +      } + +      it should "has right # of entries with each slug" in { +        val slugs = outputBuffer.map(_._2) +        val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size) +        // XXX: countMap("title1") shouldBe 3 +        countMap("title2tng") shouldBe 1 +      } + +      def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) : (String, Int, String, String) = { +        val mfg : Option[MapFeatures] = GrobidScorable.jsonToMapFeatures( +          Sha1Strings(grobidIndex), +          JsonStrings(grobidIndex)) +        val mfc : Option[MapFeatures] = CrossrefScorable.jsonToMapFeatures(CrossrefStrings(crossrefIndex)) +        if (mfg.isEmpty || mfc.isEmpty) { +          fail() +        } else { +          val score = Scorable.computeSimilarity( +            ReduceFeatures(mfg.get.json), +            ReduceFeatures(mfc.get.json)) +          (slug, score, mfg.get.json, mfc.get.json) +        } +      } + +      it should "have right output values" in { +        //outputBuffer.exists(_ == bundle("title1", 0, 0)) +        //outputBuffer.exists(_ == bundle("title1", 0, 2)) +        //outputBuffer.exists(_ == bundle("title1", 0, 1)) +        outputBuffer.exists(_ == bundle("title2tng", 1, 3)) +      } +    } +    .run +    .finish +} | 
