diff options
Diffstat (limited to 'scalding/src')
7 files changed, 65 insertions, 323 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index e257152..4558ee6 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -1,36 +1,14 @@ package sandcrawler -import cascading.flow.FlowDef -import cascading.pipe.Pipe -import cascading.tuple.Fields -import com.twitter.scalding._ -import com.twitter.scalding.typed.TDsl._ -import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource -import TDsl._ -import scala.util.parsing.json.JSONObject - -import java.text.Normalizer -import java.util.Arrays -import java.util.Properties -import java.util.regex.Pattern - import scala.math import scala.util.parsing.json.JSON import scala.util.parsing.json.JSONObject +import cascading.flow.FlowDef import cascading.tuple.Fields import com.twitter.scalding._ -import com.twitter.scalding.typed.CoGrouped -import com.twitter.scalding.typed.Grouped import com.twitter.scalding.typed.TDsl._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable with HBasePipeConversions { // TODO: Generalize args so there can be multiple Crossref pipes in one job. @@ -50,8 +28,8 @@ object CrossrefScorable { Scorable.jsonToMap(json) match { case None => MapFeatures(Scorable.NoSlug, json) case Some(map) => { - if ((map contains "titles") && (map contains "DOI")) { - val titles = map("titles").asInstanceOf[List[String]] + if ((map contains "title") && (map contains "DOI")) { + val titles = map("title").asInstanceOf[List[String]] val doi = Scorable.getString(map, "DOI") if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) { new MapFeatures(Scorable.NoSlug, json) diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index de9f51a..94b3494 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -1,15 +1,14 @@ package sandcrawler import scala.util.parsing.json.JSONObject + import cascading.flow.FlowDef -import cascading.pipe.Pipe import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource -//import TDsl._ class GrobidScorable extends Scorable with HBasePipeConversions { def getSource(args : Args) : Source = { diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala deleted file mode 100644 index 018a74b..0000000 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ /dev/null @@ -1,218 +0,0 @@ -package sandcrawler - -import java.text.Normalizer -import java.util.Arrays -import java.util.Properties -import java.util.regex.Pattern - -import scala.math -import scala.util.parsing.json.JSON - -import cascading.tuple.Fields -import com.twitter.scalding._ -import com.twitter.scalding.typed.CoGrouped -import com.twitter.scalding.typed.Grouped -import com.twitter.scalding.typed.TDsl._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource -import TDsl._ - -class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable - - // key is SHA1 - val grobidSource = HBaseCrossrefScore.getHBaseSource( - args("hbase-table"), - args("zookeeper-hosts")) - - val temp : cascading.pipe.Pipe = grobidSource - .read - - // Here I CAN call Pipe.toTypedPipe() - val grobidPipe : TypedPipe[(String, String, String)] = temp - .fromBytesWritable(new Fields("key", "tei_json")) - .toTypedPipe[(String, String)]('key, 'tei_json) - .map { entry => - val (key, json) = (entry._1, entry._2) - HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => (slug, key, json) - case None => (NoTitle, key, json) - } - } - .filter { entry => - val (slug, _, _) = entry - slug != NoTitle - } - - val grobidGroup = grobidPipe - .groupBy { case (slug, key, json) => slug } - - val crossrefSource = TextLine(args("crossref-input")) - val temp2 : cascading.pipe.Pipe = crossrefSource.read - val crossrefPipe : TypedPipe[(String, String)] = temp2 - // .debug // Should be 4 tuples for mocked data - .toTypedPipe[String]('line) - .map{ json : String => - HBaseCrossrefScore.crossrefToSlug(json) match { - case Some(slug) => (slug, json) - case None => (NoTitle, json) - } - } - .filter { entry => - val (slug, json) = entry - slug != NoTitle - } - - val crossrefGroup = crossrefPipe - .groupBy { case (slug, json) => slug } - - val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = - grobidGroup.join(crossrefGroup) - - theJoin.map{ entry => - val (slug : String, - ((slug0: String, sha1 : String, grobidJson : String), - (slug1 : String, crossrefJson : String))) = entry - HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} - // Output: score, sha1, doi, grobid title, crossref title - .write(TypedTsv[(Int, String, String, String, String)](args("output"))) - -} - -object HBaseCrossrefScore { - def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( - hbaseTable, // HBase Table Name - zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?) - List("grobid0:tei_json"), - SourceMode.SCAN_ALL) - - def jsonToMap(json : String) : Option[Map[String, Any]] = { - // https://stackoverflow.com/a/32717262/631051 - val jsonObject = JSON.parseFull(json) - if (jsonObject == None) { - None - } else { - Some(jsonObject.get.asInstanceOf[Map[String, Any]]) - } - } - - def grobidToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - titleToSlug(map("title").asInstanceOf[String]) - } else { - None - } - } - } - } - - def crossrefToSlug(json : String) : Option[String] = { - jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - // TODO: Don't ignore titles after the first. - titleToSlug(map("title").asInstanceOf[List[String]](0)) - } else { - None - } - } - } - } - - def titleToSlug(title : String) : Option[String] = { - val slug = removeAccents(title).split(":")(0).toLowerCase() - if (slug.isEmpty) { - None - } else { - Some(slug) - } - } - - val MaxScore = 1000 - - def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) : - // (score, sha1, doi, grobidTitle, crossrefTitle) - (Int, String, String, String, String) = { - jsonToMap(grobidJson) match { - case None => (0, "", "", "", "") // This can't happen, because grobidJson already validated in earlier stage - case Some(grobid) => { - val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase() - - jsonToMap(crossrefJson) match { - case None => (0, "", "", "", "") // This can't happen, because crossrefJson already validated in earlier stage - case Some(crossref) => { - val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase() - - (similarity(removeAccents(grobidTitle), removeAccents(crossrefTitle)), - sha1, - crossref("DOI").asInstanceOf[String], - "'" + grobidTitle + "'", - "'" + crossrefTitle + "'") - } - } - } - } - } - - // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 - def removeAccents(s : String) : String = { - val replacements = Map( - '\u0141' -> 'L', - '\u0142' -> 'l', // Letter ell - '\u00d8' -> 'O', - '\u00f8' -> 'o' - ) - val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) - for (i <- 0 to sb.length - 1) { - for (key <- replacements.keys) { - if (sb(i) == key) { - sb.deleteCharAt(i); - sb.insert(i, replacements(key)) - } - } - } - val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") - pattern.matcher(sb).replaceAll("") - } - - // Adapted from: https://stackoverflow.com/a/16018452/631051 - def similarity(s1 : String, s2 : String) : Int = { - val longer : String = if (s1.length > s2.length) s1 else s2 - val shorter : String = if (s1.length > s2.length) s2 else s1 - if (longer.length == 0) { - // Both strings are empty. - MaxScore - } else { - (longer.length - stringDistance(longer, shorter)) * MaxScore / longer.length - } - } - - // Source: // https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ - def stringDistance(s1: String, s2: String): Int = { - val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() - def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) - def sd(s1: List[Char], s2: List[Char]): Int = { - if (!memo.contains((s1, s2))) { - memo((s1,s2)) = (s1, s2) match { - case (_, Nil) => s1.length - case (Nil, _) => s2.length - case (c1::t1, c2::t2) => - min( sd(t1,s2) + 1, sd(s1,t2) + 1, - sd(t1,t2) + (if (c1==c2) 0 else 1) ) - } - } - memo((s1,s2)) - } - - sd( s1.toList, s2.toList ) - } -} - diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index a256fa4..717b2d5 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -39,7 +39,7 @@ object Scorable { // NOTE: I could go all out and make ScorableMap a type. // TODO: Require year. Other features will get added here. def toScorableMap(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : Map[String, Any] = { - Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) + Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) } def toScorableJson(title : String, year : Int, doi : String = "", sha1 : String = "") : String = { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 386b367..75d45e9 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,16 +1,12 @@ package sandcrawler -import cascading.flow.FlowDef -import cascading.tuple.Fields -import com.twitter.scalding.{Args,Source,TextLine,TypedPipe, TypedTsv} -//import com.twitter.scalding.source.TypedText -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseSource -import com.twitter.scalding.{ Dsl, RichPipe, IterableSource, TupleSetter, TupleConverter } import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase -class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { +class ScoreJob(args: Args) extends JobBase(args) { // TODO: Instantiate any subclass of Scorable specified in args. val sc1 : Scorable = new GrobidScorable() val sc2 : Scorable = new CrossrefScorable() @@ -27,10 +23,10 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { } //TypedTsv doesn't work over case classes. .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } - .write(TypedTsv[(String, Int, String, String)](args("output"))) } +/* // Ugly hack to get non-String information into ScoreJob above. object ScoreJob { var scorable1 : Option[Scorable] = None @@ -57,38 +53,5 @@ object ScoreJob { case None => null } } - - /* - implicit def sourceToRichPipe(src: Source): RichPipe = new RichPipe(src.read) - - // This converts an Iterable into a Pipe or RichPipe with index (int-based) fields - implicit def toPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): Pipe = - IterableSource[T](iter)(set, conv).read - - implicit def iterableToRichPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): RichPipe = - RichPipe(toPipe(iter)(set, conv)) - - // Provide args as an implicit val for extensions such as the Checkpoint extension. -// implicit protected def _implicitJobArgs: Args = args - - def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { - pipe - // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe - .toTypedPipe[String](new Fields("line")) - } - - def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { - pipe - .fromBytesWritable(new Fields("key", "tei_json")) - // I needed to change symbols to strings when I pulled this out of ScoreJob. - .toTypedPipe[(String, String)](new Fields("key", "tei_json")) - .map { entry => - val (key : String, json : String) = (entry._1, entry._2) - GrobidScorable.grobidToSlug(json) match { - case Some(slug) => new MapFeatures(slug, json) - case None => new MapFeatures(Scorable.NoSlug, json) - } - } - } - */ } + */ diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala index dc6f347..75be03e 100644 --- a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala @@ -61,7 +61,7 @@ class CrossrefScorableTest extends FlatSpec with Matchers { "subject" : [ "Pediatrics, Perinatology, and Child Health" ] } """ - val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "Some Title") val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") val MalformedCrossrefString = CrossrefString.replace("}", "") @@ -78,11 +78,11 @@ class CrossrefScorableTest extends FlatSpec with Matchers { it should "handle valid input" in { val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithTitle) - result.slug shouldBe "dummyexamplefile" + result.slug shouldBe "sometitle" Scorable.jsonToMap(result.json) match { case None => fail() case Some(map) => { - map("title").asInstanceOf[String] shouldBe "Dummy Example File" + map("title").asInstanceOf[String] shouldBe "Some Title" } } } diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala index 8436817..f0b411f 100644 --- a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -113,25 +113,32 @@ class ScoreJobTest extends FlatSpec with Matchers { val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") val MalformedCrossrefString = CrossrefString.replace("}", "") + val CrossrefStrings = List( + CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")) // Pipeline tests val output = "/tmp/testOutput" val input = "/tmp/testInput" val (testTable, testHost) = ("test-table", "dummy-host:2181") - val grobidSampleData = List( - List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))), - List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))), - List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), - Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))), - List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), - Bytes.toBytes(MalformedGrobidString))) + val Sha1Strings = List( + "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", + "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", + "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56") - // TODO: Make less yucky. - ScoreJob.setScorable1(new CrossrefScorable()) - ScoreJob.setScorable2(new GrobidScorable()) + val GrobidStrings = List( + GrobidString.replace("<<TITLE>>", "Title 1"), + GrobidString.replace("<<TITLE>>", "Title 2: TNG"), + GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"), + MalformedGrobidString) + + val GrobidSampleData = (Sha1Strings zip GrobidStrings) + .map{case(s, g) => + List(Bytes.toBytes(s), Bytes.toBytes(g))} JobTest("sandcrawler.ScoreJob") .arg("test", "") @@ -142,12 +149,12 @@ class ScoreJobTest extends FlatSpec with Matchers { .arg("crossref-input", input) .arg("debug", "true") .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), - grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + GrobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) .source(TextLine(input), List( - 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), - 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"), - 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), - 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) + 0 -> CrossrefStrings(0), + 1 -> CrossrefStrings(1), + 2 -> CrossrefStrings(2), + 3 -> CrossrefStrings(3))) .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) { // Grobid titles and slugs (in parentheses): // Title 1 (title1) @@ -155,27 +162,40 @@ class ScoreJobTest extends FlatSpec with Matchers { // Title 3: The Sequel (title3) // crossref titles and slugs (in parentheses): // Title 1: TNG (title1) - // Title 1: TNG 2 (title1) + // Title 1: TNG 2A (title1) // Title 1: TNG 3 (title1) - // Title 2 Rebooted (title2rebooted) + // Title 2: Rebooted (title2) // Join should have 3 "title1" slugs and 1 "title2" slug outputBuffer => "The pipeline" should "return a 4-element list" in { outputBuffer should have length 4 } - /* - it should "return the right first entry" in { - outputBuffer(0) shouldBe ReduceOutput("slug", 50, "", - "") - val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) - slug shouldBe "title 1" - slug shouldBe slug0 - slug shouldBe slug1 - sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8") - grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8") + it should "has right # of entries with each slug" in { + val slugs = outputBuffer.map(_._1) + val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size) + countMap("title1") shouldBe 3 + countMap("title2") shouldBe 1 + } + + def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) = { + val mf1 : MapFeatures = GrobidScorable.jsonToMapFeatures( + Sha1Strings(grobidIndex), + GrobidStrings(grobidIndex)) + val mf2 : MapFeatures = CrossrefScorable.jsonToMapFeatures( + CrossrefStrings(crossrefIndex)) + val score = Scorable.computeSimilarity( + ReduceFeatures(mf1.json), + ReduceFeatures(mf2.json)) + (slug, score, mf1.json, mf2.json) + } + + it should "have right output values" in { + outputBuffer.exists(_ == bundle("title1", 0, 0)) + outputBuffer.exists(_ == bundle("title1", 0, 2)) + outputBuffer.exists(_ == bundle("title1", 0, 1)) + outputBuffer.exists(_ == bundle("title2", 1, 3)) } - */ } .run .finish |