diff options
Diffstat (limited to 'scalding/src')
16 files changed, 1010 insertions, 12 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala new file mode 100644 index 0000000..ff8201a --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -0,0 +1,47 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBasePipeConversions + +class CrossrefScorable extends Scorable with HBasePipeConversions { + // TODO: Generalize args so there can be multiple Crossref pipes in one job. + def getSource(args : Args) : Source = { + TextLine(args("crossref-input")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .map { CrossrefScorable.jsonToMapFeatures(_) } + } +} + +object CrossrefScorable { + def jsonToMapFeatures(json : String) : MapFeatures = { + Scorable.jsonToMap(json) match { + case None => MapFeatures(Scorable.NoSlug, json) + case Some(map) => { + if ((map contains "title") && (map contains "DOI")) { + val titles = map("title").asInstanceOf[List[String]] + val doi = Scorable.getString(map, "DOI") + if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) { + new MapFeatures(Scorable.NoSlug, json) + } else { + // bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ] + val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi) + new MapFeatures(sf.toSlug, sf.toString) + } + } else { + new MapFeatures(Scorable.NoSlug, json) + } + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala new file mode 100644 index 0000000..9a09e05 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -0,0 +1,56 @@ +package sandcrawler + +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorable extends Scorable with HBasePipeConversions { + val StatusOK = 200 + + def getSource(args : Args) : Source = { + // TODO: Generalize args so there can be multiple grobid pipes in one job. + GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + getSource(args) + .read + // Can't just "fromBytesWritable" because we have multiple types? + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code")) + .filter { case (_, tei_json, status_code) => tei_json != null && status_code != null } + .map { case (key, tei_json, status_code) => + (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes())) + } + // TODO: Should I combine next two stages for efficiency? + .collect { case (key, json, StatusOK) => (key, json) } + .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } + } +} + +object GrobidScorable { + def getHBaseSource(table : String, host : String) : HBaseSource = { + HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL) + } + + def jsonToMapFeatures(key : String, json : String) : MapFeatures = { + Scorable.jsonToMap(json) match { + case None => MapFeatures(Scorable.NoSlug, json) + case Some(map) => { + if (map contains "title") { + new ScorableFeatures(Scorable.getString(map, "title"), sha1=key).toMapFeatures + } else { + MapFeatures(Scorable.NoSlug, json) + } + } + } + } +} + diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala new file mode 100644 index 0000000..9b9c633 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -0,0 +1,79 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ + +case class MapFeatures(slug : String, json : String) +case class ReduceFeatures(json : String) +case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) + +abstract class Scorable { + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = + { + getFeaturesPipe(args) + .filter { entry => Scorable.isValidSlug(entry.slug) } + .groupBy { case MapFeatures(slug, json) => slug } + .map { tuple => + val (slug : String, features : MapFeatures) = tuple + (slug, ReduceFeatures(features.json)) + } + } + + // abstract methods + def getSource(args : Args) : Source + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] +} + +object Scorable { + val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable + + def isValidSlug(slug : String) : Boolean = { + slug != NoSlug + } + + def jsonToMap(json : String) : Option[Map[String, Any]] = { + // https://stackoverflow.com/a/32717262/631051 + val jsonObject = JSON.parseFull(json) + if (jsonObject == None) { + None + } else { + Some(jsonObject.get.asInstanceOf[Map[String, Any]]) + } + } + + def getStringOption(optionalMap : Option[Map[String, Any]], key : String) : Option[String] = { + optionalMap match { + case None => None + case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None + } + } + + // Caller is responsible for ensuring that key is a String in map. + // TODO: Add and handle ClassCastException + def getString(map : Map[String, Any], key : String) : String = { + assert(map contains key) + map(key).asInstanceOf[String] + } + + val MaxScore = 1000 + + def computeSimilarity(features1 : ReduceFeatures, features2 : ReduceFeatures) : Int = { + val json1 = jsonToMap(features1.json) + val json2 = jsonToMap(features2.json) + getStringOption(json1, "title") match { + case None => 0 + case Some(title1) => { + getStringOption(json2, "title") match { + case None => 0 + case Some(title2) => + (StringUtilities.similarity(title1, title2) * MaxScore).toInt + } + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala new file mode 100644 index 0000000..8ed3369 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -0,0 +1,44 @@ +package sandcrawler + +import scala.util.parsing.json.JSONObject + + +// Contains features needed to make slug and to score (in combination +// with a second ScorableFeatures). +class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: String = "") { + + val slugBlacklist = Set( "abbreviations", "abstract", "acknowledgements", + "article", "authorreply", "authorsreply", "bookreview", "bookreviews", + "casereport", "commentary", "commentaryon", "commenton", "commentto", + "contents", "correspondence", "dedication", "editorialadvisoryboard", + "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue", + "lettertotheeditor", "listofabbreviations", "note", "overview", "preface", + "references", "results", "review", "reviewarticle", "summary", "title", + "name") + + def toMap() : Map[String, Any] = { + Map("title" -> (if (title == null) "" else title), + "year" -> year, + "doi" -> (if (doi == null) "" else doi), + "sha1" -> (if (sha1 == null) "" else sha1)) + } + + override def toString() : String = { + JSONObject(toMap()).toString + } + + def toSlug() : String = { + if (title == null) { + Scorable.NoSlug + } else { + val unaccented = StringUtilities.removeAccents(title) + // Remove punctuation + val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "") + if (slug.isEmpty || slug == null || (slugBlacklist contains slug)) Scorable.NoSlug else slug + } + } + + def toMapFeatures = { + MapFeatures(toSlug, toString) + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala new file mode 100644 index 0000000..75d45e9 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -0,0 +1,57 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class ScoreJob(args: Args) extends JobBase(args) { + // TODO: Instantiate any subclass of Scorable specified in args. + val sc1 : Scorable = new GrobidScorable() + val sc2 : Scorable = new CrossrefScorable() + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) + + pipe1.join(pipe2).map { entry => + val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry + new ReduceOutput( + slug, + Scorable.computeSimilarity(features1, features2), + features1.json, + features2.json) + } + //TypedTsv doesn't work over case classes. + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} + +/* +// Ugly hack to get non-String information into ScoreJob above. +object ScoreJob { + var scorable1 : Option[Scorable] = None + var scorable2 : Option[Scorable] = None + + def setScorable1(s : Scorable) { + scorable1 = Some(s) + } + + def getScorable1() : Scorable = { + scorable1 match { + case Some(s) => s + case None => null + } + } + + def setScorable2(s: Scorable) { + scorable2 = Some(s) + } + + def getScorable2() : Scorable = { + scorable2 match { + case Some(s) => s + case None => null + } + } +} + */ diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala new file mode 100644 index 0000000..2745875 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -0,0 +1,76 @@ +package sandcrawler + +import java.text.Normalizer +import java.util.regex.Pattern + +object StringUtilities { + // bnewbold: I propose that we: + // 1. keep only \p{Ideographic}, \p{Alphabetic}, and \p{Digit} + // 2. strip accents + // 3. "lower-case" (unicode-aware) + // 4. do any final custom/manual mappings + // + // We should check (test) that null bytes are handled, in addition to other + // more obvious characters + + // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 + def removeAccents(s : String) : String = { + val replacements = Map( + '\u0141' -> 'L', + '\u0142' -> 'l', // Letter ell + '\u00d8' -> 'O', + '\u00f8' -> 'o' + ) + val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) + for (i <- 0 to sb.length - 1) { + for (key <- replacements.keys) { + if (sb(i) == key) { + sb.deleteCharAt(i); + sb.insert(i, replacements(key)) + } + } + } + val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") + pattern.matcher(sb).replaceAll("") + } + + // Source: https://stackoverflow.com/a/30076541/631051 + def removePunctuation(s: String) : String = { + s.replaceAll("""[\p{Punct}]""", "") + } + + // Adapted from: https://stackoverflow.com/a/16018452/631051 + def similarity(s1a : String, s2a : String) : Double = { + val (s1, s2) = (removeAccents(removePunctuation(s1a)), + removeAccents(removePunctuation(s2a))) + val longer : String = if (s1.length > s2.length) s1 else s2 + val shorter : String = if (s1.length > s2.length) s2 else s1 + if (longer.length == 0) { + // Both strings are empty. + 1 + } else { + (longer.length - stringDistance(longer, shorter)) / longer.length.toDouble + } + } + + // Source: https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ + def stringDistance(s1: String, s2: String): Int = { + val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() + def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) + def sd(s1: List[Char], s2: List[Char]): Int = { + if (!memo.contains((s1, s2))) { + memo((s1,s2)) = (s1, s2) match { + case (_, Nil) => s1.length + case (Nil, _) => s2.length + case (c1::t1, c2::t2) => + min( sd(t1,s2) + 1, sd(s1,t2) + 1, + sd(t1,t2) + (if (c1==c2) 0 else 1) ) + } + } + memo((s1,s2)) + } + + sd( s1.toList, s2.toList ) + } +} + diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala new file mode 100644 index 0000000..1789d1a --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala @@ -0,0 +1,100 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class CrossrefScorableTest extends FlatSpec with Matchers { + // scalastyle:off + val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, + "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], + "date-time" : "2017-10-23T17:19:16Z", + "timestamp" : { "$numberLong" : "1508779156477" } }, + "reference-count" : 0, + "publisher" : "Elsevier BV", + "issue" : "3", + "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", + "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], + "date-time" : "1996-01-01T00:00:00Z", + "timestamp" : { "$numberLong" : "820454400000" } }, + "delay-in-days" : 0, "content-version" : "tdm" }], + "content-domain" : { "domain" : [], "crossmark-restriction" : false }, + "published-print" : { "date-parts" : [ [ 1996 ] ] }, + "DOI" : "<<DOI>>", + "type" : "journal-article", + "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], + "date-time" : "2002-07-25T15:09:41Z", + "timestamp" : { "$numberLong" : "1027609781000" } }, + "page" : "186-187", + "source" : "Crossref", + "is-referenced-by-count" : 0, + "title" : [ "<<TITLE>>" ], + "prefix" : "10.1016", + "volume" : "9", + "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], + "member" : "78", + "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], + "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", + "content-type" : "text/xml", + "content-version" : "vor", + "intended-application" : "text-mining" }, + { "URL" : + "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", + "content-type" : "text/plain", + "content-version" : "vor", + "intended-application" : "text-mining" } ], + "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], + "date-time" : "2015-09-03T10:03:43Z", + "timestamp" : { "$numberLong" : "1441274623000" } }, + "score" : 1, + "issued" : { "date-parts" : [ [ 1996 ] ] }, + "references-count" : 0, + "alternative-id" : [ "0987-7983(96)87729-2" ], + "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", + "ISSN" : [ "0987-7983" ], + "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], + "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""" + // scalastyle:on + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "Some Title") + val CrossrefStringWithEmptyTitle = CrossrefString.replace("<<TITLE>>", "") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + + // Unit tests + "CrossrefScorable.jsonToMapFeatures()" should "handle invalid JSON" in { + val result = CrossrefScorable.jsonToMapFeatures(MalformedCrossrefString) + result.slug shouldBe Scorable.NoSlug + } + + it should "handle missing title" in { + val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithoutTitle) + result.slug shouldBe Scorable.NoSlug + } + + it should "handle empty title" in { + val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithEmptyTitle) + result.slug shouldBe Scorable.NoSlug + } + + it should "handle valid input" in { + val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithTitle) + result.slug shouldBe "sometitle" + Scorable.jsonToMap(result.json) match { + case None => fail() + case Some(map) => { + map("title").asInstanceOf[String] shouldBe "Some Title" + } + } + } +} diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala new file mode 100644 index 0000000..661824b --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala @@ -0,0 +1,88 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class GrobidScorableTest extends FlatSpec with Matchers { + val GrobidString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") + val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle") + val MalformedGrobidString = GrobidString.replace("}", "") + val Key = "Dummy Key" + + // Unit tests + + "GrobidScorable.jsonToMapFeatures()" should "handle invalid JSON" in { + val result = GrobidScorable.jsonToMapFeatures(Key, MalformedGrobidString) + result.slug shouldBe Scorable.NoSlug + } + + it should "handle missing title" in { + val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithoutTitle) + result.slug shouldBe Scorable.NoSlug + } + + it should "handle valid input" in { + val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithTitle) + result.slug shouldBe "dummyexamplefile" + Scorable.jsonToMap(result.json) match { + case None => fail() + case Some(map) => { + map should contain key "title" + map("title").asInstanceOf[String] shouldBe "Dummy Example File" + } + } + } +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala index 603a4c7..c61cb22 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala @@ -22,6 +22,7 @@ class HBaseBuilderTest extends FlatSpec with Matchers { fields should have length 0 } + //scalastyle:off no.whitespace.before.left.bracket it should "throw IllegalArgumentException on malformed input" in { a [IllegalArgumentException] should be thrownBy { HBaseBuilder.parseColSpecs(List("file_size")) diff --git a/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala index fde2290..d6d283f 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala @@ -1,15 +1,18 @@ package sandcrawler -import cascading.tuple.{Tuple, Fields} -import com.twitter.scalding.{JobTest, Tsv, TupleConversions} +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.junit.runner.RunWith import org.scalatest.FunSpec import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory -import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource import scala._ @RunWith(classOf[JUnitRunner]) diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala index 3424a36..c4ca5aa 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala @@ -1,15 +1,18 @@ package sandcrawler -import cascading.tuple.{Tuple, Fields} -import com.twitter.scalding.{JobTest, Tsv, TupleConversions} +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.junit.runner.RunWith import org.scalatest.FunSpec import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory -import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource import scala._ /** @@ -47,12 +50,10 @@ class HBaseRowCountTest extends FunSpec with TupleConversions { outputBuffer => it("should return the test data provided.") { - println("outputBuffer.size => " + outputBuffer.size) assert(outputBuffer.size === 1) } it("should return the correct count") { - println("raw output => " + outputBuffer) assert(outputBuffer(0).getObject(0) === 8) } } diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index d7689cd..0da0b9c 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -1,15 +1,19 @@ package sandcrawler -import cascading.tuple.{Tuple, Fields} -import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions} +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.junit.runner.RunWith import org.scalatest.FunSpec import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory -import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource import scala._ @RunWith(classOf[JUnitRunner]) @@ -25,7 +29,8 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { val statusType1Bytes = Bytes.toBytes(statusType1) val statusType2Bytes = Bytes.toBytes(statusType2) - val sampleData = List( + val sampleData : List[List[Array[Byte]]] = List( + // TODO(bnewbold): now to express a null (empty value) in this list? List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), statusType1Bytes), List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), statusType1Bytes), List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), statusType2Bytes), diff --git a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala new file mode 100644 index 0000000..80d92aa --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala @@ -0,0 +1,57 @@ +package sandcrawler + +import org.scalatest._ + +// scalastyle:off null +class ScorableFeaturesTest extends FlatSpec with Matchers { + private def titleToSlug(s : String) : String = { + new ScorableFeatures(title = s).toSlug + } + + "toMapFeatures()" should "work with gnarly inputs" in { + new ScorableFeatures(title = null).toMapFeatures + new ScorableFeatures(title = "something", doi = null, sha1 = null, year = 123).toMapFeatures + } + + "mapToSlug()" should "extract the parts of titles before a colon" in { + titleToSlug("HELLO:there") shouldBe "hellothere" + } + + it should "extract an entire colon-less string" in { + titleToSlug("hello THERE") shouldBe "hellothere" + } + + it should "return Scorable.NoSlug if given empty string" in { + titleToSlug("") shouldBe Scorable.NoSlug + } + + it should "return Scorable.NoSlug if given null" in { + titleToSlug(null) shouldBe Scorable.NoSlug + } + + it should "strip punctuation" in { + titleToSlug("HELLO!:the:re") shouldBe "hellothere" + titleToSlug("a:b:c") shouldBe "abc" + titleToSlug( + "If you're happy and you know it, clap your hands!") shouldBe "ifyourehappyandyouknowitclapyourhands" + titleToSlug(":;\"\'") shouldBe Scorable.NoSlug + } + + it should "filter stub titles" in { + titleToSlug("abstract") shouldBe Scorable.NoSlug + titleToSlug("title!") shouldBe Scorable.NoSlug + titleToSlug("a real title which is not on blacklist") shouldBe "arealtitlewhichisnotonblacklist" + } + + it should "strip special characters" in { + titleToSlug(":;!',|\"\'`.#?!-@*/\\=+~%$^{}()[]<>-_") shouldBe Scorable.NoSlug + // TODO: titleToSlug("©™₨№…") shouldBe Scorable.NoSlug + // TODO: titleToSlug("πµΣσ") shouldBe Scorable.NoSlug + } + + it should "remove whitespace" in { + titleToSlug("foo bar : baz ::") shouldBe "foobarbaz" + titleToSlug("\na\t:b:c") shouldBe "abc" + titleToSlug("\n \t \r ") shouldBe Scorable.NoSlug + } +} diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala new file mode 100644 index 0000000..f63bef8 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -0,0 +1,73 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScorableTest extends FlatSpec with Matchers { + val JsonString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + "jsonToMap()" should "return a map, given a legal JSON string" in { + Scorable.jsonToMap(JsonString) should not be (None) + } + + it should "return None, given illegal JSON" in { + Scorable.jsonToMap("illegal{,json{{") should be (None) + } + + "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in { + val score = Scorable.computeSimilarity( + new ReduceFeatures(JsonString), new ReduceFeatures(JsonString)) + score shouldBe Scorable.MaxScore + } +} diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala new file mode 100644 index 0000000..f92ba31 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -0,0 +1,226 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScoreJobTest extends FlatSpec with Matchers { + //scalastyle:off + val JsonString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + // scalastyle:on + val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") + val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") + val MalformedJsonString = JsonString.replace("}", "") + + // scalastyle:off + val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, + "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], + "date-time" : "2017-10-23T17:19:16Z", + "timestamp" : { "$numberLong" : "1508779156477" } }, + "reference-count" : 0, + "publisher" : "Elsevier BV", + "issue" : "3", + "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", + "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], + "date-time" : "1996-01-01T00:00:00Z", + "timestamp" : { "$numberLong" : "820454400000" } }, + "delay-in-days" : 0, "content-version" : "tdm" }], + "content-domain" : { "domain" : [], "crossmark-restriction" : false }, + "published-print" : { "date-parts" : [ [ 1996 ] ] }, + "DOI" : "<<DOI>>", + "type" : "journal-article", + "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], + "date-time" : "2002-07-25T15:09:41Z", + "timestamp" : { "$numberLong" : "1027609781000" } }, + "page" : "186-187", + "source" : "Crossref", + "is-referenced-by-count" : 0, + "title" : [ "<<TITLE>>" ], + "prefix" : "10.1016", + "volume" : "9", + "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], + "member" : "78", + "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], + "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", + "content-type" : "text/xml", + "content-version" : "vor", + "intended-application" : "text-mining" }, + { "URL" : + "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", + "content-type" : "text/plain", + "content-version" : "vor", + "intended-application" : "text-mining" } ], + "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], + "date-time" : "2015-09-03T10:03:43Z", + "timestamp" : { "$numberLong" : "1441274623000" } }, + "score" : 1, + "issued" : { "date-parts" : [ [ 1996 ] ] }, + "references-count" : 0, + "alternative-id" : [ "0987-7983(96)87729-2" ], + "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", + "ISSN" : [ "0987-7983" ], + "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], + "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""" + // scalastyle:on + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + val CrossrefStrings = List( + CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")) + + // Pipeline tests + val output = "/tmp/testOutput" + val input = "/tmp/testInput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val Sha1Strings : List[String] = List( + "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", + "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", + "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", + "sha1:93187A85273589347598473894839443", + "sha1:024937534094897039547e9824382943") + + val JsonStrings : List[String] = List( + JsonString.replace("<<TITLE>>", "Title 1"), + JsonString.replace("<<TITLE>>", "Title 2: TNG"), + JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 1"), + MalformedJsonString, + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 2") + ) + + // bnewbold: status codes aren't strings, they are uint64 + val Ok : Long = 200 + val Bad : Long = 400 + val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad) + + val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) + .zipped + .toList + .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + // Add example of lines without GROBID data + val SampleData = SampleDataHead :+ new Tuple( + new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) + + JobTest("sandcrawler.ScoreJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("crossref-input", input) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) + .source(TextLine(input), List( + 0 -> CrossrefStrings(0), + 1 -> CrossrefStrings(1), + 2 -> CrossrefStrings(2), + 3 -> CrossrefStrings(3))) + .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) { + // Grobid titles and slugs (in parentheses): + // Title 1 (title1) + // Title 2: TNG (title2tng) + // Title 3: The Sequel (title3thesequel) + // crossref titles and slugs (in parentheses): + // Title 2: TNG (title2tng) + // Title 1: TNG 2A (title1tng2a) + // Title 1: TNG 3 (title1tng3) + // Title 2: Rebooted (title2rebooted) + // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug + outputBuffer => + "The pipeline" should "return a 1-element list" in { + outputBuffer should have length 1 + } + + it should "has right # of entries with each slug" in { + val slugs = outputBuffer.map(_._1) + val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size) + // XXX: countMap("title1") shouldBe 3 + countMap("title2tng") shouldBe 1 + } + + def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) : (String, Int, String, String) = { + val mf1 : MapFeatures = GrobidScorable.jsonToMapFeatures( + Sha1Strings(grobidIndex), + JsonStrings(grobidIndex)) + val mf2 : MapFeatures = CrossrefScorable.jsonToMapFeatures( + CrossrefStrings(crossrefIndex)) + val score = Scorable.computeSimilarity( + ReduceFeatures(mf1.json), + ReduceFeatures(mf2.json)) + (slug, score, mf1.json, mf2.json) + } + + it should "have right output values" in { + //outputBuffer.exists(_ == bundle("title1", 0, 0)) + //outputBuffer.exists(_ == bundle("title1", 0, 2)) + //outputBuffer.exists(_ == bundle("title1", 0, 1)) + outputBuffer.exists(_ == bundle("title2tng", 1, 3)) + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala new file mode 100644 index 0000000..410819b --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala @@ -0,0 +1,85 @@ +package sandcrawler + +import org.scalatest._ + +class StringUtilitiesTest extends FlatSpec with Matchers { + "removeAccents()" should "handle the empty string" in { + StringUtilities.removeAccents("") shouldBe "" + } + + it should "not change a string with unaccented characters" in { + StringUtilities.removeAccents("abc123") shouldBe "abc123" + } + + it should "remove accents from Ls" in { + StringUtilities.removeAccents("E\u0141\u0142en") shouldBe "ELlen" + } + + it should "remove accents from Es without changing case" in { + val result = StringUtilities.removeAccents("\u00e9") + result should have length 1 + result shouldBe "e" + } + + it should "convert the ø in Soren" in { + StringUtilities.removeAccents("Søren") shouldBe "Soren" + StringUtilities.removeAccents("SØREN") shouldBe "SOREN" + } + + "removePunctuation" should "work on the empty string" in { + StringUtilities.removePunctuation("") shouldBe "" + } + + it should "work on non-empty text strings" in { + StringUtilities.removePunctuation("Hello, world!") shouldBe "Hello world" + StringUtilities.removePunctuation(":-)") shouldBe "" + StringUtilities.removePunctuation("<<---a---b--->") shouldBe "ab" + } + + // Tests adapted from https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ + "stringDistance" should "work on empty strings" in { + StringUtilities.stringDistance("", "") shouldBe 0 + StringUtilities.stringDistance("a", "") shouldBe 1 + StringUtilities.stringDistance("", "a") shouldBe 1 + StringUtilities.stringDistance("abc", "") shouldBe 3 + StringUtilities.stringDistance("", "abc") shouldBe 3 + } + + it should "work on equal strings" in { + StringUtilities.stringDistance("", "") shouldBe 0 + StringUtilities.stringDistance("a", "a") shouldBe 0 + StringUtilities.stringDistance("abc", "abc") shouldBe 0 + } + + it should "work where only inserts are needed" in { + StringUtilities.stringDistance("", "a") shouldBe 1 + StringUtilities.stringDistance("a", "ab") shouldBe 1 + StringUtilities.stringDistance("b", "ab") shouldBe 1 + StringUtilities.stringDistance("ac", "abc") shouldBe 1 + StringUtilities.stringDistance("abcdefg", "xabxcdxxefxgx") shouldBe 6 + } + + it should "work where only deletes are needed" in { + StringUtilities.stringDistance( "a", "") shouldBe 1 + StringUtilities.stringDistance( "ab", "a") shouldBe 1 + StringUtilities.stringDistance( "ab", "b") shouldBe 1 + StringUtilities.stringDistance("abc", "ac") shouldBe 1 + StringUtilities.stringDistance("xabxcdxxefxgx", "abcdefg") shouldBe 6 + } + + it should "work where only substitutions are needed" in { + StringUtilities.stringDistance( "a", "b") shouldBe 1 + StringUtilities.stringDistance( "ab", "ac") shouldBe 1 + StringUtilities.stringDistance( "ac", "bc") shouldBe 1 + StringUtilities.stringDistance("abc", "axc") shouldBe 1 + StringUtilities.stringDistance("xabxcdxxefxgx", "1ab2cd34ef5g6") shouldBe 6 + } + + it should "work where many operations are needed" in { + StringUtilities.stringDistance("example", "samples") shouldBe 3 + StringUtilities.stringDistance("sturgeon", "urgently") shouldBe 6 + StringUtilities.stringDistance("levenshtein", "frankenstein") shouldBe 6 + StringUtilities.stringDistance("distance", "difference") shouldBe 5 + StringUtilities.stringDistance("java was neat", "scala is great") shouldBe 7 + } +} |