From b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Mon, 6 Aug 2018 14:16:19 -0700 Subject: Partly refactored HBaseCrossrefScoreJob. Everything compiles. --- scalding/src/main/scala/sandcrawler/Scorable.scala | 115 +++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 scalding/src/main/scala/sandcrawler/Scorable.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala new file mode 100644 index 0000000..8e0c560 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -0,0 +1,115 @@ +import scala.math +import scala.util.parsing.json.JSON + +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ + +case class MapFeatures(val key : String, slug : String, json : String) +case class ReduceFeatures(json : String) +case class ReduceOutput(val score : Int, json1 : String, json2 : String) + +abstract class Scorable { + def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = + { + getFeaturesPipe(args) + .filter { entry => Scorable.isValidSlug(entry.slug) } + .groupBy { case MapFeatures(key, slug, json) => slug } + .map { tuple => + val (slug : String, features : MapFeatures) = tuple + (slug, ReduceFeatures(features.json)) + } + } + + // abstract method + def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] +} + +object Scorable { + val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable + + def isValidSlug(slug : String) = { + slug != NoSlug + } + + def jsonToMap(json : String) : Option[Map[String, Any]] = { + // https://stackoverflow.com/a/32717262/631051 + val jsonObject = JSON.parseFull(json) + if (jsonObject == None) { + None + } else { + Some(jsonObject.get.asInstanceOf[Map[String, Any]]) + } + } + + /* + def grobidToSlug(json : String) : Option[String] = { + jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + titleToSlug(getString(map, "title")) + } else { + None + } + } + } + } + + def crossrefToSlug(json : String) : Option[String] = { + jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + // TODO: Stop ignoring secondary titles + titleToSlug(map("title").asInstanceOf[List[String]](0)) + } else { + None + } + } + } + } + */ + + def titleToSlug(title : String) : String = { + val slug = StringUtilities.removeAccents(title).split(":")(0).toLowerCase() + if (slug.isEmpty) { + NoSlug + } else { + slug + } + } + + def getStringOption(optionalMap : Option[Map[String, Any]], key : String) + : Option[String] = { + optionalMap match { + case None => None + case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None + } + } + + // Caller is responsible for ensuring that key is in map. 
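+  // Editor's illustration (not part of the original commit), assuming a
+  // string-valued map:
+  //   getString(Map("title" -> "Some Title"), "title")   // returns "Some Title"
+  //   getString(Map("title" -> "Some Title"), "doi")     // fails the assert below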
+ def getString(map : Map[String, String], key : String) : String = { + assert(map contains key) + map(key).asInstanceOf[String] + } + + val MaxScore = 1000 + + def computeOutput(feature1 : ReduceFeatures, feature2 : ReduceFeatures) : + ReduceOutput = { + val json1 = jsonToMap(feature1.json) + val json2 = jsonToMap(feature2.json) + getStringOption(json1, "title") match { + case None => ReduceOutput(0, "No title", feature1.json) + case Some(title1) => { + getStringOption(json2, "title") match { + case None => ReduceOutput(0, "No title", feature2.json) + case Some(title2) => + ReduceOutput( + (StringUtilities.similarity(title1, title2) * MaxScore).toInt, + feature1.json, feature2.json) + } + } + } + } +} -- cgit v1.2.3 From 308b33d889d804380427d2aa112efec77b3e1770 Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Mon, 6 Aug 2018 16:38:46 -0700 Subject: New code compiles. Old tests pass. New tests not yet written. --- .../main/scala/sandcrawler/GrobidScorable.scala | 48 ++++++++++++++++++++++ .../scala/sandcrawler/HBaseCrossrefScoreJob.scala | 6 +-- scalding/src/main/scala/sandcrawler/Scorable.scala | 9 ++-- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 9 ++-- .../main/scala/sandcrawler/StringUtilities.scala | 2 + 5 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 scalding/src/main/scala/sandcrawler/GrobidScorable.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala new file mode 100644 index 0000000..5dac64c --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -0,0 +1,48 @@ +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorable extends Scorable with HBasePipeConversions { + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = { + // TODO: Clean up code after debugging. 
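+    // Editor's note (not part of the original commit): the HBase source built
+    // below is expected to yield rows of (key, grobid0:tei_json), keyed by the
+    // file's sha1; HBaseCrossrefScore.grobidToSlug then reduces the TEI JSON's
+    // title to the slug used for joining.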
+ val grobidSource = HBaseCrossrefScore.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) + + val pipe0 : Pipe = grobidSource.read + val grobidPipe : TypedPipe[MapFeatures] = pipe0 + .fromBytesWritable(new Fields("key", "tei_json")) + // .debug // Should be 4 tuples for mocked data + // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) + // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json) + .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .map { entry => + val (key : String, json : String) = (entry._1, entry._2) + HBaseCrossrefScore.grobidToSlug(json) match { + case Some(slug) => new MapFeatures(slug, key, json) + case None => new MapFeatures(Scorable.NoSlug, key, json) + } + } + .filter { + _.slug != Scorable.NoSlug + } + grobidPipe + } +/* + def fromBytesWritableLocal(f: Fields): Pipe = { + asList(f) + .foldLeft(pipe) { (p, fld) => { + p.map(fld.toString -> fld.toString) { from: org.apache.hadoop.hbase.io.ImmutableBytesWritable => + Option(from).map(x => Bytes.toString(x.get)).getOrElse(null) + } + }} + } + */ +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 01d852e..2fbb19f 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -27,8 +27,9 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), args("zookeeper-hosts")) - val grobidPipe : TypedPipe[(String, String, String)] = grobidSource - .read + + val pipe0 : cascading.pipe.Pipe = grobidSource.read + val grobidPipe : TypedPipe[(String, String, String)] = pipe0 .fromBytesWritable(new Fields("key", "tei_json")) // .debug // Should be 4 tuples for mocked data .toTypedPipe[(String, String)]('key, 'tei_json) @@ -78,7 +79,6 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} // Output: score, sha1, doi, grobid title, crossref title .write(TypedTsv[(Int, String, String, String, String)](args("output"))) - } object HBaseCrossrefScore { diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 8e0c560..89dc835 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -1,6 +1,9 @@ +package sandcrawler + import scala.math import scala.util.parsing.json.JSON +import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ @@ -9,9 +12,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(args) + getFeaturesPipe(args)(flowDef, mode) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(key, slug, json) => slug } .map { tuple => @@ -21,7 +24,7 @@ abstract class Scorable { } // abstract method - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] } object Scorable { diff --git 
a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 8d4d957..22cc9e9 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,16 +1,19 @@ +package sandcrawler + import java.text.Normalizer import scala.math import scala.util.parsing.json.JSON +import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions -class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions { - val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) +class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions { + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode) pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala index 290b03f..1ae6db3 100644 --- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -1,3 +1,5 @@ +package sandcrawler + import java.text.Normalizer import java.util.regex.Pattern -- cgit v1.2.3 From c71b2da70ff7d3b77082db25672f6f3669f2238c Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Tue, 7 Aug 2018 09:51:18 -0700 Subject: Added CrossrefScorable.scala. All code compiles. 
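[Editor's sketch, not part of the original commit message. It illustrates the
mapping this commit adds, using a hypothetical Crossref input line; the class,
field, and method names are the ones appearing in the diffs below:

    // One Crossref JSON line in, one MapFeatures out.
    val line = """{"title": ["Some Title: Subtitle"]}"""
    HBaseCrossrefScore.crossrefToSlug(line) match {
      case Some(slug) => new MapFeatures(slug, line)            // slug == "some title"
      case None       => new MapFeatures(Scorable.NoSlug, line) // dropped later
    }

Records that fall through to Scorable.NoSlug are filtered out by getInputPipe's
isValidSlug check before the join.]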
--- .../main/scala/sandcrawler/CrossrefScorable.scala | 27 ++++++++++++++++++++++ .../main/scala/sandcrawler/GrobidScorable.scala | 13 ++++------- scalding/src/main/scala/sandcrawler/Scorable.scala | 4 ++-- 3 files changed, 34 insertions(+), 10 deletions(-) create mode 100644 scalding/src/main/scala/sandcrawler/CrossrefScorable.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala new file mode 100644 index 0000000..a603e2d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -0,0 +1,27 @@ +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class CrossrefScorable extends Scorable { + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = { +// val crossrefSource = TextLine(args("crossref-input")) +// val crossrefPipe : TypedPipe[MapFeatures] = crossrefSource + TextLine(args("crossref-input")) + .read + .toTypedPipe[String](new Fields("line")) + .map{ json : String => + HBaseCrossrefScore.crossrefToSlug(json) match { + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) + } + } +// crossrefPipe + } +} diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 5dac64c..8da7708 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -16,8 +16,9 @@ class GrobidScorable extends Scorable with HBasePipeConversions { args("hbase-table"), args("zookeeper-hosts")) - val pipe0 : Pipe = grobidSource.read - val grobidPipe : TypedPipe[MapFeatures] = pipe0 +// val pipe0 : Pipe = grobidSource.read +// val grobidPipe : TypedPipe[MapFeatures] = pipe0 + grobidSource.read .fromBytesWritable(new Fields("key", "tei_json")) // .debug // Should be 4 tuples for mocked data // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) @@ -26,14 +27,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions { .map { entry => val (key : String, json : String) = (entry._1, entry._2) HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => new MapFeatures(slug, key, json) - case None => new MapFeatures(Scorable.NoSlug, key, json) + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) } } - .filter { - _.slug != Scorable.NoSlug - } - grobidPipe } /* def fromBytesWritableLocal(f: Fields): Pipe = { diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 89dc835..950a6d4 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -7,7 +7,7 @@ import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ -case class MapFeatures(val key : String, slug : String, json : String) +case class MapFeatures(slug : String, json : String) case class ReduceFeatures(json : String) case class ReduceOutput(val score : Int, json1 : String, json2 : String) @@ -16,7 +16,7 @@ abstract class Scorable { { 
getFeaturesPipe(args)(flowDef, mode) .filter { entry => Scorable.isValidSlug(entry.slug) } - .groupBy { case MapFeatures(key, slug, json) => slug } + .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple (slug, ReduceFeatures(features.json)) -- cgit v1.2.3 From cbd6433af7949df7c4433468bf99eefe9973e864 Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Tue, 7 Aug 2018 10:11:54 -0700 Subject: Removed commented-out code. --- scalding/src/main/scala/sandcrawler/Scorable.scala | 29 ------ .../src/test/scala/sandcrawler/ScorableTest.scala | 108 +++++++++++++++++++++ 2 files changed, 108 insertions(+), 29 deletions(-) create mode 100644 scalding/src/test/scala/sandcrawler/ScorableTest.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 950a6d4..948002b 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -44,35 +44,6 @@ object Scorable { } } - /* - def grobidToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - titleToSlug(getString(map, "title")) - } else { - None - } - } - } - } - - def crossrefToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - // TODO: Stop ignoring secondary titles - titleToSlug(map("title").asInstanceOf[List[String]](0)) - } else { - None - } - } - } - } - */ - def titleToSlug(title : String) : String = { val slug = StringUtilities.removeAccents(title).split(":")(0).toLowerCase() if (slug.isEmpty) { diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala new file mode 100644 index 0000000..0375b6a --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -0,0 +1,108 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class HBaseCrossrefScoreTest extends FlatSpec with Matchers { + val JsonString = """ +{ + "title": "<>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. 
\n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + val MalformedJsonString = JsonString.replace("}", "") + + "titleToSlug()" should "extract the parts of titles before a colon" in { + val slug = Scorable.titleToSlug("HELLO:there") + slug should contain ("hello") + } + + it should "extract an entire colon-less string" in { + val slug = Scorable.titleToSlug("hello THERE") + slug should contain ("hello there") + } + + it should "return None if given empty string" in { + Scorable.titleToSlug("") shouldBe None + } + + "jsonToMap()" should "return a map, given a legal JSON string" in { + Scorable.jsonToMap(jsonString) should be (Some(_)) + } + + it should "return None, given illegal JSON" in { + Scorable.jsonToMap("illegal{,json{{") should be (None)) + } + +/* + it should "return None if given a malformed json string" in { + val slug = Scorable.grobidToSlug(MalformedGrobidString) + slug shouldBe None + } + + it should "return None if given an empty json string" in { + val slug = Scorable.grobidToSlug("") + slug shouldBe None + } + + "crossrefToSlug()" should "get the right slug for a crossref json string" in { + val slug = Scorable.crossrefToSlug(CrossrefStringWithTitle) + slug should contain ("sometitle") + } + + it should "return None if given json string without title" in { + val slug = Scorable.grobidToSlug(CrossrefStringWithoutTitle) + slug shouldBe None + } + + it should "return None if given a malformed json string" in { + val slug = Scorable.grobidToSlug(MalformedCrossrefString) + slug shouldBe None + } + */ +} + -- cgit v1.2.3 From 4981a98358aae098714d2266404f7b167993bf0c Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Tue, 7 Aug 2018 10:28:48 -0700 Subject: Minor refactoring. Added test. --- scalding/src/main/scala/sandcrawler/Scorable.scala | 15 ++++++--------- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 4 +++- scalding/src/test/scala/sandcrawler/ScorableTest.scala | 5 +++-- 3 files changed, 12 insertions(+), 12 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 948002b..77bb7ae 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -69,19 +69,16 @@ object Scorable { val MaxScore = 1000 - def computeOutput(feature1 : ReduceFeatures, feature2 : ReduceFeatures) : - ReduceOutput = { - val json1 = jsonToMap(feature1.json) - val json2 = jsonToMap(feature2.json) + def computeSimilarity(features1 : ReduceFeatures, features2 : ReduceFeatures) : Int = { + val json1 = jsonToMap(features1.json) + val json2 = jsonToMap(features2.json) getStringOption(json1, "title") match { - case None => ReduceOutput(0, "No title", feature1.json) + case None => 0 case Some(title1) => { getStringOption(json2, "title") match { - case None => ReduceOutput(0, "No title", feature2.json) + case None => 0 case Some(title2) => - ReduceOutput( - (StringUtilities.similarity(title1, title2) * MaxScore).toInt, - feature1.json, feature2.json) + (StringUtilities.similarity(title1, title2) * MaxScore).toInt } } } diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 22cc9e9..e6a5dc1 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -17,7 +17,9 @@ class ScoreJob(args: Args, sc1 : Scorable, sc2 : 
Scorable)(implicit flowDef : Fl pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry - Scorable.computeOutput(features1, features2) + new ReduceOutput(Scorable.computeSimilarity(features1, features2), + features1.json, + features2.json) } .write(TypedTsv[ReduceOutput](args("output"))) } diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala index 535b8f6..9437fe6 100644 --- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -77,8 +77,9 @@ class ScorableTest extends FlatSpec with Matchers { } "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in { - val output = Scorable.computeOutput(new ReduceFeatures(JsonString), new ReduceFeatures(JsonString)) - output.score shouldBe Scorable.MaxScore + val score = Scorable.computeSimilarity( + new ReduceFeatures(JsonString), new ReduceFeatures(JsonString)) + score shouldBe Scorable.MaxScore } /* -- cgit v1.2.3 From 71b8d527da73f99ffb1b09ec1044031e772d1db6 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Tue, 7 Aug 2018 11:24:06 -0700 Subject: Added punctuation removal to slug creation and similarity comparisons --- scalding/src/main/scala/sandcrawler/Scorable.scala | 3 ++- scalding/src/main/scala/sandcrawler/StringUtilities.scala | 8 +++++++- scalding/src/test/scala/sandcrawler/ScorableTest.scala | 7 +++++++ scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala | 10 ++++++++++ 4 files changed, 26 insertions(+), 2 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 77bb7ae..736c175 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -45,7 +45,8 @@ object Scorable { } def titleToSlug(title : String) : String = { - val slug = StringUtilities.removeAccents(title).split(":")(0).toLowerCase() + val slug = StringUtilities.removePunctuation( + StringUtilities.removeAccents(title).split(":")(0).toLowerCase()) if (slug.isEmpty) { NoSlug } else { diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala index 1ae6db3..3058f15 100644 --- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -25,9 +25,15 @@ object StringUtilities { pattern.matcher(sb).replaceAll("") } + // Source: https://stackoverflow.com/a/30076541/631051 + def removePunctuation(s: String) : String = { + s.replaceAll("""[\p{Punct}&&[^.]]""", "") + } + // Adapted from: https://stackoverflow.com/a/16018452/631051 def similarity(s1a : String, s2a : String) : Double = { - val (s1, s2) = (removeAccents(s1a), removeAccents(s2a)) + val (s1, s2) = (removeAccents(removePunctuation(s1a)), + removeAccents(removePunctuation(s2a))) val longer : String = if (s1.length > s2.length) s1 else s2 val shorter : String = if (s1.length > s2.length) s2 else s1 if (longer.length == 0) { diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala index 8445073..713a7e5 100644 --- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -71,6 +71,13 @@ class 
ScorableTest extends FlatSpec with Matchers { Scorable.titleToSlug("") shouldBe Scorable.NoSlug } + "titleToSlug()" should "strip punctuation" in { + Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello" + Scorable.titleToSlug("a:b:c") shouldBe "a" + Scorable.titleToSlug( + "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands" + } + "jsonToMap()" should "return a map, given a legal JSON string" in { Scorable.jsonToMap(JsonString) should not be (None) } diff --git a/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala index 2df5a22..410819b 100644 --- a/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala +++ b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala @@ -26,6 +26,16 @@ class StringUtilitiesTest extends FlatSpec with Matchers { StringUtilities.removeAccents("SØREN") shouldBe "SOREN" } + "removePunctuation" should "work on the empty string" in { + StringUtilities.removePunctuation("") shouldBe "" + } + + it should "work on non-empty text strings" in { + StringUtilities.removePunctuation("Hello, world!") shouldBe "Hello world" + StringUtilities.removePunctuation(":-)") shouldBe "" + StringUtilities.removePunctuation("<<---a---b--->") shouldBe "ab" + } + // Tests adapted from https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ "stringDistance" should "work on empty strings" in { StringUtilities.stringDistance("", "") shouldBe 0 -- cgit v1.2.3 From 6d64c5d4e1527c7277527132efa858def2589486 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Thu, 9 Aug 2018 11:30:44 -0700 Subject: Added test for null argument to titleToSlug() --- scalding/src/main/scala/sandcrawler/Scorable.scala | 13 +++++++++---- scalding/src/test/scala/sandcrawler/ScorableTest.scala | 4 ++++ 2 files changed, 13 insertions(+), 4 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 736c175..ce4fdca 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -45,12 +45,17 @@ object Scorable { } def titleToSlug(title : String) : String = { - val slug = StringUtilities.removePunctuation( - StringUtilities.removeAccents(title).split(":")(0).toLowerCase()) - if (slug.isEmpty) { + if (title == null || title.isEmpty) { NoSlug } else { - slug + val unaccented = StringUtilities.removeAccents(title) + // Remove punctuation after splitting on colon. 
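+      // Editor's examples (not part of the original commit), taken from the
+      // ScorableTest cases added earlier in this log:
+      //   titleToSlug("HELLO!:the:re") == "hello"
+      //   titleToSlug("If you're happy and you know it, clap your hands!")
+      //     == "if youre happy and you know it clap your hands"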
+ val slug = StringUtilities.removePunctuation((unaccented.split(":")(0).toLowerCase())) + if (slug.isEmpty || slug == null) { + NoSlug + } else { + slug + } } } diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala index 713a7e5..40801a0 100644 --- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -71,6 +71,10 @@ class ScorableTest extends FlatSpec with Matchers { Scorable.titleToSlug("") shouldBe Scorable.NoSlug } + it should "return Scorable.NoSlug if given null" in { + Scorable.titleToSlug(null) shouldBe Scorable.NoSlug + } + "titleToSlug()" should "strip punctuation" in { Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello" Scorable.titleToSlug("a:b:c") shouldBe "a" -- cgit v1.2.3 From 25ade249538aade9dcd39d459bacdf43ea0a7dd6 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Thu, 9 Aug 2018 11:38:05 -0700 Subject: Fixed scalastyle violations. --- .../main/scala/sandcrawler/CrossrefScorable.scala | 2 +- .../src/main/scala/sandcrawler/GrobidScorable.scala | 21 +++++++++------------ scalding/src/main/scala/sandcrawler/Scorable.scala | 7 +++---- .../main/scala/sandcrawler/StringUtilities.scala | 2 +- 4 files changed, 14 insertions(+), 18 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index cf5849c..ee4cc54 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable { - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = { + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { TextLine(args("crossref-input")) .read .toTypedPipe[String](new Fields("line")) diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index bf36855..95d6dae 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = { + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { // TODO: Clean up code after debugging. 
val grobidSource = HBaseBuilder.build( args("hbase-table"), @@ -18,21 +18,18 @@ class GrobidScorable extends Scorable with HBasePipeConversions { List("grobid0:tei_json"), SourceMode.SCAN_ALL) -// val pipe0 : Pipe = grobidSource.read -// val grobidPipe : TypedPipe[MapFeatures] = pipe0 grobidSource.read - .fromBytesWritable(new Fields("key", "tei_json")) - // .debug // Should be 4 tuples for mocked data + .fromBytesWritable(new Fields("key", "tei_json")) // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json) - .toTypedPipe[(String, String)](new Fields("key", "tei_json")) - .map { entry => - val (key : String, json : String) = (entry._1, entry._2) - GrobidScorable.grobidToSlug(json) match { - case Some(slug) => new MapFeatures(slug, json) - case None => new MapFeatures(Scorable.NoSlug, json) + .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .map { entry => + val (key : String, json : String) = (entry._1, entry._2) + GrobidScorable.grobidToSlug(json) match { + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) + } } - } } } diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index ce4fdca..86336cb 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -30,7 +30,7 @@ abstract class Scorable { object Scorable { val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable - def isValidSlug(slug : String) = { + def isValidSlug(slug : String) : Boolean = { slug != NoSlug } @@ -59,8 +59,7 @@ object Scorable { } } - def getStringOption(optionalMap : Option[Map[String, Any]], key : String) - : Option[String] = { + def getStringOption(optionalMap : Option[Map[String, Any]], key : String) : Option[String] = { optionalMap match { case None => None case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None @@ -83,7 +82,7 @@ object Scorable { case Some(title1) => { getStringOption(json2, "title") match { case None => 0 - case Some(title2) => + case Some(title2) => (StringUtilities.similarity(title1, title2) * MaxScore).toInt } } diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala index 3058f15..b6e5554 100644 --- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -32,7 +32,7 @@ object StringUtilities { // Adapted from: https://stackoverflow.com/a/16018452/631051 def similarity(s1a : String, s2a : String) : Double = { - val (s1, s2) = (removeAccents(removePunctuation(s1a)), + val (s1, s2) = (removeAccents(removePunctuation(s1a)), removeAccents(removePunctuation(s2a))) val longer : String = if (s1.length > s2.length) s1 else s2 val shorter : String = if (s1.length > s2.length) s2 else s1 -- cgit v1.2.3 From 9d7adc94ad63e85ffb2b459d4a8c2ed0ed46d8c8 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Thu, 9 Aug 2018 19:03:01 -0700 Subject: WIP --- .../main/scala/sandcrawler/CrossrefScorable.scala | 1 + .../main/scala/sandcrawler/GrobidScorable.scala | 15 +- scalding/src/main/scala/sandcrawler/Scorable.scala | 2 +- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 46 ++++-- .../src/test/scala/sandcrawler/ScorableTest.scala | 112 ++++--------- .../src/test/scala/sandcrawler/ScoreJobTest.scala | 177 
+++++++++++++++++++++ 6 files changed, 251 insertions(+), 102 deletions(-) create mode 100644 scalding/src/test/scala/sandcrawler/ScoreJobTest.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index ee4cc54..d5da845 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -11,6 +11,7 @@ import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable { def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { + // TODO: Generalize args so there can be multiple Grobid pipes in one job. TextLine(args("crossref-input")) .read .toTypedPipe[String](new Fields("line")) diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 95d6dae..4c67074 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -11,14 +11,9 @@ import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { - // TODO: Clean up code after debugging. - val grobidSource = HBaseBuilder.build( - args("hbase-table"), - args("zookeeper-hosts"), - List("grobid0:tei_json"), - SourceMode.SCAN_ALL) - - grobidSource.read + // TODO: Generalize args so there can be multiple grobid pipes in one job. + GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + .read .fromBytesWritable(new Fields("key", "tei_json")) // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json) @@ -34,6 +29,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions { } object GrobidScorable { + def getHBaseSource(table : String, host : String) : HBaseSource = { + HBaseBuilder.build(table, host, List("grobid0:tei_json"), SourceMode.SCAN_ALL) + } + def grobidToSlug(json : String) : Option[String] = { Scorable.jsonToMap(json) match { case None => None diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 86336cb..cfdc192 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -9,7 +9,7 @@ import com.twitter.scalding.typed.TDsl._ case class MapFeatures(slug : String, json : String) case class ReduceFeatures(json : String) -case class ReduceOutput(val score : Int, json1 : String, json2 : String) +case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] = diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index e6a5dc1..aa20d0f 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,25 +1,53 @@ package sandcrawler -import java.text.Normalizer - -import scala.math -import scala.util.parsing.json.JSON - import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase 
import parallelai.spyglass.hbase.HBasePipeConversions -class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions { - val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode) +class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with + HBasePipeConversions { + /* + val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode) pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry - new ReduceOutput(Scorable.computeSimilarity(features1, features2), + new ReduceOutput( + slug, + Scorable.computeSimilarity(features1, features2), features1.json, features2.json) } .write(TypedTsv[ReduceOutput](args("output"))) + */ +} + +// Ugly hack to get non-String information into ScoreJob above. +object ScoreJob { + var scorable1 : Option[Scorable] = None + var scorable2 : Option[Scorable] = None + + def setScorable1(s : Scorable) { + scorable1 = Some(s) + } + + def getScorable1() : Scorable = { + scorable1 match { + case Some(s) => s + case None => null + } + } + + def setScorable2(s: Scorable) { + scorable2 = Some(s) + } + + def getScorable2() : Scorable = { + scorable2 match { + case Some(s) => s + case None => null + } + } } diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala index 40801a0..2f80492 100644 --- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -9,7 +9,7 @@ import org.scalatest._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode class ScorableTest extends FlatSpec with Matchers { - val JsonString = """ + val JsonString = """ { "title": "<<TITLE>>", "authors": [ @@ -55,96 +55,40 @@ class ScorableTest extends FlatSpec with Matchers { } """ - performUnitTests() - performPipelineTests() - - def performUnitTests() { - "titleToSlug()" should "extract the parts of titles before a colon" in { - Scorable.titleToSlug("HELLO:there") shouldBe "hello" - } - - it should "extract an entire colon-less string" in { - Scorable.titleToSlug("hello THERE") shouldBe "hello there" - } - - it should "return Scorable.NoSlug if given empty string" in { - Scorable.titleToSlug("") shouldBe Scorable.NoSlug - } - - it should "return Scorable.NoSlug if given null" in { - Scorable.titleToSlug(null) shouldBe Scorable.NoSlug - } - - "titleToSlug()" should "strip punctuation" in { - Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello" - Scorable.titleToSlug("a:b:c") shouldBe "a" - Scorable.titleToSlug( - "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands" - } + "titleToSlug()" should "extract the parts of titles before a colon" in { + Scorable.titleToSlug("HELLO:there") shouldBe "hello" + } - "jsonToMap()" should "return a map, given a legal JSON string" in { - Scorable.jsonToMap(JsonString) should not be (None) - } + it should "extract an entire colon-less string" in { + Scorable.titleToSlug("hello THERE") shouldBe "hello there" + } - it should "return None, given illegal JSON" in { - Scorable.jsonToMap("illegal{,json{{") should be (None) - } + it should 
"return Scorable.NoSlug if given empty string" in { + Scorable.titleToSlug("") shouldBe Scorable.NoSlug + } - "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in { - val score = Scorable.computeSimilarity( - new ReduceFeatures(JsonString), new ReduceFeatures(JsonString)) - score shouldBe Scorable.MaxScore - } + it should "return Scorable.NoSlug if given null" in { + Scorable.titleToSlug(null) shouldBe Scorable.NoSlug } - def performPipelineTests() { - /* + "titleToSlug()" should "strip punctuation" in { + Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello" + Scorable.titleToSlug("a:b:c") shouldBe "a" + Scorable.titleToSlug( + "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands" + } - val output = "/tmp/testOutput" - val input = "/tmp/testInput" - val (testTable, testHost) = ("test-table", "dummy-host:2181") + "jsonToMap()" should "return a map, given a legal JSON string" in { + Scorable.jsonToMap(JsonString) should not be (None) + } - val grobidSampleData = List( - List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))), - List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))), - List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))), - List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), - Bytes.toBytes(MalformedGrobidString))) + it should "return None, given illegal JSON" in { + Scorable.jsonToMap("illegal{,json{{") should be (None) + } - JobTest("sandcrawler.HBaseCrossrefScoreJob") - .arg("test", "") - .arg("app.conf.path", "app.conf") - .arg("output", output) - .arg("hbase-table", testTable) - .arg("zookeeper-hosts", testHost) - .arg("crossref-input", input) - .arg("debug", "true") - .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost), - grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .source(TextLine(input), List( - 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), - 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"), - 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), - 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) - .sink[(Int, String, String, String, String)](TypedTsv[(Int, - String, String, String, String)](output)) { - // Grobid titles: - // "Title 1", "Title 2: TNG", "Title 3: The Sequel" - // crossref slugs: - // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted" - // Join should have 3 "Title 1" slugs and 1 "Title 2" slug - outputBuffer => - "The pipeline" should "return a 4-element list" in { - outputBuffer should have length 4 - } - } - .run - .finish -} - */ + "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in { + val score = Scorable.computeSimilarity( + new ReduceFeatures(JsonString), new ReduceFeatures(JsonString)) + score shouldBe Scorable.MaxScore } } - diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala new file mode 100644 index 0000000..22cbdb8 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -0,0 +1,177 @@ +package sandcrawler + +import cascading.tuple.Fields +import 
cascading.tuple.Tuple +import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScoreJobTest extends FlatSpec with Matchers { + val GrobidString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") + val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle") + val MalformedGrobidString = GrobidString.replace("}", "") + + val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, + "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], + "date-time" : "2017-10-23T17:19:16Z", + "timestamp" : { "$numberLong" : "1508779156477" } }, + "reference-count" : 0, + "publisher" : "Elsevier BV", + "issue" : "3", + "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", + "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], + "date-time" : "1996-01-01T00:00:00Z", + "timestamp" : { "$numberLong" : "820454400000" } }, + "delay-in-days" : 0, "content-version" : "tdm" }], + "content-domain" : { "domain" : [], "crossmark-restriction" : false }, + "published-print" : { "date-parts" : [ [ 1996 ] ] }, + "DOI" : "<<DOI>>", + "type" : "journal-article", + "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], + "date-time" : "2002-07-25T15:09:41Z", + "timestamp" : { "$numberLong" : "1027609781000" } }, + "page" : "186-187", + "source" : "Crossref", + "is-referenced-by-count" : 0, + "title" : [ "<<TITLE>>" ], + "prefix" : "10.1016", + "volume" : "9", + "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], + "member" : "78", + "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], + "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", + "content-type" : "text/xml", + "content-version" : "vor", + "intended-application" : "text-mining" }, + { "URL" : + "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", + "content-type" : "text/plain", + "content-version" : "vor", + "intended-application" : "text-mining" } ], + "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], + "date-time" : "2015-09-03T10:03:43Z", + "timestamp" : { "$numberLong" : "1441274623000" } }, + "score" : 1, + "issued" : { "date-parts" : [ [ 1996 ] ] }, + "references-count" : 0, + "alternative-id" : [ "0987-7983(96)87729-2" ], + 
"URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", + "ISSN" : [ "0987-7983" ], + "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], + "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""" + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + + // Pipeline tests + val output = "/tmp/testOutput" + val input = "/tmp/testInput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val grobidSampleData = List( + List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))), + List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))), + List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))), + List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), + Bytes.toBytes(MalformedGrobidString))) + + // TODO: Make less yucky. + ScoreJob.setScorable1(new CrossrefScorable()) + ScoreJob.setScorable2(new GrobidScorable()) + + JobTest("sandcrawler.ScoreJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("crossref-input", input) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), + grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .source(TextLine(input), List( + 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), + 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"), + 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), + 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) + .sink[ReduceOutput](TypedTsv[ReduceOutput](output)) { + // Grobid titles: + // "Title 1", "Title 2: TNG", "Title 3: The Sequel" + // crossref slugs: + // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted" + // Join should have 3 "Title 1" slugs and 1 "Title 2" slug + outputBuffer => + "The pipeline" should "return a 4-element list" in { + outputBuffer should have length 4 + } + + /* + it should "return the right first entry" in { + outputBuffer(0) shouldBe ReduceOutput("slug", 50, "", + "") + val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) + slug shouldBe "title 1" + slug shouldBe slug0 + slug shouldBe slug1 + sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8") + grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8") + } + */ + } + .run + .finish +} -- cgit v1.2.3 From 818ad070626d6af7c490017e0bd9b53f30f20150 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Thu, 9 Aug 2018 19:07:19 -0700 Subject: Removed implicit parameters. Does not compile. 
--- scalding/src/main/scala/sandcrawler/CrossrefScorable.scala | 2 +- scalding/src/main/scala/sandcrawler/GrobidScorable.scala | 2 +- scalding/src/main/scala/sandcrawler/Scorable.scala | 6 +++--- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 9 ++++----- 4 files changed, 9 insertions(+), 10 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index d5da845..b221718 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable { - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { // TODO: Generalize args so there can be multiple Grobid pipes in one job. TextLine(args("crossref-input")) .read diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 4c67074..6229718 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { // TODO: Generalize args so there can be multiple grobid pipes in one job. 
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) .read diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index cfdc192..2d2345b 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -12,9 +12,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(args)(flowDef, mode) + getFeaturesPipe(args) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => @@ -24,7 +24,7 @@ abstract class Scorable { } // abstract method - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] } object Scorable { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index aa20d0f..66ba29e 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -6,11 +6,11 @@ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions -class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with +class ScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { - /* - val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode) + + val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args) pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry @@ -21,7 +21,6 @@ class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBa features2.json) } .write(TypedTsv[ReduceOutput](args("output"))) - */ } // Ugly hack to get non-String information into ScoreJob above. 
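[Editor's sketch, not part of the patch above: how the ScoreJob companion-object
"hack" is meant to be driven, mirroring the ScoreJobTest added earlier in this
log -- the caller registers concrete Scorables before the job is constructed:

    ScoreJob.setScorable1(new CrossrefScorable())
    ScoreJob.setScorable2(new GrobidScorable())
    // ScoreJob's constructor then reads them back via getScorable1()/getScorable2().
]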
-- cgit v1.2.3 From 28c0518379d226ac25597c2840c5c81bd8551487 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Thu, 9 Aug 2018 20:26:31 -0700 Subject: WIP --- scalding/src/main/scala/sandcrawler/CrossrefScorable.scala | 9 ++++++--- scalding/src/main/scala/sandcrawler/GrobidScorable.scala | 9 +++++---- scalding/src/main/scala/sandcrawler/Scorable.scala | 9 +++++---- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 7 +++++-- 4 files changed, 21 insertions(+), 13 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index b221718..249c9ab 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -10,10 +10,13 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable { - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { - // TODO: Generalize args so there can be multiple Grobid pipes in one job. + // TODO: Generalize args so there can be multiple Grobid pipes in one job. + def getSource(args : Args) : Source = { TextLine(args("crossref-input")) - .read + } + + def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { + pipe .toTypedPipe[String](new Fields("line")) .map{ json : String => CrossrefScorable.crossrefToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 6229718..5c6b140 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -10,13 +10,14 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { + def getSource(args : Args) : Source = { // TODO: Generalize args so there can be multiple grobid pipes in one job. 
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) - .read + } + + def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { + pipe .fromBytesWritable(new Fields("key", "tei_json")) - // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) - // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json) .toTypedPipe[(String, String)](new Fields("key", "tei_json")) .map { entry => val (key : String, json : String) = (entry._1, entry._2) diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 2d2345b..92b61bc 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -12,9 +12,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(pipe : Pipe) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(args) + getFeaturesPipe(pipe) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => @@ -23,8 +23,9 @@ abstract class Scorable { } } - // abstract method - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] + // abstract methods + def getSource(args : Args) : Source + def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] } object Scorable { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 66ba29e..7891596 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -9,8 +9,11 @@ import parallelai.spyglass.hbase.HBasePipeConversions class ScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args) + // TODO: Instantiate any subclass of Scorable specified in args. 
+ Scorable sc1 = new GrobidScorable() + Scorable sc2 = new CrossrefScorable() + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read) pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry -- cgit v1.2.3 From 2528dd4afdf2e1a3419dbf354011f1ecc25c77a5 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Thu, 9 Aug 2018 21:01:08 -0700 Subject: WIP --- .../main/scala/sandcrawler/CrossrefScorable.scala | 3 +- .../main/scala/sandcrawler/GrobidScorable.scala | 5 +- .../scala/sandcrawler/HBaseCrossrefScoreJob.scala | 218 +++++++++++++++++++++ scalding/src/main/scala/sandcrawler/Scorable.scala | 5 +- 4 files changed, 226 insertions(+), 5 deletions(-) create mode 100644 scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 249c9ab..9842122 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -8,8 +8,9 @@ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource +import TDsl._ -class CrossrefScorable extends Scorable { +class CrossrefScorable extends Scorable with HBasePipeConversions { // TODO: Generalize args so there can be multiple Grobid pipes in one job. def getSource(args : Args) : Source = { TextLine(args("crossref-input")) diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 5c6b140..51e40f9 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -8,6 +8,7 @@ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource +import TDsl._ class GrobidScorable extends Scorable with HBasePipeConversions { def getSource(args : Args) : Source = { @@ -15,10 +16,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions { GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) } - def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { pipe .fromBytesWritable(new Fields("key", "tei_json")) - .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .toTypedPipe[(String, String)](new Fields('key, 'tei_json)) .map { entry => val (key : String, json : String) = (entry._1, entry._2) GrobidScorable.grobidToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala new file mode 100644 index 0000000..725474d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -0,0 +1,218 @@ +package sandcrawler + +import java.text.Normalizer +import java.util.Arrays +import java.util.Properties +import java.util.regex.Pattern + +import scala.math +import scala.util.parsing.json.JSON + +import cascading.tuple.Fields +import 
com.twitter.scalding._ +import com.twitter.scalding.typed.CoGrouped +import com.twitter.scalding.typed.Grouped +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { + val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable + + // key is SHA1 + val grobidSource = HBaseCrossrefScore.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) + + val temp : cascading.pipe.Pipe = grobidSource + .read + .fromBytesWritable(new Fields("key", "tei_json")) + val grobidPipe : TypedPipe[(String, String, String)] = temp + // .debug // Should be 4 tuples for mocked data + .toTypedPipe[(String, String)]('key, 'tei_json) + .map { entry => + val (key, json) = (entry._1, entry._2) + // TODO: Consider passing forward only a subset of JSON. + HBaseCrossrefScore.grobidToSlug(json) match { + case Some(slug) => (slug, key, json) + case None => (NoTitle, key, json) + } + } + .filter { entry => + val (slug, _, _) = entry + slug != NoTitle + } +// .debug // SHould be 3 tuples for mocked data + + val grobidGroup = grobidPipe + .groupBy { case (slug, key, json) => slug } + + val crossrefSource = TextLine(args("crossref-input")) + val temp2 : cascading.pipe.Pipe = crossrefSource.read + val crossrefPipe : TypedPipe[(String, String)] = temp2 + // .debug // Should be 4 tuples for mocked data + .toTypedPipe[String]('line) + .map{ json : String => + HBaseCrossrefScore.crossrefToSlug(json) match { + case Some(slug) => (slug, json) + case None => (NoTitle, json) + } + } + .filter { entry => + val (slug, json) = entry + slug != NoTitle + } + + val crossrefGroup = crossrefPipe + .groupBy { case (slug, json) => slug } + + val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = + grobidGroup.join(crossrefGroup) + + theJoin.map{ entry => + val (slug : String, + ((slug0: String, sha1 : String, grobidJson : String), + (slug1 : String, crossrefJson : String))) = entry + HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} + // Output: score, sha1, doi, grobid title, crossref title + .write(TypedTsv[(Int, String, String, String, String)](args("output"))) + +} + +object HBaseCrossrefScore { + def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( + hbaseTable, // HBase Table Name + zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?) + List("grobid0:tei_json"), + SourceMode.SCAN_ALL) + + def jsonToMap(json : String) : Option[Map[String, Any]] = { + // https://stackoverflow.com/a/32717262/631051 + val jsonObject = JSON.parseFull(json) + if (jsonObject == None) { + None + } else { + Some(jsonObject.get.asInstanceOf[Map[String, Any]]) + } + } + + def grobidToSlug(json : String) : Option[String] = { + jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + titleToSlug(map("title").asInstanceOf[String]) + } else { + None + } + } + } + } + + def crossrefToSlug(json : String) : Option[String] = { + jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + // TODO: Don't ignore titles after the first. 
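// Illustrative sketch, related to the TODO above: Crossref's "title" field parses
// as a JSON array, and only its first element is used. Stand-alone code using the
// same scala.util.parsing.json parser; the sample record below is made up.
object CrossrefTitleSketch {
  import scala.util.parsing.json.JSON

  def firstTitle(json: String): Option[String] =
    JSON.parseFull(json) match {
      case Some(map: Map[_, _]) =>
        map.asInstanceOf[Map[String, Any]].get("title") match {
          case Some(titles: List[_]) => titles.headOption.map(_.toString)
          case _                     => None
        }
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val sample = """{"title": ["Primary Title", "Secondary Title"], "DOI": "10.1000/example"}"""
    println(firstTitle(sample)) // Some(Primary Title) -- the secondary title is ignored
  }
}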
+ titleToSlug(map("title").asInstanceOf[List[String]](0)) + } else { + None + } + } + } + } + + def titleToSlug(title : String) : Option[String] = { + val slug = removeAccents(title).split(":")(0).toLowerCase() + if (slug.isEmpty) { + None + } else { + Some(slug) + } + } + + val MaxScore = 1000 + + def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) : + // (score, sha1, doi, grobidTitle, crossrefTitle) + (Int, String, String, String, String) = { + jsonToMap(grobidJson) match { + case None => (0, "", "", "", "") // This can't happen, because grobidJson already validated in earlier stage + case Some(grobid) => { + val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase() + + jsonToMap(crossrefJson) match { + case None => (0, "", "", "", "") // This can't happen, because crossrefJson already validated in earlier stage + case Some(crossref) => { + val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase() + + (similarity(removeAccents(grobidTitle), removeAccents(crossrefTitle)), + sha1, + crossref("DOI").asInstanceOf[String], + "'" + grobidTitle + "'", + "'" + crossrefTitle + "'") + } + } + } + } + } + + // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 + def removeAccents(s : String) : String = { + val replacements = Map( + '\u0141' -> 'L', + '\u0142' -> 'l', // Letter ell + '\u00d8' -> 'O', + '\u00f8' -> 'o' + ) + val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) + for (i <- 0 to sb.length - 1) { + for (key <- replacements.keys) { + if (sb(i) == key) { + sb.deleteCharAt(i); + sb.insert(i, replacements(key)) + } + } + } + val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") + pattern.matcher(sb).replaceAll("") + } + + // Adapted from: https://stackoverflow.com/a/16018452/631051 + def similarity(s1 : String, s2 : String) : Int = { + val longer : String = if (s1.length > s2.length) s1 else s2 + val shorter : String = if (s1.length > s2.length) s2 else s1 + if (longer.length == 0) { + // Both strings are empty. 
+ MaxScore + } else { + (longer.length - stringDistance(longer, shorter)) * MaxScore / longer.length + } + } + + // Source: // https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ + def stringDistance(s1: String, s2: String): Int = { + val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() + def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) + def sd(s1: List[Char], s2: List[Char]): Int = { + if (!memo.contains((s1, s2))) { + memo((s1,s2)) = (s1, s2) match { + case (_, Nil) => s1.length + case (Nil, _) => s2.length + case (c1::t1, c2::t2) => + min( sd(t1,s2) + 1, sd(s1,t2) + 1, + sd(t1,t2) + (if (c1==c2) 0 else 1) ) + } + } + memo((s1,s2)) + } + + sd( s1.toList, s2.toList ) + } +} + diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 92b61bc..bd03d57 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -6,13 +6,14 @@ import scala.util.parsing.json.JSON import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ +import TDsl._ case class MapFeatures(slug : String, json : String) case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(pipe : Pipe) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(pipe : cascading.pipe.Pipe) : TypedPipe[(String, ReduceFeatures)] = { getFeaturesPipe(pipe) .filter { entry => Scorable.isValidSlug(entry.slug) } @@ -25,7 +26,7 @@ abstract class Scorable { // abstract methods def getSource(args : Args) : Source - def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] + def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] } object Scorable { -- cgit v1.2.3 From 5ce5e5dc98cdbb5a84c79313df93d670111e6a1d Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Thu, 9 Aug 2018 22:13:46 -0700 Subject: Broken code to share with Bryan. 
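The compile failures in this commit appear to come down to where Scalding can find the implicits behind Pipe.toTypedPipe: the TDsl enrichment needs a FlowDef and a Mode, which a Job supplies for code written inside it but not for helpers defined elsewhere. A rough sketch of the pattern the later commits settle on, with illustrative names (PipeHelperSketch, linesOf), looks like this:

import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._

// Sketch only: a helper outside a Job compiles once the Job's implicit FlowDef
// and Mode are threaded through as an extra implicit parameter list.
object PipeHelperSketch {
  def linesOf(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode): TypedPipe[String] =
    pipe.toTypedPipe[String](new Fields("line"))
}

Inside the Job itself the same call works without the extra parameter list, which is what the "CAN" / "CANNOT" comments in this commit are probing.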
--- .../main/scala/sandcrawler/CrossrefScorable.scala | 21 +++++++ .../main/scala/sandcrawler/GrobidScorable.scala | 2 +- .../scala/sandcrawler/HBaseCrossrefScoreJob.scala | 8 +-- scalding/src/main/scala/sandcrawler/Scorable.scala | 2 +- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 65 +++++++++++++++++++++- 5 files changed, 90 insertions(+), 8 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 9842122..146feec 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -10,6 +10,26 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource import TDsl._ +import java.text.Normalizer +import java.util.Arrays +import java.util.Properties +import java.util.regex.Pattern + +import scala.math +import scala.util.parsing.json.JSON + +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.CoGrouped +import com.twitter.scalding.typed.Grouped +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + class CrossrefScorable extends Scorable with HBasePipeConversions { // TODO: Generalize args so there can be multiple Grobid pipes in one job. def getSource(args : Args) : Source = { @@ -17,6 +37,7 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { } def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { + // Here I CANNOT call Pipe.toTypedPipe() pipe .toTypedPipe[String](new Fields("line")) .map{ json : String => diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 51e40f9..ba15f22 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -8,7 +8,7 @@ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource -import TDsl._ +//import TDsl._ class GrobidScorable extends Scorable with HBasePipeConversions { def getSource(args : Args) : Source = { diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 725474d..018a74b 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -19,6 +19,7 @@ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource +import TDsl._ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable @@ -30,13 +31,13 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val temp : cascading.pipe.Pipe = grobidSource .read - .fromBytesWritable(new Fields("key", "tei_json")) + + // Here I CAN call 
Pipe.toTypedPipe() val grobidPipe : TypedPipe[(String, String, String)] = temp - // .debug // Should be 4 tuples for mocked data + .fromBytesWritable(new Fields("key", "tei_json")) .toTypedPipe[(String, String)]('key, 'tei_json) .map { entry => val (key, json) = (entry._1, entry._2) - // TODO: Consider passing forward only a subset of JSON. HBaseCrossrefScore.grobidToSlug(json) match { case Some(slug) => (slug, key, json) case None => (NoTitle, key, json) @@ -46,7 +47,6 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val (slug, _, _) = entry slug != NoTitle } -// .debug // SHould be 3 tuples for mocked data val grobidGroup = grobidPipe .groupBy { case (slug, key, json) => slug } diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index bd03d57..65d9b41 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -6,7 +6,7 @@ import scala.util.parsing.json.JSON import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ -import TDsl._ +//import TDsl._ case class MapFeatures(slug : String, json : String) case class ReduceFeatures(json : String) diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 7891596..0dbe64d 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,13 +1,50 @@ package sandcrawler import cascading.flow.FlowDef +import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource -class ScoreJob(args: Args) extends JobBase(args) with - HBasePipeConversions { +//case class MapFeatures(slug : String, json : String) + +class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { + + val grobidSource = HBaseCrossrefScore.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) + + val source0 : Source = TextLine("foo") + val pipe0 : cascading.pipe.Pipe = source0.read + // This compiles: + val pipe00 : TypedPipe[String] = getFeaturesPipe0(pipe0) + + // Calling a method within ScoreJob compiles fine. + def getFeaturesPipe0(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { + pipe + // This compiles: + .toTypedPipe[String](new Fields("line")) + } + + // Calling a function in a ScoreJob object leads to a compiler error. + val source1 : Source = TextLine("foo") + val pipe1 : cascading.pipe.Pipe = source1.read + // This leads to a compile error: + val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0) + + /* + val pipe : cascading.pipe.Pipe = grobidSource + .read + val grobidPipe : TypedPipe[(String, String)] = pipe + .fromBytesWritable(new Fields("key", "tei_json")) + // Here I CAN call Pipe.toTypedPipe() + .toTypedPipe[(String, String)]('key, 'tei_json) + .write(TypedTsv[(String, String)](args("output"))) + + // Let's try making a method call. +// ScoreJob.etFeaturesPipe(pipe) // TODO: Instantiate any subclass of Scorable specified in args. 
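// Illustrative sketch for the TODO above: one possible way to choose Scorable
// subclasses at runtime. The arg values ("grobid", "crossref") and this factory
// object are hypothetical, not part of the existing code; assumes the sandcrawler
// package is in scope.
object ScorableFactorySketch {
  def fromName(name: String): Scorable = name match {
    case "grobid"   => new GrobidScorable()
    case "crossref" => new CrossrefScorable()
    case other      => throw new IllegalArgumentException("unknown scorable: " + other)
  }
  // e.g., inside a job: val sc1 = ScorableFactorySketch.fromName(args("scorable1"))
}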
Scorable sc1 = new GrobidScorable() @@ -15,6 +52,7 @@ class ScoreJob(args: Args) extends JobBase(args) with val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read) val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read) + pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry new ReduceOutput( @@ -24,6 +62,8 @@ class ScoreJob(args: Args) extends JobBase(args) with features2.json) } .write(TypedTsv[ReduceOutput](args("output"))) + */ + } // Ugly hack to get non-String information into ScoreJob above. @@ -52,4 +92,25 @@ object ScoreJob { case None => null } } + + def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { + pipe + // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe + .toTypedPipe[String](new Fields("line")) + } +/* + def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { + pipe + .fromBytesWritable(new Fields("key", "tei_json")) + // I needed to change symbols to strings when I pulled this out of ScoreJob. + .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .map { entry => + val (key : String, json : String) = (entry._1, entry._2) + GrobidScorable.grobidToSlug(json) match { + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) + } + } + } + */ } -- cgit v1.2.3 From b7f77f6337b450406ae0a90b81faeba27394afb0 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Fri, 10 Aug 2018 19:59:40 -0700 Subject: It compiles --- .../main/scala/sandcrawler/CrossrefScorable.scala | 5 +- .../main/scala/sandcrawler/GrobidScorable.scala | 7 +-- scalding/src/main/scala/sandcrawler/Scorable.scala | 6 +-- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 56 +++++++++++++--------- 4 files changed, 43 insertions(+), 31 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 146feec..817bee5 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -36,9 +36,8 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { TextLine(args("crossref-input")) } - def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { - // Here I CANNOT call Pipe.toTypedPipe() - pipe + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + getSource(args).read .toTypedPipe[String](new Fields("line")) .map{ json : String => CrossrefScorable.crossrefToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index ba15f22..61055f2 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -16,10 +16,11 @@ class GrobidScorable extends Scorable with HBasePipeConversions { GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) } - def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { - pipe + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + getSource(args) + .read .fromBytesWritable(new Fields("key", "tei_json")) - .toTypedPipe[(String, String)](new Fields('key, 'tei_json)) + 
.toTypedPipe[(String, String)](new Fields("key", "tei_json")) .map { entry => val (key : String, json : String) = (entry._1, entry._2) GrobidScorable.grobidToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 65d9b41..0ec8e46 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -13,9 +13,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(pipe : cascading.pipe.Pipe) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(pipe) + getFeaturesPipe(args) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => @@ -26,7 +26,7 @@ abstract class Scorable { // abstract methods def getSource(args : Args) : Source - def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] } object Scorable { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 0dbe64d..bc5bf87 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -2,16 +2,32 @@ package sandcrawler import cascading.flow.FlowDef import cascading.tuple.Fields -import com.twitter.scalding._ -import com.twitter.scalding.typed.TDsl._ +import com.twitter.scalding.{Args,Source,TextLine,TypedPipe, TypedTsv} +//import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource - -//case class MapFeatures(slug : String, json : String) +import com.twitter.scalding.{ Dsl, RichPipe, IterableSource, TupleSetter, TupleConverter } +import cascading.pipe.Pipe class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { + // TODO: Instantiate any subclass of Scorable specified in args. + val sc1 : Scorable = new GrobidScorable() + val sc2 : Scorable = new GrobidScorable() + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) + + pipe1.join(pipe2).map { entry => + val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry + new ReduceOutput( + slug, + Scorable.computeSimilarity(features1, features2), + features1.json, + features2.json) + } + .write(TypedTsv[ReduceOutput](args("output"))) + /* val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), args("zookeeper-hosts")) @@ -34,7 +50,6 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { // This leads to a compile error: val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0) - /* val pipe : cascading.pipe.Pipe = grobidSource .read val grobidPipe : TypedPipe[(String, String)] = pipe @@ -46,22 +61,6 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { // Let's try making a method call. // ScoreJob.etFeaturesPipe(pipe) - // TODO: Instantiate any subclass of Scorable specified in args. 
- Scorable sc1 = new GrobidScorable() - Scorable sc2 = new CrossrefScorable() - val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read) - - - pipe1.join(pipe2).map { entry => - val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry - new ReduceOutput( - slug, - Scorable.computeSimilarity(features1, features2), - features1.json, - features2.json) - } - .write(TypedTsv[ReduceOutput](args("output"))) */ } @@ -93,12 +92,25 @@ object ScoreJob { } } + /* + implicit def sourceToRichPipe(src: Source): RichPipe = new RichPipe(src.read) + + // This converts an Iterable into a Pipe or RichPipe with index (int-based) fields + implicit def toPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): Pipe = + IterableSource[T](iter)(set, conv).read + + implicit def iterableToRichPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): RichPipe = + RichPipe(toPipe(iter)(set, conv)) + + // Provide args as an implicit val for extensions such as the Checkpoint extension. +// implicit protected def _implicitJobArgs: Args = args + def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { pipe // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe .toTypedPipe[String](new Fields("line")) } -/* + def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { pipe .fromBytesWritable(new Fields("key", "tei_json")) -- cgit v1.2.3 From 728e50a33cec921c9a624439f2e1c8561a6e12ce Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Sat, 11 Aug 2018 21:03:53 -0700 Subject: It compiles. --- .../main/scala/sandcrawler/CrossrefScorable.scala | 54 ++++++++++++++-------- .../main/scala/sandcrawler/GrobidScorable.scala | 21 ++++----- scalding/src/main/scala/sandcrawler/Scorable.scala | 40 +++++++++++----- .../scala/sandcrawler/CrossrefScorableTest.scala | 26 ++++++----- .../scala/sandcrawler/GrobidScorableTest.scala | 19 ++++---- 5 files changed, 96 insertions(+), 64 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index b2f6537..5113b0c 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -18,6 +18,7 @@ import java.util.regex.Pattern import scala.math import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject import cascading.tuple.Fields import com.twitter.scalding._ @@ -40,33 +41,48 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { getSource(args).read .toTypedPipe[String](new Fields("line")) - .map{ json : String => - CrossrefScorable.simplifyJson(json) match { - case None => new MapFeatures(Scorable.NoSlug, json) - case Some(map) => new MapFeatures( - Scorable.titleToSlug(map("title").asInstanceOf[String]), - JSONObject(map).toString) + .map{ json : String => + Scorable.jsonToMap(json) match { + case None => MapFeatures(Scorable.NoSlug, json) + case Some(map) => { + if ((map contains "title") && (map contains "DOI")) { + val titles = map("title").asInstanceOf[List[String]] + if (titles.isEmpty) { + new MapFeatures(Scorable.NoSlug, json) + } else { 
+ val title = titles(0) + val map2 = Scorable.toScorableMap(title=titles(0), doi=map("DOI").asInstanceOf[String]) + new MapFeatures( + Scorable.mapToSlug(map2), + JSONObject(map2).toString) + } + } else { + new MapFeatures(Scorable.NoSlug, json) + } + } } } } +} - object CrossrefScorable { - def simplifyJson(json : String) : Option[Map[String, Any]] = { - Scorable.jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - val titles = map("title").asInstanceOf[List[String]] - if (titles.isEmpty) { - None - } else { - Some(Map("title" -> titles(0))) - } - } else { +/* +object CrossrefScorable { + def simplifyJson(json : String) : Option[Map[String, Any]] = { + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + val titles = map("title").asInstanceOf[List[String]] + if (titles.isEmpty) { None + } else { + Some(Map("title" -> titles(0))) } + } else { + None } } } } } + */ diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 61055f2..de9f51a 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -1,5 +1,6 @@ package sandcrawler +import scala.util.parsing.json.JSONObject import cascading.flow.FlowDef import cascading.pipe.Pipe import cascading.tuple.Fields @@ -21,13 +22,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions { .read .fromBytesWritable(new Fields("key", "tei_json")) .toTypedPipe[(String, String)](new Fields("key", "tei_json")) - .map { entry => - val (key : String, json : String) = (entry._1, entry._2) - GrobidScorable.grobidToSlug(json) match { - case Some(slug) => new MapFeatures(slug, json) - case None => new MapFeatures(Scorable.NoSlug, json) - } - } + .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } } } @@ -36,14 +31,18 @@ object GrobidScorable { HBaseBuilder.build(table, host, List("grobid0:tei_json"), SourceMode.SCAN_ALL) } - def grobidToSlug(json : String) : Option[String] = { + def jsonToMapFeatures(key : String, json : String) : MapFeatures = { Scorable.jsonToMap(json) match { - case None => None + case None => MapFeatures(Scorable.NoSlug, json) case Some(map) => { if (map contains "title") { - Some(Scorable.titleToSlug(map("title").asInstanceOf[String])) + val map2 = Scorable.toScorableMap(Scorable.getString(map, "title"), + sha1=key) + new MapFeatures( + Scorable.mapToSlug(map2), + JSONObject(map2).toString) } else { - None + MapFeatures(Scorable.NoSlug, json) } } } diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 0ec8e46..9c8da69 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -2,6 +2,7 @@ package sandcrawler import scala.math import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject import cascading.flow.FlowDef import com.twitter.scalding._ @@ -36,6 +37,21 @@ object Scorable { slug != NoSlug } + // NOTE: I could go all out and make ScorableMap a type. + // TODO: Require year. Other features will get added here. 
+ def toScorableMap(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : Map[String, Any] = { + Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) + } + + def toScorableJson(title : String, year : Int, doi : String = "", sha1 : String = "") : String = { + JSONObject(toScorableMap(title=title, year=year, doi=doi, sha1=sha1)).toString + } + + // TODO: Score on more fields than "title". + def isScorableMap(map : Map[String, Any]) : Boolean = { + map.contains("title") + } + def jsonToMap(json : String) : Option[Map[String, Any]] = { // https://stackoverflow.com/a/32717262/631051 val jsonObject = JSON.parseFull(json) @@ -46,18 +62,17 @@ object Scorable { } } - def titleToSlug(title : String) : String = { - if (title == null || title.isEmpty) { + // Map should have been produced by toScorableMap. + // This guarantees it will have all of the fields needed to compute + // the ultimate score, which are a superset of those needed for a slug. + def mapToSlug(map : Map[String, Any]) : String = { + val unaccented = StringUtilities.removeAccents(getString(map, "title")) + // Remove punctuation after splitting on colon. + val slug = StringUtilities.removePunctuation((unaccented.split(":")(0).toLowerCase())) + if (slug.isEmpty || slug == null) { NoSlug } else { - val unaccented = StringUtilities.removeAccents(title) - // Remove punctuation after splitting on colon. - val slug = StringUtilities.removePunctuation((unaccented.split(":")(0).toLowerCase())) - if (slug.isEmpty || slug == null) { - NoSlug - } else { - slug - } + slug } } @@ -68,8 +83,9 @@ object Scorable { } } - // Caller is responsible for ensuring that key is in map. - def getString(map : Map[String, String], key : String) : String = { + // Caller is responsible for ensuring that key is a String in map. 
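// Illustrative sketch of the round trip these helpers build toward: construct a
// scorable map, serialize it with JSONObject, parse it back, and cut a slug from
// the title. Plain Scala with the same JSON library as above; the slug step here
// is simplified (the real mapToSlug also strips accents and punctuation).
object ScorableRoundTripSketch {
  import scala.util.parsing.json.{JSON, JSONObject}

  def main(args: Array[String]): Unit = {
    val map: Map[String, Any] =
      Map("title" -> "Dummy Example File: A Subtitle",
          "year"  -> 1996,
          "doi"   -> "10.1000/example",
          "sha1"  -> "0123456789abcdef")
    val json = JSONObject(map).toString
    println(json) // {"title" : "Dummy Example File: A Subtitle", "year" : 1996, ...}

    val parsed = JSON.parseFull(json).map(_.asInstanceOf[Map[String, Any]])
    val slug   = parsed.map(m => m("title").asInstanceOf[String].split(":")(0).toLowerCase)
    println(slug) // Some(dummy example file)
  }
}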
+ // TODO: Add and handle ClassCastException + def getString(map : Map[String, Any], key : String) : String = { assert(map contains key) map(key).asInstanceOf[String] } diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala index 67a8bfe..1c35d66 100644 --- a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala @@ -66,20 +66,24 @@ class CrossrefScorableTest extends FlatSpec with Matchers { val MalformedCrossrefString = CrossrefString.replace("}", "") // Unit tests -/* - "crossrefToSlug()" should "get the right slug for a crossref json string" in { - val slug = CrossrefScorable.crossrefToSlug(CrossrefStringWithTitle) - slug should contain ("sometitle") + "simplifyJson()" should "return None for bad JSON" in { + CrossrefScorable.simplifyJson("") shouldBe None + CrossrefScorable.simplifyJson(MalformedCrossrefString) shouldBe None } - it should "return None if given json string without title" in { - val slug = CrossrefScorable.crossrefToSlug(CrossrefStringWithoutTitle) - slug shouldBe None + it should "return None for JSON lacking title" in { + CrossrefScorable.simplifyJson(CrossrefStringWithoutTitle) shouldBe None } - it should "return None if given a malformed json string" in { - val slug = CrossrefScorable.crossrefToSlug(MalformedCrossrefString) - slug shouldBe None + it should "return appropriate result for valid JSON" in { + CrossrefScorable.simplifyJson(CrossrefStringWithTitle) match { + case None => fail("None unexpectedly returned by simplifyJson") + case Some(map) => { + Scorable.isScorableMap(map) shouldBe true + map.size shouldBe 1 + map.keys should contain ("title") + map("title") shouldBe "SomeTitle" + } + } } - */ } diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala index 7777610..5bb955a 100644 --- a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala @@ -60,18 +60,15 @@ class GrobidScorableTest extends FlatSpec with Matchers { // Unit tests - "grobidToSlug()" should "get the right slug for a grobid json string" in { - val slug = GrobidScorable.grobidToSlug(GrobidStringWithTitle) - slug should contain ("dummy example file") + "GrobidScorable.jsonToMapFeatures()" should "handle invalid JSON" in { + val result = GrobidScorable.jsonToMapFeatures(MalformedGrobidString) shouldBe None + result.slug shouldBe Scorable.NoSlug + result.json shouldBe MalformedGrobidString } - it should "return None if given json string without title" in { - val slug = GrobidScorable.grobidToSlug(GrobidStringWithoutTitle) - slug shouldBe None - } - - it should "return None if given a malformed json string" in { - val slug = GrobidScorable.grobidToSlug(MalformedGrobidString) - slug shouldBe None + "GrobidScorable.jsonToMapFeatures()" should "handle missing title" in { + val result = GrobidScorable.jsonToMapFeatures(GrobidStringWithoutTitle) shouldBe None + result.slug shouldBe Scorable.NoSlug + result.json shouldBe GrobidStringWithoutTitle } } -- cgit v1.2.3 From 31354b1a6062c5c56a30610f68fa48c82a7e83f0 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Sun, 12 Aug 2018 18:08:51 -0700 Subject: Tests pass. 
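The test changes in this commit pin down the slug rules: strip accents, cut at the first colon, drop punctuation, lowercase, and remove all whitespace. A stand-alone approximation of those rules, using only JVM APIs and not the project's StringUtilities, is:

import java.text.Normalizer

// Illustrative re-implementation of the slug rules, not the code under test:
// accents stripped via NFD plus combining-mark removal, then cut at ":",
// lowercased, punctuation dropped, whitespace removed.
object SlugSketch {
  private val NoSlug = "NO SLUG" // mirrors Scorable.NoSlug

  def slug(title: String): String =
    if (title == null) NoSlug
    else {
      val unaccented = Normalizer.normalize(title, Normalizer.Form.NFD)
        .replaceAll("\\p{InCombiningDiacriticalMarks}+", "")
      val s = unaccented.split(":")(0).toLowerCase
        .replaceAll("[\\p{Punct}]", "")
        .replaceAll("\\s", "")
      if (s.isEmpty) NoSlug else s
    }

  def main(args: Array[String]): Unit = {
    println(slug("HELLO:there"))  // hello
    println(slug("hello THERE"))  // hellothere
    println(slug("If you're happy and you know it, clap your hands!"))
    // ifyourehappyandyouknowitclapyourhands
  }
}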
--- scalding/src/main/scala/sandcrawler/Scorable.scala | 11 +-- .../scala/sandcrawler/CrossrefScorableTest.scala | 89 ---------------------- .../scala/sandcrawler/GrobidScorableTest.scala | 20 +++-- .../src/test/scala/sandcrawler/ScorableTest.scala | 28 ++++--- 4 files changed, 39 insertions(+), 109 deletions(-) delete mode 100644 scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 9c8da69..929461b 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -66,13 +66,14 @@ object Scorable { // This guarantees it will have all of the fields needed to compute // the ultimate score, which are a superset of those needed for a slug. def mapToSlug(map : Map[String, Any]) : String = { - val unaccented = StringUtilities.removeAccents(getString(map, "title")) - // Remove punctuation after splitting on colon. - val slug = StringUtilities.removePunctuation((unaccented.split(":")(0).toLowerCase())) - if (slug.isEmpty || slug == null) { + val title = getString(map, "title") + if (title == null) { NoSlug } else { - slug + val unaccented = StringUtilities.removeAccents(title) + // Remove punctuation after splitting on colon. + val slug = StringUtilities.removePunctuation((unaccented.split(":")(0).toLowerCase())).replaceAll("\\s", "") + if (slug.isEmpty || slug == null) NoSlug else slug } } diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala deleted file mode 100644 index 1c35d66..0000000 --- a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala +++ /dev/null @@ -1,89 +0,0 @@ -package sandcrawler - -import cascading.tuple.Fields -import cascading.tuple.Tuple -import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions} -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes -import org.scalatest._ -import parallelai.spyglass.hbase.HBaseConstants.SourceMode - -class CrossrefScorableTest extends FlatSpec with Matchers { - val CrossrefString = -""" -{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, - "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], - "date-time" : "2017-10-23T17:19:16Z", - "timestamp" : { "$numberLong" : "1508779156477" } }, - "reference-count" : 0, - "publisher" : "Elsevier BV", - "issue" : "3", - "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", - "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], - "date-time" : "1996-01-01T00:00:00Z", - "timestamp" : { "$numberLong" : "820454400000" } }, - "delay-in-days" : 0, "content-version" : "tdm" }], - "content-domain" : { "domain" : [], "crossmark-restriction" : false }, - "published-print" : { "date-parts" : [ [ 1996 ] ] }, - "DOI" : "<<DOI>>", - "type" : "journal-article", - "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], - "date-time" : "2002-07-25T15:09:41Z", - "timestamp" : { "$numberLong" : "1027609781000" } }, - "page" : "186-187", - "source" : "Crossref", - "is-referenced-by-count" : 0, - "title" : [ "<<TITLE>>" ], - "prefix" : "10.1016", - "volume" : "9", - "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], - "member" : "78", - "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], - "link" : [ { "URL" : 
"http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", - "content-type" : "text/xml", - "content-version" : "vor", - "intended-application" : "text-mining" }, - { "URL" : - "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", - "content-type" : "text/plain", - "content-version" : "vor", - "intended-application" : "text-mining" } ], - "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], - "date-time" : "2015-09-03T10:03:43Z", - "timestamp" : { "$numberLong" : "1441274623000" } }, - "score" : 1, - "issued" : { "date-parts" : [ [ 1996 ] ] }, - "references-count" : 0, - "alternative-id" : [ "0987-7983(96)87729-2" ], - "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", - "ISSN" : [ "0987-7983" ], - "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], - "subject" : [ "Pediatrics, Perinatology, and Child Health" ] -} -""" - val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") - val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") - val MalformedCrossrefString = CrossrefString.replace("}", "") - - // Unit tests - "simplifyJson()" should "return None for bad JSON" in { - CrossrefScorable.simplifyJson("") shouldBe None - CrossrefScorable.simplifyJson(MalformedCrossrefString) shouldBe None - } - - it should "return None for JSON lacking title" in { - CrossrefScorable.simplifyJson(CrossrefStringWithoutTitle) shouldBe None - } - - it should "return appropriate result for valid JSON" in { - CrossrefScorable.simplifyJson(CrossrefStringWithTitle) match { - case None => fail("None unexpectedly returned by simplifyJson") - case Some(map) => { - Scorable.isScorableMap(map) shouldBe true - map.size shouldBe 1 - map.keys should contain ("title") - map("title") shouldBe "SomeTitle" - } - } - } -} diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala index 5bb955a..3fcd856 100644 --- a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala @@ -57,18 +57,28 @@ class GrobidScorableTest extends FlatSpec with Matchers { val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle") val MalformedGrobidString = GrobidString.replace("}", "") + val Key = "Dummy Key" // Unit tests "GrobidScorable.jsonToMapFeatures()" should "handle invalid JSON" in { - val result = GrobidScorable.jsonToMapFeatures(MalformedGrobidString) shouldBe None + val result = GrobidScorable.jsonToMapFeatures(Key, MalformedGrobidString) result.slug shouldBe Scorable.NoSlug - result.json shouldBe MalformedGrobidString } - "GrobidScorable.jsonToMapFeatures()" should "handle missing title" in { - val result = GrobidScorable.jsonToMapFeatures(GrobidStringWithoutTitle) shouldBe None + it should "handle missing title" in { + val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithoutTitle) result.slug shouldBe Scorable.NoSlug - result.json shouldBe GrobidStringWithoutTitle + } + + it should "handle valid input" in { + val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithTitle) + result.slug shouldBe "dummyexamplefile" + Scorable.jsonToMap(result.json) match { + case None => fail() + case Some(map) => { + map("title").asInstanceOf[String] shouldBe "Dummy Example File" + } + } } } diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala 
b/scalding/src/test/scala/sandcrawler/ScorableTest.scala index 2f80492..95faacc 100644 --- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -54,28 +54,36 @@ class ScorableTest extends FlatSpec with Matchers { "annex": null } """ + private def titleToSlug(s : String) : String = { + Scorable.mapToSlug(Scorable.toScorableMap(title = s)) + } - "titleToSlug()" should "extract the parts of titles before a colon" in { - Scorable.titleToSlug("HELLO:there") shouldBe "hello" + "mapToSlug()" should "extract the parts of titles before a colon" in { + titleToSlug("HELLO:there") shouldBe "hello" } it should "extract an entire colon-less string" in { - Scorable.titleToSlug("hello THERE") shouldBe "hello there" + titleToSlug("hello THERE") shouldBe "hellothere" } it should "return Scorable.NoSlug if given empty string" in { - Scorable.titleToSlug("") shouldBe Scorable.NoSlug + titleToSlug("") shouldBe Scorable.NoSlug } it should "return Scorable.NoSlug if given null" in { - Scorable.titleToSlug(null) shouldBe Scorable.NoSlug + titleToSlug(null) shouldBe Scorable.NoSlug + } + + it should "strip punctuation" in { + titleToSlug("HELLO!:the:re") shouldBe "hello" + titleToSlug("a:b:c") shouldBe "a" + titleToSlug( + "If you're happy and you know it, clap your hands!") shouldBe "ifyourehappyandyouknowitclapyourhands" } - "titleToSlug()" should "strip punctuation" in { - Scorable.titleToSlug("HELLO!:the:re") shouldBe "hello" - Scorable.titleToSlug("a:b:c") shouldBe "a" - Scorable.titleToSlug( - "If you're happy and you know it, clap your hands!") shouldBe "if youre happy and you know it clap your hands" + it should "remove whitespace" in { + titleToSlug("foo bar : baz ::") shouldBe "foobar" + titleToSlug("\na\t:b:c") shouldBe "a" } "jsonToMap()" should "return a map, given a legal JSON string" in { -- cgit v1.2.3 From 5615428921a45ba6a2fb005b255a28dcbb83b13f Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Sun, 12 Aug 2018 19:12:32 -0700 Subject: Snapshot before changing Scorable to find bug. 
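For reference while chasing the scoring bug: the similarity formula used in the older HBaseCrossrefScoreJob code (and presumably in the StringUtilities version that replaced it) is Levenshtein distance scaled onto 0..MaxScore, i.e. (longer.length - distance) * 1000 / longer.length. A stand-alone version with an iterative DP distance, equivalent to the memoized recursion shown earlier, worked on one of the test titles:

object SimilaritySketch {
  val MaxScore = 1000

  // Iterative Levenshtein distance (same result as the memoized recursive sd()).
  def distance(s1: String, s2: String): Int = {
    val d = Array.ofDim[Int](s1.length + 1, s2.length + 1)
    for (i <- 0 to s1.length) d(i)(0) = i
    for (j <- 0 to s2.length) d(0)(j) = j
    for (i <- 1 to s1.length; j <- 1 to s2.length) {
      val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
      d(i)(j) = math.min(math.min(d(i - 1)(j) + 1, d(i)(j - 1) + 1), d(i - 1)(j - 1) + cost)
    }
    d(s1.length)(s2.length)
  }

  def similarity(s1: String, s2: String): Int = {
    val (longer, shorter) = if (s1.length > s2.length) (s1, s2) else (s2, s1)
    if (longer.isEmpty) MaxScore
    else (longer.length - distance(longer, shorter)) * MaxScore / longer.length
  }

  def main(args: Array[String]): Unit = {
    // "title 1" vs "title 1: tng": the longer string has 12 chars, the distance is
    // 5 insertions, so the score is (12 - 5) * 1000 / 12 = 583.
    println(similarity("title 1", "title 1: tng")) // 583
  }
}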
--- .../main/scala/sandcrawler/CrossrefScorable.scala | 41 ++++++++++++---------- scalding/src/main/scala/sandcrawler/Scorable.scala | 1 - .../scala/sandcrawler/CrossrefScorableTest.scala | 24 ++++++------- .../scala/sandcrawler/GrobidScorableTest.scala | 1 + .../src/test/scala/sandcrawler/ScoreJobTest.scala | 15 +++++--- 5 files changed, 46 insertions(+), 36 deletions(-) (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 667a5cc..e257152 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -41,26 +41,31 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { getSource(args).read .toTypedPipe[String](new Fields("line")) - .map{ json : String => - Scorable.jsonToMap(json) match { - case None => MapFeatures(Scorable.NoSlug, json) - case Some(map) => { - if ((map contains "title") && (map contains "DOI")) { - val titles = map("title").asInstanceOf[List[String]] - if (titles.isEmpty) { - new MapFeatures(Scorable.NoSlug, json) - } else { - val title = titles(0) - val map2 = Scorable.toScorableMap(title=titles(0), doi=map("DOI").asInstanceOf[String]) - new MapFeatures( - Scorable.mapToSlug(map2), - JSONObject(map2).toString) - } - } else { - new MapFeatures(Scorable.NoSlug, json) - } + .map { CrossrefScorable.jsonToMapFeatures(_) } + } +} + +object CrossrefScorable { + def jsonToMapFeatures(json : String) : MapFeatures = { + Scorable.jsonToMap(json) match { + case None => MapFeatures(Scorable.NoSlug, json) + case Some(map) => { + if ((map contains "titles") && (map contains "DOI")) { + val titles = map("titles").asInstanceOf[List[String]] + val doi = Scorable.getString(map, "DOI") + if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) { + new MapFeatures(Scorable.NoSlug, json) + } else { + val title = titles(0) + val map2 = Scorable.toScorableMap(title=title, doi=doi) + new MapFeatures( + Scorable.mapToSlug(map2), + JSONObject(map2).toString) } + } else { + new MapFeatures(Scorable.NoSlug, json) } } + } } } diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 929461b..a256fa4 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -7,7 +7,6 @@ import scala.util.parsing.json.JSONObject import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ -//import TDsl._ case class MapFeatures(slug : String, json : String) case class ReduceFeatures(json : String) diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala index 1c35d66..dc6f347 100644 --- a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala @@ -66,23 +66,23 @@ class CrossrefScorableTest extends FlatSpec with Matchers { val MalformedCrossrefString = CrossrefString.replace("}", "") // Unit tests - "simplifyJson()" should "return None for bad JSON" in { - CrossrefScorable.simplifyJson("") shouldBe None - CrossrefScorable.simplifyJson(MalformedCrossrefString) shouldBe None + "CrossrefScorable.jsonToMapFeatures()" should "handle invalid JSON" 
in { + val result = CrossrefScorable.jsonToMapFeatures(MalformedCrossrefString) + result.slug shouldBe Scorable.NoSlug } - it should "return None for JSON lacking title" in { - CrossrefScorable.simplifyJson(CrossrefStringWithoutTitle) shouldBe None + it should "handle missing title" in { + val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithoutTitle) + result.slug shouldBe Scorable.NoSlug } - it should "return appropriate result for valid JSON" in { - CrossrefScorable.simplifyJson(CrossrefStringWithTitle) match { - case None => fail("None unexpectedly returned by simplifyJson") + it should "handle valid input" in { + val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithTitle) + result.slug shouldBe "dummyexamplefile" + Scorable.jsonToMap(result.json) match { + case None => fail() case Some(map) => { - Scorable.isScorableMap(map) shouldBe true - map.size shouldBe 1 - map.keys should contain ("title") - map("title") shouldBe "SomeTitle" + map("title").asInstanceOf[String] shouldBe "Dummy Example File" } } } diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala index 3fcd856..4b958b9 100644 --- a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala @@ -77,6 +77,7 @@ class GrobidScorableTest extends FlatSpec with Matchers { Scorable.jsonToMap(result.json) match { case None => fail() case Some(map) => { + map should contain key "title" map("title").asInstanceOf[String] shouldBe "Dummy Example File" } } diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala index 8acb454..8436817 100644 --- a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -149,11 +149,16 @@ class ScoreJobTest extends FlatSpec with Matchers { 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) { - // Grobid titles: - // "Title 1", "Title 2: TNG", "Title 3: The Sequel" - // crossref slugs: - // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted" - // Join should have 3 "Title 1" slugs and 1 "Title 2" slug + // Grobid titles and slugs (in parentheses): + // Title 1 (title1) + // Title 2: TNG (title2) + // Title 3: The Sequel (title3) + // crossref titles and slugs (in parentheses): + // Title 1: TNG (title1) + // Title 1: TNG 2 (title1) + // Title 1: TNG 3 (title1) + // Title 2 Rebooted (title2rebooted) + // Join should have 3 "title1" slugs and 1 "title2" slug outputBuffer => "The pipeline" should "return a 4-element list" in { outputBuffer should have length 4 -- cgit v1.2.3 From 1c6e1234974d8b6e4480a13ff5c4ff861c6d1deb Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Mon, 13 Aug 2018 09:58:27 -0700 Subject: Pipeline works, all tests pass, no scalastyle errors. 
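The ScoreJobTest scenario described above is also a quick sanity check on join cardinality: grobid contributes three distinct slugs, crossref contributes the title1 slug three times plus one slug that pairs with grobid's "Title 2: TNG", so the join should yield 3 + 1 = 4 output rows, matching the expected 4-element list. The same count can be reproduced with plain collections (the slug strings here are illustrative):

object JoinCountSketch {
  def main(args: Array[String]): Unit = {
    val grobidSlugs   = Seq("title1", "title2", "title3")
    val crossrefSlugs = Seq("title1", "title1", "title1", "title2")
    val counts = crossrefSlugs.groupBy(identity).mapValues(_.size)
    val joined = grobidSlugs.map(s => s -> counts.getOrElse(s, 0))
    println(joined)               // List((title1,3), (title2,1), (title3,0))
    println(joined.map(_._2).sum) // 4 rows expected from the join
  }
}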
--- .../main/scala/sandcrawler/CrossrefScorable.scala | 28 +-- .../main/scala/sandcrawler/GrobidScorable.scala | 3 +- .../scala/sandcrawler/HBaseCrossrefScoreJob.scala | 218 --------------------- scalding/src/main/scala/sandcrawler/Scorable.scala | 2 +- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 51 +---- .../scala/sandcrawler/CrossrefScorableTest.scala | 6 +- .../src/test/scala/sandcrawler/ScoreJobTest.scala | 80 +++++--- 7 files changed, 65 insertions(+), 323 deletions(-) delete mode 100644 scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index e257152..4558ee6 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -1,36 +1,14 @@ package sandcrawler -import cascading.flow.FlowDef -import cascading.pipe.Pipe -import cascading.tuple.Fields -import com.twitter.scalding._ -import com.twitter.scalding.typed.TDsl._ -import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource -import TDsl._ -import scala.util.parsing.json.JSONObject - -import java.text.Normalizer -import java.util.Arrays -import java.util.Properties -import java.util.regex.Pattern - import scala.math import scala.util.parsing.json.JSON import scala.util.parsing.json.JSONObject +import cascading.flow.FlowDef import cascading.tuple.Fields import com.twitter.scalding._ -import com.twitter.scalding.typed.CoGrouped -import com.twitter.scalding.typed.Grouped import com.twitter.scalding.typed.TDsl._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable with HBasePipeConversions { // TODO: Generalize args so there can be multiple Crossref pipes in one job. 
@@ -50,8 +28,8 @@ object CrossrefScorable { Scorable.jsonToMap(json) match { case None => MapFeatures(Scorable.NoSlug, json) case Some(map) => { - if ((map contains "titles") && (map contains "DOI")) { - val titles = map("titles").asInstanceOf[List[String]] + if ((map contains "title") && (map contains "DOI")) { + val titles = map("title").asInstanceOf[List[String]] val doi = Scorable.getString(map, "DOI") if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) { new MapFeatures(Scorable.NoSlug, json) diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index de9f51a..94b3494 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -1,15 +1,14 @@ package sandcrawler import scala.util.parsing.json.JSONObject + import cascading.flow.FlowDef -import cascading.pipe.Pipe import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource -//import TDsl._ class GrobidScorable extends Scorable with HBasePipeConversions { def getSource(args : Args) : Source = { diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala deleted file mode 100644 index 018a74b..0000000 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ /dev/null @@ -1,218 +0,0 @@ -package sandcrawler - -import java.text.Normalizer -import java.util.Arrays -import java.util.Properties -import java.util.regex.Pattern - -import scala.math -import scala.util.parsing.json.JSON - -import cascading.tuple.Fields -import com.twitter.scalding._ -import com.twitter.scalding.typed.CoGrouped -import com.twitter.scalding.typed.Grouped -import com.twitter.scalding.typed.TDsl._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource -import TDsl._ - -class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable - - // key is SHA1 - val grobidSource = HBaseCrossrefScore.getHBaseSource( - args("hbase-table"), - args("zookeeper-hosts")) - - val temp : cascading.pipe.Pipe = grobidSource - .read - - // Here I CAN call Pipe.toTypedPipe() - val grobidPipe : TypedPipe[(String, String, String)] = temp - .fromBytesWritable(new Fields("key", "tei_json")) - .toTypedPipe[(String, String)]('key, 'tei_json) - .map { entry => - val (key, json) = (entry._1, entry._2) - HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => (slug, key, json) - case None => (NoTitle, key, json) - } - } - .filter { entry => - val (slug, _, _) = entry - slug != NoTitle - } - - val grobidGroup = grobidPipe - .groupBy { case (slug, key, json) => slug } - - val crossrefSource = TextLine(args("crossref-input")) - val temp2 : cascading.pipe.Pipe = crossrefSource.read - val crossrefPipe : TypedPipe[(String, String)] = temp2 - // .debug // Should be 4 tuples for mocked data - .toTypedPipe[String]('line) - .map{ json : String => - HBaseCrossrefScore.crossrefToSlug(json) match { - 
case Some(slug) => (slug, json) - case None => (NoTitle, json) - } - } - .filter { entry => - val (slug, json) = entry - slug != NoTitle - } - - val crossrefGroup = crossrefPipe - .groupBy { case (slug, json) => slug } - - val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = - grobidGroup.join(crossrefGroup) - - theJoin.map{ entry => - val (slug : String, - ((slug0: String, sha1 : String, grobidJson : String), - (slug1 : String, crossrefJson : String))) = entry - HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} - // Output: score, sha1, doi, grobid title, crossref title - .write(TypedTsv[(Int, String, String, String, String)](args("output"))) - -} - -object HBaseCrossrefScore { - def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( - hbaseTable, // HBase Table Name - zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?) - List("grobid0:tei_json"), - SourceMode.SCAN_ALL) - - def jsonToMap(json : String) : Option[Map[String, Any]] = { - // https://stackoverflow.com/a/32717262/631051 - val jsonObject = JSON.parseFull(json) - if (jsonObject == None) { - None - } else { - Some(jsonObject.get.asInstanceOf[Map[String, Any]]) - } - } - - def grobidToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - titleToSlug(map("title").asInstanceOf[String]) - } else { - None - } - } - } - } - - def crossrefToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - // TODO: Don't ignore titles after the first. - titleToSlug(map("title").asInstanceOf[List[String]](0)) - } else { - None - } - } - } - } - - def titleToSlug(title : String) : Option[String] = { - val slug = removeAccents(title).split(":")(0).toLowerCase() - if (slug.isEmpty) { - None - } else { - Some(slug) - } - } - - val MaxScore = 1000 - - def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) : - // (score, sha1, doi, grobidTitle, crossrefTitle) - (Int, String, String, String, String) = { - jsonToMap(grobidJson) match { - case None => (0, "", "", "", "") // This can't happen, because grobidJson already validated in earlier stage - case Some(grobid) => { - val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase() - - jsonToMap(crossrefJson) match { - case None => (0, "", "", "", "") // This can't happen, because crossrefJson already validated in earlier stage - case Some(crossref) => { - val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase() - - (similarity(removeAccents(grobidTitle), removeAccents(crossrefTitle)), - sha1, - crossref("DOI").asInstanceOf[String], - "'" + grobidTitle + "'", - "'" + crossrefTitle + "'") - } - } - } - } - } - - // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 - def removeAccents(s : String) : String = { - val replacements = Map( - '\u0141' -> 'L', - '\u0142' -> 'l', // Letter ell - '\u00d8' -> 'O', - '\u00f8' -> 'o' - ) - val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) - for (i <- 0 to sb.length - 1) { - for (key <- replacements.keys) { - if (sb(i) == key) { - sb.deleteCharAt(i); - sb.insert(i, replacements(key)) - } - } - } - val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") - 
pattern.matcher(sb).replaceAll("") - } - - // Adapted from: https://stackoverflow.com/a/16018452/631051 - def similarity(s1 : String, s2 : String) : Int = { - val longer : String = if (s1.length > s2.length) s1 else s2 - val shorter : String = if (s1.length > s2.length) s2 else s1 - if (longer.length == 0) { - // Both strings are empty. - MaxScore - } else { - (longer.length - stringDistance(longer, shorter)) * MaxScore / longer.length - } - } - - // Source: // https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ - def stringDistance(s1: String, s2: String): Int = { - val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() - def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) - def sd(s1: List[Char], s2: List[Char]): Int = { - if (!memo.contains((s1, s2))) { - memo((s1,s2)) = (s1, s2) match { - case (_, Nil) => s1.length - case (Nil, _) => s2.length - case (c1::t1, c2::t2) => - min( sd(t1,s2) + 1, sd(s1,t2) + 1, - sd(t1,t2) + (if (c1==c2) 0 else 1) ) - } - } - memo((s1,s2)) - } - - sd( s1.toList, s2.toList ) - } -} - diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index a256fa4..717b2d5 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -39,7 +39,7 @@ object Scorable { // NOTE: I could go all out and make ScorableMap a type. // TODO: Require year. Other features will get added here. def toScorableMap(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : Map[String, Any] = { - Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) + Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) } def toScorableJson(title : String, year : Int, doi : String = "", sha1 : String = "") : String = { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 386b367..75d45e9 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,16 +1,12 @@ package sandcrawler -import cascading.flow.FlowDef -import cascading.tuple.Fields -import com.twitter.scalding.{Args,Source,TextLine,TypedPipe, TypedTsv} -//import com.twitter.scalding.source.TypedText -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource -import com.twitter.scalding.{ Dsl, RichPipe, IterableSource, TupleSetter, TupleConverter } import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase -class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { +class ScoreJob(args: Args) extends JobBase(args) { // TODO: Instantiate any subclass of Scorable specified in args. val sc1 : Scorable = new GrobidScorable() val sc2 : Scorable = new CrossrefScorable() @@ -27,10 +23,10 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { } //TypedTsv doesn't work over case classes. .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } - .write(TypedTsv[(String, Int, String, String)](args("output"))) } +/* // Ugly hack to get non-String information into ScoreJob above. 
object ScoreJob { var scorable1 : Option[Scorable] = None @@ -57,38 +53,5 @@ object ScoreJob { case None => null } } - - /* - implicit def sourceToRichPipe(src: Source): RichPipe = new RichPipe(src.read) - - // This converts an Iterable into a Pipe or RichPipe with index (int-based) fields - implicit def toPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): Pipe = - IterableSource[T](iter)(set, conv).read - - implicit def iterableToRichPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): RichPipe = - RichPipe(toPipe(iter)(set, conv)) - - // Provide args as an implicit val for extensions such as the Checkpoint extension. -// implicit protected def _implicitJobArgs: Args = args - - def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { - pipe - // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe - .toTypedPipe[String](new Fields("line")) - } - - def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { - pipe - .fromBytesWritable(new Fields("key", "tei_json")) - // I needed to change symbols to strings when I pulled this out of ScoreJob. - .toTypedPipe[(String, String)](new Fields("key", "tei_json")) - .map { entry => - val (key : String, json : String) = (entry._1, entry._2) - GrobidScorable.grobidToSlug(json) match { - case Some(slug) => new MapFeatures(slug, json) - case None => new MapFeatures(Scorable.NoSlug, json) - } - } - } - */ } + */ diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala index dc6f347..75be03e 100644 --- a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala @@ -61,7 +61,7 @@ class CrossrefScorableTest extends FlatSpec with Matchers { "subject" : [ "Pediatrics, Perinatology, and Child Health" ] } """ - val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "Some Title") val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") val MalformedCrossrefString = CrossrefString.replace("}", "") @@ -78,11 +78,11 @@ class CrossrefScorableTest extends FlatSpec with Matchers { it should "handle valid input" in { val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithTitle) - result.slug shouldBe "dummyexamplefile" + result.slug shouldBe "sometitle" Scorable.jsonToMap(result.json) match { case None => fail() case Some(map) => { - map("title").asInstanceOf[String] shouldBe "Dummy Example File" + map("title").asInstanceOf[String] shouldBe "Some Title" } } } diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala index 8436817..f0b411f 100644 --- a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -113,25 +113,32 @@ class ScoreJobTest extends FlatSpec with Matchers { val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") val MalformedCrossrefString = CrossrefString.replace("}", "") + val CrossrefStrings = List( + CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 
3").replace("<<DOI>>", "DOI-0.75"), + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")) // Pipeline tests val output = "/tmp/testOutput" val input = "/tmp/testInput" val (testTable, testHost) = ("test-table", "dummy-host:2181") - val grobidSampleData = List( - List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))), - List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))), - List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))), - List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), - Bytes.toBytes(MalformedGrobidString))) + val Sha1Strings = List( + "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", + "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", + "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56") - // TODO: Make less yucky. - ScoreJob.setScorable1(new CrossrefScorable()) - ScoreJob.setScorable2(new GrobidScorable()) + val GrobidStrings = List( + GrobidString.replace("<<TITLE>>", "Title 1"), + GrobidString.replace("<<TITLE>>", "Title 2: TNG"), + GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"), + MalformedGrobidString) + + val GrobidSampleData = (Sha1Strings zip GrobidStrings) + .map{case(s, g) => + List(Bytes.toBytes(s), Bytes.toBytes(g))} JobTest("sandcrawler.ScoreJob") .arg("test", "") @@ -142,12 +149,12 @@ class ScoreJobTest extends FlatSpec with Matchers { .arg("crossref-input", input) .arg("debug", "true") .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), - grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + GrobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) .source(TextLine(input), List( - 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), - 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"), - 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), - 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) + 0 -> CrossrefStrings(0), + 1 -> CrossrefStrings(1), + 2 -> CrossrefStrings(2), + 3 -> CrossrefStrings(3))) .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) { // Grobid titles and slugs (in parentheses): // Title 1 (title1) @@ -155,27 +162,40 @@ class ScoreJobTest extends FlatSpec with Matchers { // Title 3: The Sequel (title3) // crossref titles and slugs (in parentheses): // Title 1: TNG (title1) - // Title 1: TNG 2 (title1) + // Title 1: TNG 2A (title1) // Title 1: TNG 3 (title1) - // Title 2 Rebooted (title2rebooted) + // Title 2: Rebooted (title2) // Join should have 3 "title1" slugs and 1 "title2" slug outputBuffer => "The pipeline" should "return a 4-element list" in { outputBuffer should have length 4 } - /* - it should "return the right first entry" in { - outputBuffer(0) shouldBe ReduceOutput("slug", 50, "", - "") - val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) - slug shouldBe "title 1" - slug shouldBe slug0 - slug shouldBe slug1 - sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8") - grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8") + it should "has right # of entries with each slug" in { + val slugs = outputBuffer.map(_._1) + val countMap : Map[String, Int] = 
slugs.groupBy(identity).mapValues(_.size) + countMap("title1") shouldBe 3 + countMap("title2") shouldBe 1 + } + + def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) = { + val mf1 : MapFeatures = GrobidScorable.jsonToMapFeatures( + Sha1Strings(grobidIndex), + GrobidStrings(grobidIndex)) + val mf2 : MapFeatures = CrossrefScorable.jsonToMapFeatures( + CrossrefStrings(crossrefIndex)) + val score = Scorable.computeSimilarity( + ReduceFeatures(mf1.json), + ReduceFeatures(mf2.json)) + (slug, score, mf1.json, mf2.json) + } + + it should "have right output values" in { + outputBuffer.exists(_ == bundle("title1", 0, 0)) + outputBuffer.exists(_ == bundle("title1", 0, 2)) + outputBuffer.exists(_ == bundle("title1", 0, 1)) + outputBuffer.exists(_ == bundle("title2", 1, 3)) } - */ } .run .finish -- cgit v1.2.3 From b4f1acce5eccbb56291f82906d9c01534c7f1506 Mon Sep 17 00:00:00 2001 From: Ellen Spertus <ellen.spertus@gmail.com> Date: Mon, 13 Aug 2018 10:27:48 -0700 Subject: Factored out ScorableFeatures. --- .../main/scala/sandcrawler/CrossrefScorable.scala | 7 ++-- .../main/scala/sandcrawler/GrobidScorable.scala | 6 +--- scalding/src/main/scala/sandcrawler/Scorable.scala | 30 ------------------ .../main/scala/sandcrawler/ScorableFeatures.scala | 30 ++++++++++++++++++ .../scala/sandcrawler/ScorableFeaturesTest.scala | 37 ++++++++++++++++++++++ .../src/test/scala/sandcrawler/ScorableTest.scala | 32 ------------------- 6 files changed, 70 insertions(+), 72 deletions(-) create mode 100644 scalding/src/main/scala/sandcrawler/ScorableFeatures.scala create mode 100644 scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 4558ee6..4897b1c 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -34,11 +34,8 @@ object CrossrefScorable { if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) { new MapFeatures(Scorable.NoSlug, json) } else { - val title = titles(0) - val map2 = Scorable.toScorableMap(title=title, doi=doi) - new MapFeatures( - Scorable.mapToSlug(map2), - JSONObject(map2).toString) + val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi) + new MapFeatures(sf.toSlug, sf.toString) } } else { new MapFeatures(Scorable.NoSlug, json) diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 94b3494..5ba7d58 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -35,11 +35,7 @@ object GrobidScorable { case None => MapFeatures(Scorable.NoSlug, json) case Some(map) => { if (map contains "title") { - val map2 = Scorable.toScorableMap(Scorable.getString(map, "title"), - sha1=key) - new MapFeatures( - Scorable.mapToSlug(map2), - JSONObject(map2).toString) + new ScorableFeatures(Scorable.getString(map, "title"), sha1=key).toMapFeatures } else { MapFeatures(Scorable.NoSlug, json) } diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 717b2d5..9b9c633 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -36,21 +36,6 @@ object Scorable { slug != NoSlug } - // NOTE: I could go all out 
and make ScorableMap a type. - // TODO: Require year. Other features will get added here. - def toScorableMap(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : Map[String, Any] = { - Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) - } - - def toScorableJson(title : String, year : Int, doi : String = "", sha1 : String = "") : String = { - JSONObject(toScorableMap(title=title, year=year, doi=doi, sha1=sha1)).toString - } - - // TODO: Score on more fields than "title". - def isScorableMap(map : Map[String, Any]) : Boolean = { - map.contains("title") - } - def jsonToMap(json : String) : Option[Map[String, Any]] = { // https://stackoverflow.com/a/32717262/631051 val jsonObject = JSON.parseFull(json) @@ -61,21 +46,6 @@ object Scorable { } } - // Map should have been produced by toScorableMap. - // This guarantees it will have all of the fields needed to compute - // the ultimate score, which are a superset of those needed for a slug. - def mapToSlug(map : Map[String, Any]) : String = { - val title = getString(map, "title") - if (title == null) { - NoSlug - } else { - val unaccented = StringUtilities.removeAccents(title) - // Remove punctuation after splitting on colon. - val slug = StringUtilities.removePunctuation((unaccented.split(":")(0).toLowerCase())).replaceAll("\\s", "") - if (slug.isEmpty || slug == null) NoSlug else slug - } - } - def getStringOption(optionalMap : Option[Map[String, Any]], key : String) : Option[String] = { optionalMap match { case None => None diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala new file mode 100644 index 0000000..5d6dea0 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -0,0 +1,30 @@ +package sandcrawler + +import scala.util.parsing.json.JSONObject + +// Contains features needed to make slug and to score (in combination +// with a second ScorableFeatures). +class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: String = "") { + def toMap() : Map[String, Any] = { + Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) + } + + override def toString() : String = { + JSONObject(toMap()).toString + } + + def toSlug() : String = { + if (title == null) { + Scorable.NoSlug + } else { + val unaccented = StringUtilities.removeAccents(title) + // Remove punctuation after splitting on colon. 
+ val slug = StringUtilities.removePunctuation((unaccented.split(":")(0).toLowerCase())).replaceAll("\\s", "") + if (slug.isEmpty || slug == null) Scorable.NoSlug else slug + } + } + + def toMapFeatures = { + MapFeatures(toSlug, toString) + } +} diff --git a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala new file mode 100644 index 0000000..7ec0c4d --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala @@ -0,0 +1,37 @@ +package sandcrawler + +import org.scalatest._ + +class ScorableFeaturesTest extends FlatSpec with Matchers { + private def titleToSlug(s : String) : String = { + new ScorableFeatures(title = s).toSlug + } + + "mapToSlug()" should "extract the parts of titles before a colon" in { + titleToSlug("HELLO:there") shouldBe "hello" + } + + it should "extract an entire colon-less string" in { + titleToSlug("hello THERE") shouldBe "hellothere" + } + + it should "return Scorable.NoSlug if given empty string" in { + titleToSlug("") shouldBe Scorable.NoSlug + } + + it should "return Scorable.NoSlug if given null" in { + titleToSlug(null) shouldBe Scorable.NoSlug + } + + it should "strip punctuation" in { + titleToSlug("HELLO!:the:re") shouldBe "hello" + titleToSlug("a:b:c") shouldBe "a" + titleToSlug( + "If you're happy and you know it, clap your hands!") shouldBe "ifyourehappyandyouknowitclapyourhands" + } + + it should "remove whitespace" in { + titleToSlug("foo bar : baz ::") shouldBe "foobar" + titleToSlug("\na\t:b:c") shouldBe "a" + } +} diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala index 95faacc..fd44f57 100644 --- a/scalding/src/test/scala/sandcrawler/ScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -54,38 +54,6 @@ class ScorableTest extends FlatSpec with Matchers { "annex": null } """ - private def titleToSlug(s : String) : String = { - Scorable.mapToSlug(Scorable.toScorableMap(title = s)) - } - - "mapToSlug()" should "extract the parts of titles before a colon" in { - titleToSlug("HELLO:there") shouldBe "hello" - } - - it should "extract an entire colon-less string" in { - titleToSlug("hello THERE") shouldBe "hellothere" - } - - it should "return Scorable.NoSlug if given empty string" in { - titleToSlug("") shouldBe Scorable.NoSlug - } - - it should "return Scorable.NoSlug if given null" in { - titleToSlug(null) shouldBe Scorable.NoSlug - } - - it should "strip punctuation" in { - titleToSlug("HELLO!:the:re") shouldBe "hello" - titleToSlug("a:b:c") shouldBe "a" - titleToSlug( - "If you're happy and you know it, clap your hands!") shouldBe "ifyourehappyandyouknowitclapyourhands" - } - - it should "remove whitespace" in { - titleToSlug("foo bar : baz ::") shouldBe "foobar" - titleToSlug("\na\t:b:c") shouldBe "a" - } - "jsonToMap()" should "return a map, given a legal JSON string" in { Scorable.jsonToMap(JsonString) should not be (None) } -- cgit v1.2.3
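
Postscript to the ScorableFeatures refactor above: a short, illustrative usage sketch (the title and DOI are made-up sample values) showing what the new class produces for the map phase.

    package sandcrawler

    // Illustrative only -- shows what the new ScorableFeatures class produces.
    // The title and DOI below are made-up sample values.
    object ScorableFeaturesDemo {
      def main(cmdArgs : Array[String]) : Unit = {
        val features = new ScorableFeatures(title = "Title 1: TNG", doi = "DOI-0")
        println(features.toSlug)      // "title1" -- the slug used as the join key
        println(features.toString)    // JSON string carrying title/year/doi/sha1
        val mf : MapFeatures = features.toMapFeatures
        println(mf.slug + "\t" + mf.json)
      }
    }

Keeping the slug derivation and the serialized JSON together in one class is what lets both CrossrefScorable and GrobidScorable shrink to a few lines in this commit.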