aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala8
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala216
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala293
3 files changed, 5 insertions, 512 deletions
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 25e5985..bf36855 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -12,9 +12,11 @@ import parallelai.spyglass.hbase.HBaseSource
class GrobidScorable extends Scorable with HBasePipeConversions {
def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = {
// TODO: Clean up code after debugging.
- val grobidSource = HBaseCrossrefScore.getHBaseSource(
+ val grobidSource = HBaseBuilder.build(
args("hbase-table"),
- args("zookeeper-hosts"))
+ args("zookeeper-hosts"),
+ List("grobid0:tei_json"),
+ SourceMode.SCAN_ALL)
// val pipe0 : Pipe = grobidSource.read
// val grobidPipe : TypedPipe[MapFeatures] = pipe0
@@ -26,7 +28,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
.toTypedPipe[(String, String)](new Fields("key", "tei_json"))
.map { entry =>
val (key : String, json : String) = (entry._1, entry._2)
- HBaseCrossrefScore.grobidToSlug(json) match {
+ GrobidScorable.grobidToSlug(json) match {
case Some(slug) => new MapFeatures(slug, json)
case None => new MapFeatures(Scorable.NoSlug, json)
}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
deleted file mode 100644
index 2fbb19f..0000000
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-package sandcrawler
-
-import java.text.Normalizer
-import java.util.Arrays
-import java.util.Properties
-import java.util.regex.Pattern
-
-import scala.math
-import scala.util.parsing.json.JSON
-
-import cascading.tuple.Fields
-import com.twitter.scalding._
-import com.twitter.scalding.typed.CoGrouped
-import com.twitter.scalding.typed.Grouped
-import com.twitter.scalding.typed.TDsl._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.util.Bytes
-import parallelai.spyglass.base.JobBase
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import parallelai.spyglass.hbase.HBasePipeConversions
-import parallelai.spyglass.hbase.HBaseSource
-
-class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
- val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
-
- // key is SHA1
- val grobidSource = HBaseCrossrefScore.getHBaseSource(
- args("hbase-table"),
- args("zookeeper-hosts"))
-
- val pipe0 : cascading.pipe.Pipe = grobidSource.read
- val grobidPipe : TypedPipe[(String, String, String)] = pipe0
- .fromBytesWritable(new Fields("key", "tei_json"))
- // .debug // Should be 4 tuples for mocked data
- .toTypedPipe[(String, String)]('key, 'tei_json)
- .map { entry =>
- val (key, json) = (entry._1, entry._2)
- // TODO: Consider passing forward only a subset of JSON.
- HBaseCrossrefScore.grobidToSlug(json) match {
- case Some(slug) => (slug, key, json)
- case None => (NoTitle, key, json)
- }
- }
- .filter { entry =>
- val (slug, _, _) = entry
- slug != NoTitle
- }
-// .debug // SHould be 3 tuples for mocked data
-
- val grobidGroup = grobidPipe
- .groupBy { case (slug, key, json) => slug }
-
- val crossrefSource = TextLine(args("crossref-input"))
- val crossrefPipe : TypedPipe[(String, String)] = crossrefSource
- .read
- // .debug // Should be 4 tuples for mocked data
- .toTypedPipe[String]('line)
- .map{ json : String =>
- HBaseCrossrefScore.crossrefToSlug(json) match {
- case Some(slug) => (slug, json)
- case None => (NoTitle, json)
- }
- }
- .filter { entry =>
- val (slug, json) = entry
- slug != NoTitle
- }
-
- val crossrefGroup = crossrefPipe
- .groupBy { case (slug, json) => slug }
-
- val theJoin : CoGrouped[String, ((String, String, String), (String, String))] =
- grobidGroup.join(crossrefGroup)
-
- theJoin.map{ entry =>
- val (slug : String,
- ((slug0: String, sha1 : String, grobidJson : String),
- (slug1 : String, crossrefJson : String))) = entry
- HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)}
- // Output: score, sha1, doi, grobid title, crossref title
- .write(TypedTsv[(Int, String, String, String, String)](args("output")))
-}
-
-object HBaseCrossrefScore {
- def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build(
- hbaseTable, // HBase Table Name
- zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?)
- List("grobid0:tei_json"),
- SourceMode.SCAN_ALL)
-
- def jsonToMap(json : String) : Option[Map[String, Any]] = {
- // https://stackoverflow.com/a/32717262/631051
- val jsonObject = JSON.parseFull(json)
- if (jsonObject == None) {
- None
- } else {
- Some(jsonObject.get.asInstanceOf[Map[String, Any]])
- }
- }
-
- def grobidToSlug(json : String) : Option[String] = {
- jsonToMap(json) match {
- case None => None
- case Some(map) => {
- if (map contains "title") {
- titleToSlug(map("title").asInstanceOf[String])
- } else {
- None
- }
- }
- }
- }
-
- def crossrefToSlug(json : String) : Option[String] = {
- jsonToMap(json) match {
- case None => None
- case Some(map) => {
- if (map contains "title") {
- // TODO: Don't ignore titles after the first.
- titleToSlug(map("title").asInstanceOf[List[String]](0))
- } else {
- None
- }
- }
- }
- }
-
- def titleToSlug(title : String) : Option[String] = {
- val slug = removeAccents(title).split(":")(0).toLowerCase()
- if (slug.isEmpty) {
- None
- } else {
- Some(slug)
- }
- }
-
- val MaxScore = 1000
-
- def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) :
- // (score, sha1, doi, grobidTitle, crossrefTitle)
- (Int, String, String, String, String) = {
- jsonToMap(grobidJson) match {
- case None => (0, "", "", "", "") // This can't happen, because grobidJson already validated in earlier stage
- case Some(grobid) => {
- val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase()
-
- jsonToMap(crossrefJson) match {
- case None => (0, "", "", "", "") // This can't happen, because crossrefJson already validated in earlier stage
- case Some(crossref) => {
- val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase()
-
- (similarity(removeAccents(grobidTitle), removeAccents(crossrefTitle)),
- sha1,
- crossref("DOI").asInstanceOf[String],
- "'" + grobidTitle + "'",
- "'" + crossrefTitle + "'")
- }
- }
- }
- }
- }
-
- // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934
- def removeAccents(s : String) : String = {
- val replacements = Map(
- '\u0141' -> 'L',
- '\u0142' -> 'l', // Letter ell
- '\u00d8' -> 'O',
- '\u00f8' -> 'o'
- )
- val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD))
- for (i <- 0 to sb.length - 1) {
- for (key <- replacements.keys) {
- if (sb(i) == key) {
- sb.deleteCharAt(i);
- sb.insert(i, replacements(key))
- }
- }
- }
- val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+")
- pattern.matcher(sb).replaceAll("")
- }
-
- // Adapted from: https://stackoverflow.com/a/16018452/631051
- def similarity(s1 : String, s2 : String) : Int = {
- val longer : String = if (s1.length > s2.length) s1 else s2
- val shorter : String = if (s1.length > s2.length) s2 else s1
- if (longer.length == 0) {
- // Both strings are empty.
- MaxScore
- } else {
- (longer.length - stringDistance(longer, shorter)) * MaxScore / longer.length
- }
- }
-
- // Source: // https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/
- def stringDistance(s1: String, s2: String): Int = {
- val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]()
- def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c)
- def sd(s1: List[Char], s2: List[Char]): Int = {
- if (!memo.contains((s1, s2))) {
- memo((s1,s2)) = (s1, s2) match {
- case (_, Nil) => s1.length
- case (Nil, _) => s2.length
- case (c1::t1, c2::t2) =>
- min( sd(t1,s2) + 1, sd(s1,t2) + 1,
- sd(t1,t2) + (if (c1==c2) 0 else 1) )
- }
- }
- memo((s1,s2))
- }
-
- sd( s1.toList, s2.toList )
- }
-}
-
diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
deleted file mode 100644
index ebe7dc0..0000000
--- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
+++ /dev/null
@@ -1,293 +0,0 @@
-package sandcrawler
-
-import cascading.tuple.Fields
-import cascading.tuple.Tuple
-import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.util.Bytes
-import org.scalatest._
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-
-class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
-/*
- val GrobidString = """
-{
- "title": "<<TITLE>>",
- "authors": [
- {"name": "Brewster Kahle"},
- {"name": "J Doe"}
- ],
- "journal": {
- "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
- "eissn": null,
- "issn": null,
- "issue": null,
- "publisher": null,
- "volume": null
- },
- "date": "2000",
- "doi": null,
- "citations": [
- { "authors": [{"name": "A Seaperson"}],
- "date": "2001",
- "id": "b0",
- "index": 0,
- "issue": null,
- "journal": "Letters in the Alphabet",
- "publisher": null,
- "title": "Everything is Wonderful",
- "url": null,
- "volume": "20"},
- { "authors": [],
- "date": "2011-03-28",
- "id": "b1",
- "index": 1,
- "issue": null,
- "journal": "The Dictionary",
- "publisher": null,
- "title": "All about Facts",
- "url": null,
- "volume": "14"}
- ],
- "abstract": "Everything you ever wanted to know about nothing",
- "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
- "acknowledgement": null,
- "annex": null
-}
-"""
- val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File")
- val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle")
- val MalformedGrobidString = GrobidString.replace("}", "")
-
- val CrossrefString =
-"""
-{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" },
- "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ],
- "date-time" : "2017-10-23T17:19:16Z",
- "timestamp" : { "$numberLong" : "1508779156477" } },
- "reference-count" : 0,
- "publisher" : "Elsevier BV",
- "issue" : "3",
- "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/",
- "start" : { "date-parts" : [ [ 1996, 1, 1 ] ],
- "date-time" : "1996-01-01T00:00:00Z",
- "timestamp" : { "$numberLong" : "820454400000" } },
- "delay-in-days" : 0, "content-version" : "tdm" }],
- "content-domain" : { "domain" : [], "crossmark-restriction" : false },
- "published-print" : { "date-parts" : [ [ 1996 ] ] },
- "DOI" : "<<DOI>>",
- "type" : "journal-article",
- "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
- "date-time" : "2002-07-25T15:09:41Z",
- "timestamp" : { "$numberLong" : "1027609781000" } },
- "page" : "186-187",
- "source" : "Crossref",
- "is-referenced-by-count" : 0,
- "title" : [ "<<TITLE>>" ],
- "prefix" : "10.1016",
- "volume" : "9",
- "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
- "member" : "78",
- "container-title" : [ "Journal de Pédiatrie et de Puériculture" ],
- "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml",
- "content-type" : "text/xml",
- "content-version" : "vor",
- "intended-application" : "text-mining" },
- { "URL" :
- "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain",
- "content-type" : "text/plain",
- "content-version" : "vor",
- "intended-application" : "text-mining" } ],
- "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ],
- "date-time" : "2015-09-03T10:03:43Z",
- "timestamp" : { "$numberLong" : "1441274623000" } },
- "score" : 1,
- "issued" : { "date-parts" : [ [ 1996 ] ] },
- "references-count" : 0,
- "alternative-id" : [ "0987-7983(96)87729-2" ],
- "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2",
- "ISSN" : [ "0987-7983" ],
- "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ],
- "subject" : [ "Pediatrics, Perinatology, and Child Health" ]
-}
-"""
- val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
- val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
- val MalformedCrossrefString = CrossrefString.replace("}", "")
-
- // Unit tests
-
- "titleToSlug()" should "extract the parts of titles before a colon" in {
- val slug = HBaseCrossrefScore.titleToSlug("HELLO:there")
- slug should contain ("hello")
- }
-
- it should "extract an entire colon-less string" in {
- val slug = HBaseCrossrefScore.titleToSlug("hello THERE")
- slug should contain ("hello there")
- }
-
- it should "return None if given empty string" in {
- HBaseCrossrefScore.titleToSlug("") shouldBe None
- }
-
- "grobidToSlug()" should "get the right slug for a grobid json string" in {
- val slug = HBaseCrossrefScore.grobidToSlug(GrobidStringWithTitle)
- slug should contain ("dummy example file")
- }
-
- it should "return None if given json string without title" in {
- val slug = HBaseCrossrefScore.grobidToSlug(GrobidStringWithoutTitle)
- slug shouldBe None
- }
-
- it should "return None if given a malformed json string" in {
- val slug = HBaseCrossrefScore.grobidToSlug(MalformedGrobidString)
- slug shouldBe None
- }
-
- it should "return None if given an empty json string" in {
- val slug = HBaseCrossrefScore.grobidToSlug("")
- slug shouldBe None
- }
-
- "crossrefToSlug()" should "get the right slug for a crossref json string" in {
- val slug = HBaseCrossrefScore.crossrefToSlug(CrossrefStringWithTitle)
- slug should contain ("sometitle")
- }
-
- it should "return None if given json string without title" in {
- val slug = HBaseCrossrefScore.grobidToSlug(CrossrefStringWithoutTitle)
- slug shouldBe None
- }
-
- it should "return None if given a malformed json string" in {
- val slug = HBaseCrossrefScore.grobidToSlug(MalformedCrossrefString)
- slug shouldBe None
- }
-
- "removeAccents()" should "handle the empty string" in {
- HBaseCrossrefScore.removeAccents("") shouldBe ""
- }
-
- it should "not change a string with unaccented characters" in {
- HBaseCrossrefScore.removeAccents("abc123") shouldBe "abc123"
- }
-
- it should "remove accents from Ls" in {
- HBaseCrossrefScore.removeAccents("E\u0141\u0142en") shouldBe "ELlen"
- }
-
- it should "remove accents from Es without changing case" in {
- val result = HBaseCrossrefScore.removeAccents("\u00e9")
- result should have length 1
- result shouldBe "e"
- }
-
- it should "convert the ø in Soren" in {
- HBaseCrossrefScore.removeAccents("Søren") shouldBe "Soren"
- HBaseCrossrefScore.removeAccents("SØREN") shouldBe "SOREN"
- }
-
- // Tests adapted from https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/
- "stringDistance" should "work on empty strings" in {
- HBaseCrossrefScore.stringDistance("", "") shouldBe 0
- HBaseCrossrefScore.stringDistance("a", "") shouldBe 1
- HBaseCrossrefScore.stringDistance("", "a") shouldBe 1
- HBaseCrossrefScore.stringDistance("abc", "") shouldBe 3
- HBaseCrossrefScore.stringDistance("", "abc") shouldBe 3
- }
-
- it should "work on equal strings" in {
- HBaseCrossrefScore.stringDistance("", "") shouldBe 0
- HBaseCrossrefScore.stringDistance("a", "a") shouldBe 0
- HBaseCrossrefScore.stringDistance("abc", "abc") shouldBe 0
- }
-
- it should "work where only inserts are needed" in {
- HBaseCrossrefScore.stringDistance("", "a") shouldBe 1
- HBaseCrossrefScore.stringDistance("a", "ab") shouldBe 1
- HBaseCrossrefScore.stringDistance("b", "ab") shouldBe 1
- HBaseCrossrefScore.stringDistance("ac", "abc") shouldBe 1
- HBaseCrossrefScore.stringDistance("abcdefg", "xabxcdxxefxgx") shouldBe 6
- }
-
- it should "work where only deletes are needed" in {
- HBaseCrossrefScore.stringDistance( "a", "") shouldBe 1
- HBaseCrossrefScore.stringDistance( "ab", "a") shouldBe 1
- HBaseCrossrefScore.stringDistance( "ab", "b") shouldBe 1
- HBaseCrossrefScore.stringDistance("abc", "ac") shouldBe 1
- HBaseCrossrefScore.stringDistance("xabxcdxxefxgx", "abcdefg") shouldBe 6
- }
-
- it should "work where only substitutions are needed" in {
- HBaseCrossrefScore.stringDistance( "a", "b") shouldBe 1
- HBaseCrossrefScore.stringDistance( "ab", "ac") shouldBe 1
- HBaseCrossrefScore.stringDistance( "ac", "bc") shouldBe 1
- HBaseCrossrefScore.stringDistance("abc", "axc") shouldBe 1
- HBaseCrossrefScore.stringDistance("xabxcdxxefxgx", "1ab2cd34ef5g6") shouldBe 6
- }
-
- it should "work where many operations are needed" in {
- HBaseCrossrefScore.stringDistance("example", "samples") shouldBe 3
- HBaseCrossrefScore.stringDistance("sturgeon", "urgently") shouldBe 6
- HBaseCrossrefScore.stringDistance("levenshtein", "frankenstein") shouldBe 6
- HBaseCrossrefScore.stringDistance("distance", "difference") shouldBe 5
- HBaseCrossrefScore.stringDistance("java was neat", "scala is great") shouldBe 7
- }
-
- // Pipeline tests
- val output = "/tmp/testOutput"
- val input = "/tmp/testInput"
- val (testTable, testHost) = ("test-table", "dummy-host:2181")
-
- val grobidSampleData = List(
- List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"),
- Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))),
- List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
- Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))),
- List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"),
- Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))),
- List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"),
- Bytes.toBytes(MalformedGrobidString)))
-
- JobTest("sandcrawler.HBaseCrossrefScoreJob")
- .arg("test", "")
- .arg("app.conf.path", "app.conf")
- .arg("output", output)
- .arg("hbase-table", testTable)
- .arg("zookeeper-hosts", testHost)
- .arg("crossref-input", input)
- .arg("debug", "true")
- .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost),
- grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .source(TextLine(input), List(
- 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
- 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"),
- 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
- 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
- .sink[(Int, String, String, String, String)](TypedTsv[(Int,
- String, String, String, String)](output)) {
- // Grobid titles:
- // "Title 1", "Title 2: TNG", "Title 3: The Sequel"
- // crossref slugs:
- // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted"
- // Join should have 3 "Title 1" slugs and 1 "Title 2" slug
- outputBuffer =>
- "The pipeline" should "return a 4-element list" in {
- outputBuffer should have length 4
- }
-
- it should "return the right first entry" in {
- val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0)
- slug shouldBe "title 1"
- slug shouldBe slug0
- slug shouldBe slug1
- sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8")
- grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8")
- }
- }
- .run
- .finish
- */
-}