From e5db9bc783c1157984c160d155c44d38b84b57ce Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 12 Sep 2018 15:27:08 -0700 Subject: hack scorejob variant with extra context joined in This is to allow pushes to fatcat. There is probably a better way... --- .../main/scala/sandcrawler/ScoreInsertable.scala | 86 +++++++ .../scala/sandcrawler/ScoreInsertableJobTest.scala | 262 +++++++++++++++++++++ 2 files changed, 348 insertions(+) create mode 100644 scalding/src/main/scala/sandcrawler/ScoreInsertable.scala create mode 100644 scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala (limited to 'scalding') diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala new file mode 100644 index 0000000..58007fa --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala @@ -0,0 +1,86 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class ScoreInsertableJob(args: Args) extends JobBase(args) { + + val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") + val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") + val cdxRowCount = Stat("cdx-rows", "sandcrawler") + val scoredRowCount = Stat("scored-rows", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val grobidScorable : Scorable = new GrobidScorable() + val crossrefScorable : Scorable = new CrossrefScorable() + + val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable + .getInputPipe(args) + .map { r => + grobidRowCount.inc + r + } + val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable + .getInputPipe(args) + .map { r => + crossrefRowCount.inc + r + } + val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts")) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) + .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } + .map { case (key, cdx, mime, size) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes())) + } + .map { r => + cdxRowCount.inc + r + } + + val scoredPipe = grobidPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(crossrefPipe) + .map { case (slug, (grobidFeatures, crossrefFeatures)) => + scoredRowCount.inc + //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry + // Not ever Empty, I promise + val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull + (key, new ReduceOutput( + slug, + Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), + grobidFeatures.json, + crossrefFeatures.json)) + } + .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) } + .groupBy { case (key, _, _, _, _) => key } + + // TypedTsv doesn't work over case classes. + val joinedPipe = scoredPipe + .join(cdxPipe.groupBy { case (key, _, _, _) => key }) + .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) } + .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output"))) +} + +object ScoreInsertableJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("file:cdx", "file:mime", "file:size"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala new file mode 100644 index 0000000..5393f10 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala @@ -0,0 +1,262 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScoreInsertableJobTest extends FlatSpec with Matchers { + //scalastyle:off + val JsonString = """ +{ + "title": "<>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + // scalastyle:on + val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") + val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") + val MalformedJsonString = JsonString.replace("}", "") + + // scalastyle:off + val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, + "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], + "date-time" : "2017-10-23T17:19:16Z", + "timestamp" : { "$numberLong" : "1508779156477" } }, + "reference-count" : 0, + "publisher" : "Elsevier BV", + "issue" : "3", + "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", + "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], + "date-time" : "1996-01-01T00:00:00Z", + "timestamp" : { "$numberLong" : "820454400000" } }, + "delay-in-days" : 0, "content-version" : "tdm" }], + "content-domain" : { "domain" : [], "crossmark-restriction" : false }, + "published-print" : { "date-parts" : [ [ 1996 ] ] }, + "DOI" : "<<DOI>>", + "type" : "journal-article", + "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], + "date-time" : "2002-07-25T15:09:41Z", + "timestamp" : { "$numberLong" : "1027609781000" } }, + "page" : "186-187", + "source" : "Crossref", + "is-referenced-by-count" : 0, + "title" : [ "<<TITLE>>" ], + "prefix" : "10.1016", + "volume" : "9", + "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], + "member" : "78", + "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], + "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", + "content-type" : "text/xml", + "content-version" : "vor", + "intended-application" : "text-mining" }, + { "URL" : + "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", + "content-type" : "text/plain", + "content-version" : "vor", + "intended-application" : "text-mining" } ], + "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], + "date-time" : "2015-09-03T10:03:43Z", + "timestamp" : { "$numberLong" : "1441274623000" } }, + "score" : 1, + "issued" : { "date-parts" : [ [ 1996 ] ] }, + "references-count" : 0, + "alternative-id" : [ "0987-7983(96)87729-2" ], + "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", + "ISSN" : [ "0987-7983" ], + "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], + "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""" + // scalastyle:on + val TooLongOfTitle = "X" * Scorable.MaxTitleLength + "Y" // arbitrary long string + val TooShortOfTitle = "X" * (ScorableFeatures.MinSlugLength - 1) + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + val CrossrefStrings = List( + CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"), + CrossrefString.replace("<<TITLE>>", TooLongOfTitle).replace("<<DOI>>", "DOI-1"), + CrossrefString.replace("<<TITLE>>", TooShortOfTitle).replace("<<DOI>>", "DOI-1")) + + // Pipeline tests + val output = "/tmp/testOutput" + val input = "/tmp/testInput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val Sha1Strings : List[String] = List( + "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", + "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", + "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", + "sha1:93187A85273589347598473894839443", + "sha1:024937534094897039547e9824382943", + "sha1:93229759932857982837892347893892", + "sha1:83229759932857982837892347893892") + + val JsonStrings : List[String] = List( + JsonString.replace("<<TITLE>>", "Title 1: The Original"), + JsonString.replace("<<TITLE>>", "Title 2: TNG"), + JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 1: The Original"), + MalformedJsonString, + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 2: Not TNG"), + // These are in both sources but have bad titles + JsonString.replace("<<TITLE>>", TooLongOfTitle), + JsonString.replace("<<TITLE>>", TooShortOfTitle) + ) + + // bnewbold: status codes aren't strings, they are uint64 + val Ok : Long = 200 + val Bad : Long = 400 + val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad, Ok, Ok) + + val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) + .zipped + .toList + .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + // scalastyle:off null + // Add example of lines without GROBID data + // scalastyle:off null + val SampleData = SampleDataHead :+ new Tuple( + new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) + // scalastyle:on null + + val CdxList: List[String] = List("{}", "{}", "{}", "{}", "{}", "{}", "{}", "{}" ) + val MimeList: List[String] = List("application/pdf", "application/pdf", "application/pdf", + "application/pdf", "application/pdf", "application/pdf", "application/pdf", + "application/pdf") + val SizeList: List[Long] = List(1,2,3,4,5,6,7,8) + + // Can zip 3 lists, but not 4... so we recursively zip + val SampleCdxData : List[Tuple] = ((Sha1Strings, CdxList).zipped.toList, (MimeList, SizeList).zipped.toList) + .zipped + .toList + .map { case ((sha: String, cdx: String), (mime: String, size: Long)) => List(Bytes.toBytes(sha), Bytes.toBytes(cdx), Bytes.toBytes(mime), Bytes.toBytes(size)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + JobTest("sandcrawler.ScoreInsertableJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("crossref-input", input) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) + .source[Tuple](ScoreInsertableJob.getHBaseCdxSource(testTable, testHost), SampleCdxData) + .source(TextLine(input), List( + 0 -> CrossrefStrings(0), + 1 -> CrossrefStrings(1), + 2 -> CrossrefStrings(2), + 3 -> CrossrefStrings(3), + 4 -> CrossrefStrings(4), + 4 -> CrossrefStrings(5))) + .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) { _ => () } + .sink[(String, String, Int, String, String, String, String, Long)](TypedTsv[(String, String, Int, String, String, String, String, Long)](output)) { + // Grobid titles and slugs (in parentheses): + // Title 1 (title1) + // Title 2: TNG (title2tng) + // Title 3: The Sequel (title3thesequel) + // <too long of a title> + // <too short of a title> + // crossref titles and slugs (in parentheses): + // Title 2: TNG (title2tng) + // Title 1: TNG 2A (title1tng2a) + // Title 1: TNG 3 (title1tng3) + // Title 2: Rebooted (title2rebooted) + // <too long of a title> + // <too short of a title> + // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug + outputBuffer => + "The pipeline" should "return a 1-element list" in { + outputBuffer should have length 1 + } + + it should "has right # of entries with each slug" in { + val slugs = outputBuffer.map(_._2) + val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size) + // XXX: countMap("title1") shouldBe 3 + countMap("title2tng") shouldBe 1 + } + + def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) : (String, Int, String, String) = { + val mfg : Option[MapFeatures] = GrobidScorable.jsonToMapFeatures( + Sha1Strings(grobidIndex), + JsonStrings(grobidIndex)) + val mfc : Option[MapFeatures] = CrossrefScorable.jsonToMapFeatures(CrossrefStrings(crossrefIndex)) + if (mfg.isEmpty || mfc.isEmpty) { + fail() + } else { + val score = Scorable.computeSimilarity( + ReduceFeatures(mfg.get.json), + ReduceFeatures(mfc.get.json)) + (slug, score, mfg.get.json, mfc.get.json) + } + } + + it should "have right output values" in { + //outputBuffer.exists(_ == bundle("title1", 0, 0)) + //outputBuffer.exists(_ == bundle("title1", 0, 2)) + //outputBuffer.exists(_ == bundle("title1", 0, 1)) + outputBuffer.exists(_ == bundle("title2tng", 1, 3)) + } + } + .run + .finish +} -- cgit v1.2.3