diff options
Diffstat (limited to 'scalding/src')
3 files changed, 5 insertions, 512 deletions
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 25e5985..bf36855 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -12,9 +12,11 @@ import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = { // TODO: Clean up code after debugging. - val grobidSource = HBaseCrossrefScore.getHBaseSource( + val grobidSource = HBaseBuilder.build( args("hbase-table"), - args("zookeeper-hosts")) + args("zookeeper-hosts"), + List("grobid0:tei_json"), + SourceMode.SCAN_ALL) // val pipe0 : Pipe = grobidSource.read // val grobidPipe : TypedPipe[MapFeatures] = pipe0 @@ -26,7 +28,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions { .toTypedPipe[(String, String)](new Fields("key", "tei_json")) .map { entry => val (key : String, json : String) = (entry._1, entry._2) - HBaseCrossrefScore.grobidToSlug(json) match { + GrobidScorable.grobidToSlug(json) match { case Some(slug) => new MapFeatures(slug, json) case None => new MapFeatures(Scorable.NoSlug, json) } diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala deleted file mode 100644 index 2fbb19f..0000000 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ /dev/null @@ -1,216 +0,0 @@ -package sandcrawler - -import java.text.Normalizer -import java.util.Arrays -import java.util.Properties -import java.util.regex.Pattern - -import scala.math -import scala.util.parsing.json.JSON - -import cascading.tuple.Fields -import com.twitter.scalding._ -import com.twitter.scalding.typed.CoGrouped -import com.twitter.scalding.typed.Grouped -import com.twitter.scalding.typed.TDsl._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource - -class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable - - // key is SHA1 - val grobidSource = HBaseCrossrefScore.getHBaseSource( - args("hbase-table"), - args("zookeeper-hosts")) - - val pipe0 : cascading.pipe.Pipe = grobidSource.read - val grobidPipe : TypedPipe[(String, String, String)] = pipe0 - .fromBytesWritable(new Fields("key", "tei_json")) - // .debug // Should be 4 tuples for mocked data - .toTypedPipe[(String, String)]('key, 'tei_json) - .map { entry => - val (key, json) = (entry._1, entry._2) - // TODO: Consider passing forward only a subset of JSON. - HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => (slug, key, json) - case None => (NoTitle, key, json) - } - } - .filter { entry => - val (slug, _, _) = entry - slug != NoTitle - } -// .debug // SHould be 3 tuples for mocked data - - val grobidGroup = grobidPipe - .groupBy { case (slug, key, json) => slug } - - val crossrefSource = TextLine(args("crossref-input")) - val crossrefPipe : TypedPipe[(String, String)] = crossrefSource - .read - // .debug // Should be 4 tuples for mocked data - .toTypedPipe[String]('line) - .map{ json : String => - HBaseCrossrefScore.crossrefToSlug(json) match { - case Some(slug) => (slug, json) - case None => (NoTitle, json) - } - } - .filter { entry => - val (slug, json) = entry - slug != NoTitle - } - - val crossrefGroup = crossrefPipe - .groupBy { case (slug, json) => slug } - - val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = - grobidGroup.join(crossrefGroup) - - theJoin.map{ entry => - val (slug : String, - ((slug0: String, sha1 : String, grobidJson : String), - (slug1 : String, crossrefJson : String))) = entry - HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} - // Output: score, sha1, doi, grobid title, crossref title - .write(TypedTsv[(Int, String, String, String, String)](args("output"))) -} - -object HBaseCrossrefScore { - def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( - hbaseTable, // HBase Table Name - zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?) - List("grobid0:tei_json"), - SourceMode.SCAN_ALL) - - def jsonToMap(json : String) : Option[Map[String, Any]] = { - // https://stackoverflow.com/a/32717262/631051 - val jsonObject = JSON.parseFull(json) - if (jsonObject == None) { - None - } else { - Some(jsonObject.get.asInstanceOf[Map[String, Any]]) - } - } - - def grobidToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - titleToSlug(map("title").asInstanceOf[String]) - } else { - None - } - } - } - } - - def crossrefToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - // TODO: Don't ignore titles after the first. - titleToSlug(map("title").asInstanceOf[List[String]](0)) - } else { - None - } - } - } - } - - def titleToSlug(title : String) : Option[String] = { - val slug = removeAccents(title).split(":")(0).toLowerCase() - if (slug.isEmpty) { - None - } else { - Some(slug) - } - } - - val MaxScore = 1000 - - def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) : - // (score, sha1, doi, grobidTitle, crossrefTitle) - (Int, String, String, String, String) = { - jsonToMap(grobidJson) match { - case None => (0, "", "", "", "") // This can't happen, because grobidJson already validated in earlier stage - case Some(grobid) => { - val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase() - - jsonToMap(crossrefJson) match { - case None => (0, "", "", "", "") // This can't happen, because crossrefJson already validated in earlier stage - case Some(crossref) => { - val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase() - - (similarity(removeAccents(grobidTitle), removeAccents(crossrefTitle)), - sha1, - crossref("DOI").asInstanceOf[String], - "'" + grobidTitle + "'", - "'" + crossrefTitle + "'") - } - } - } - } - } - - // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 - def removeAccents(s : String) : String = { - val replacements = Map( - '\u0141' -> 'L', - '\u0142' -> 'l', // Letter ell - '\u00d8' -> 'O', - '\u00f8' -> 'o' - ) - val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) - for (i <- 0 to sb.length - 1) { - for (key <- replacements.keys) { - if (sb(i) == key) { - sb.deleteCharAt(i); - sb.insert(i, replacements(key)) - } - } - } - val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") - pattern.matcher(sb).replaceAll("") - } - - // Adapted from: https://stackoverflow.com/a/16018452/631051 - def similarity(s1 : String, s2 : String) : Int = { - val longer : String = if (s1.length > s2.length) s1 else s2 - val shorter : String = if (s1.length > s2.length) s2 else s1 - if (longer.length == 0) { - // Both strings are empty. - MaxScore - } else { - (longer.length - stringDistance(longer, shorter)) * MaxScore / longer.length - } - } - - // Source: // https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ - def stringDistance(s1: String, s2: String): Int = { - val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() - def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) - def sd(s1: List[Char], s2: List[Char]): Int = { - if (!memo.contains((s1, s2))) { - memo((s1,s2)) = (s1, s2) match { - case (_, Nil) => s1.length - case (Nil, _) => s2.length - case (c1::t1, c2::t2) => - min( sd(t1,s2) + 1, sd(s1,t2) + 1, - sd(t1,t2) + (if (c1==c2) 0 else 1) ) - } - } - memo((s1,s2)) - } - - sd( s1.toList, s2.toList ) - } -} - diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala deleted file mode 100644 index ebe7dc0..0000000 --- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala +++ /dev/null @@ -1,293 +0,0 @@ -package sandcrawler - -import cascading.tuple.Fields -import cascading.tuple.Tuple -import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions} -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes -import org.scalatest._ -import parallelai.spyglass.hbase.HBaseConstants.SourceMode - -class HBaseCrossrefScoreTest extends FlatSpec with Matchers { -/* - val GrobidString = """ -{ - "title": "<<TITLE>>", - "authors": [ - {"name": "Brewster Kahle"}, - {"name": "J Doe"} - ], - "journal": { - "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", - "eissn": null, - "issn": null, - "issue": null, - "publisher": null, - "volume": null - }, - "date": "2000", - "doi": null, - "citations": [ - { "authors": [{"name": "A Seaperson"}], - "date": "2001", - "id": "b0", - "index": 0, - "issue": null, - "journal": "Letters in the Alphabet", - "publisher": null, - "title": "Everything is Wonderful", - "url": null, - "volume": "20"}, - { "authors": [], - "date": "2011-03-28", - "id": "b1", - "index": 1, - "issue": null, - "journal": "The Dictionary", - "publisher": null, - "title": "All about Facts", - "url": null, - "volume": "14"} - ], - "abstract": "Everything you ever wanted to know about nothing", - "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", - "acknowledgement": null, - "annex": null -} -""" - val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") - val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle") - val MalformedGrobidString = GrobidString.replace("}", "") - - val CrossrefString = -""" -{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, - "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], - "date-time" : "2017-10-23T17:19:16Z", - "timestamp" : { "$numberLong" : "1508779156477" } }, - "reference-count" : 0, - "publisher" : "Elsevier BV", - "issue" : "3", - "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", - "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], - "date-time" : "1996-01-01T00:00:00Z", - "timestamp" : { "$numberLong" : "820454400000" } }, - "delay-in-days" : 0, "content-version" : "tdm" }], - "content-domain" : { "domain" : [], "crossmark-restriction" : false }, - "published-print" : { "date-parts" : [ [ 1996 ] ] }, - "DOI" : "<<DOI>>", - "type" : "journal-article", - "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], - "date-time" : "2002-07-25T15:09:41Z", - "timestamp" : { "$numberLong" : "1027609781000" } }, - "page" : "186-187", - "source" : "Crossref", - "is-referenced-by-count" : 0, - "title" : [ "<<TITLE>>" ], - "prefix" : "10.1016", - "volume" : "9", - "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], - "member" : "78", - "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], - "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", - "content-type" : "text/xml", - "content-version" : "vor", - "intended-application" : "text-mining" }, - { "URL" : - "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", - "content-type" : "text/plain", - "content-version" : "vor", - "intended-application" : "text-mining" } ], - "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], - "date-time" : "2015-09-03T10:03:43Z", - "timestamp" : { "$numberLong" : "1441274623000" } }, - "score" : 1, - "issued" : { "date-parts" : [ [ 1996 ] ] }, - "references-count" : 0, - "alternative-id" : [ "0987-7983(96)87729-2" ], - "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", - "ISSN" : [ "0987-7983" ], - "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], - "subject" : [ "Pediatrics, Perinatology, and Child Health" ] -} -""" - val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") - val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") - val MalformedCrossrefString = CrossrefString.replace("}", "") - - // Unit tests - - "titleToSlug()" should "extract the parts of titles before a colon" in { - val slug = HBaseCrossrefScore.titleToSlug("HELLO:there") - slug should contain ("hello") - } - - it should "extract an entire colon-less string" in { - val slug = HBaseCrossrefScore.titleToSlug("hello THERE") - slug should contain ("hello there") - } - - it should "return None if given empty string" in { - HBaseCrossrefScore.titleToSlug("") shouldBe None - } - - "grobidToSlug()" should "get the right slug for a grobid json string" in { - val slug = HBaseCrossrefScore.grobidToSlug(GrobidStringWithTitle) - slug should contain ("dummy example file") - } - - it should "return None if given json string without title" in { - val slug = HBaseCrossrefScore.grobidToSlug(GrobidStringWithoutTitle) - slug shouldBe None - } - - it should "return None if given a malformed json string" in { - val slug = HBaseCrossrefScore.grobidToSlug(MalformedGrobidString) - slug shouldBe None - } - - it should "return None if given an empty json string" in { - val slug = HBaseCrossrefScore.grobidToSlug("") - slug shouldBe None - } - - "crossrefToSlug()" should "get the right slug for a crossref json string" in { - val slug = HBaseCrossrefScore.crossrefToSlug(CrossrefStringWithTitle) - slug should contain ("sometitle") - } - - it should "return None if given json string without title" in { - val slug = HBaseCrossrefScore.grobidToSlug(CrossrefStringWithoutTitle) - slug shouldBe None - } - - it should "return None if given a malformed json string" in { - val slug = HBaseCrossrefScore.grobidToSlug(MalformedCrossrefString) - slug shouldBe None - } - - "removeAccents()" should "handle the empty string" in { - HBaseCrossrefScore.removeAccents("") shouldBe "" - } - - it should "not change a string with unaccented characters" in { - HBaseCrossrefScore.removeAccents("abc123") shouldBe "abc123" - } - - it should "remove accents from Ls" in { - HBaseCrossrefScore.removeAccents("E\u0141\u0142en") shouldBe "ELlen" - } - - it should "remove accents from Es without changing case" in { - val result = HBaseCrossrefScore.removeAccents("\u00e9") - result should have length 1 - result shouldBe "e" - } - - it should "convert the ø in Soren" in { - HBaseCrossrefScore.removeAccents("Søren") shouldBe "Soren" - HBaseCrossrefScore.removeAccents("SØREN") shouldBe "SOREN" - } - - // Tests adapted from https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ - "stringDistance" should "work on empty strings" in { - HBaseCrossrefScore.stringDistance("", "") shouldBe 0 - HBaseCrossrefScore.stringDistance("a", "") shouldBe 1 - HBaseCrossrefScore.stringDistance("", "a") shouldBe 1 - HBaseCrossrefScore.stringDistance("abc", "") shouldBe 3 - HBaseCrossrefScore.stringDistance("", "abc") shouldBe 3 - } - - it should "work on equal strings" in { - HBaseCrossrefScore.stringDistance("", "") shouldBe 0 - HBaseCrossrefScore.stringDistance("a", "a") shouldBe 0 - HBaseCrossrefScore.stringDistance("abc", "abc") shouldBe 0 - } - - it should "work where only inserts are needed" in { - HBaseCrossrefScore.stringDistance("", "a") shouldBe 1 - HBaseCrossrefScore.stringDistance("a", "ab") shouldBe 1 - HBaseCrossrefScore.stringDistance("b", "ab") shouldBe 1 - HBaseCrossrefScore.stringDistance("ac", "abc") shouldBe 1 - HBaseCrossrefScore.stringDistance("abcdefg", "xabxcdxxefxgx") shouldBe 6 - } - - it should "work where only deletes are needed" in { - HBaseCrossrefScore.stringDistance( "a", "") shouldBe 1 - HBaseCrossrefScore.stringDistance( "ab", "a") shouldBe 1 - HBaseCrossrefScore.stringDistance( "ab", "b") shouldBe 1 - HBaseCrossrefScore.stringDistance("abc", "ac") shouldBe 1 - HBaseCrossrefScore.stringDistance("xabxcdxxefxgx", "abcdefg") shouldBe 6 - } - - it should "work where only substitutions are needed" in { - HBaseCrossrefScore.stringDistance( "a", "b") shouldBe 1 - HBaseCrossrefScore.stringDistance( "ab", "ac") shouldBe 1 - HBaseCrossrefScore.stringDistance( "ac", "bc") shouldBe 1 - HBaseCrossrefScore.stringDistance("abc", "axc") shouldBe 1 - HBaseCrossrefScore.stringDistance("xabxcdxxefxgx", "1ab2cd34ef5g6") shouldBe 6 - } - - it should "work where many operations are needed" in { - HBaseCrossrefScore.stringDistance("example", "samples") shouldBe 3 - HBaseCrossrefScore.stringDistance("sturgeon", "urgently") shouldBe 6 - HBaseCrossrefScore.stringDistance("levenshtein", "frankenstein") shouldBe 6 - HBaseCrossrefScore.stringDistance("distance", "difference") shouldBe 5 - HBaseCrossrefScore.stringDistance("java was neat", "scala is great") shouldBe 7 - } - - // Pipeline tests - val output = "/tmp/testOutput" - val input = "/tmp/testInput" - val (testTable, testHost) = ("test-table", "dummy-host:2181") - - val grobidSampleData = List( - List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))), - List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))), - List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))), - List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), - Bytes.toBytes(MalformedGrobidString))) - - JobTest("sandcrawler.HBaseCrossrefScoreJob") - .arg("test", "") - .arg("app.conf.path", "app.conf") - .arg("output", output) - .arg("hbase-table", testTable) - .arg("zookeeper-hosts", testHost) - .arg("crossref-input", input) - .arg("debug", "true") - .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost), - grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .source(TextLine(input), List( - 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), - 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"), - 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), - 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) - .sink[(Int, String, String, String, String)](TypedTsv[(Int, - String, String, String, String)](output)) { - // Grobid titles: - // "Title 1", "Title 2: TNG", "Title 3: The Sequel" - // crossref slugs: - // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted" - // Join should have 3 "Title 1" slugs and 1 "Title 2" slug - outputBuffer => - "The pipeline" should "return a 4-element list" in { - outputBuffer should have length 4 - } - - it should "return the right first entry" in { - val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) - slug shouldBe "title 1" - slug shouldBe slug0 - slug shouldBe slug1 - sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8") - grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8") - } - } - .run - .finish - */ -} |