aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala47
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala56
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala79
-rw-r--r--scalding/src/main/scala/sandcrawler/ScorableFeatures.scala44
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala57
-rw-r--r--scalding/src/main/scala/sandcrawler/StringUtilities.scala76
6 files changed, 359 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
new file mode 100644
index 0000000..ff8201a
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -0,0 +1,47 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+class CrossrefScorable extends Scorable with HBasePipeConversions {
+ // TODO: Generalize args so there can be multiple Crossref pipes in one job.
+ def getSource(args : Args) : Source = {
+ TextLine(args("crossref-input"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ getSource(args).read
+ .toTypedPipe[String](new Fields("line"))
+ .map { CrossrefScorable.jsonToMapFeatures(_) }
+ }
+}
+
+object CrossrefScorable {
+ def jsonToMapFeatures(json : String) : MapFeatures = {
+ Scorable.jsonToMap(json) match {
+ case None => MapFeatures(Scorable.NoSlug, json)
+ case Some(map) => {
+ if ((map contains "title") && (map contains "DOI")) {
+ val titles = map("title").asInstanceOf[List[String]]
+ val doi = Scorable.getString(map, "DOI")
+ if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) {
+ new MapFeatures(Scorable.NoSlug, json)
+ } else {
+ // bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ]
+ val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi)
+ new MapFeatures(sf.toSlug, sf.toString)
+ }
+ } else {
+ new MapFeatures(Scorable.NoSlug, json)
+ }
+ }
+ }
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
new file mode 100644
index 0000000..9a09e05
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -0,0 +1,56 @@
+package sandcrawler
+
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class GrobidScorable extends Scorable with HBasePipeConversions {
+ val StatusOK = 200
+
+ def getSource(args : Args) : Source = {
+ // TODO: Generalize args so there can be multiple grobid pipes in one job.
+ GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ getSource(args)
+ .read
+ // Can't just "fromBytesWritable" because we have multiple types?
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code"))
+ .filter { case (_, tei_json, status_code) => tei_json != null && status_code != null }
+ .map { case (key, tei_json, status_code) =>
+ (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+ }
+ // TODO: Should I combine next two stages for efficiency?
+ .collect { case (key, json, StatusOK) => (key, json) }
+ .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) }
+ }
+}
+
+object GrobidScorable {
+ def getHBaseSource(table : String, host : String) : HBaseSource = {
+ HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL)
+ }
+
+ def jsonToMapFeatures(key : String, json : String) : MapFeatures = {
+ Scorable.jsonToMap(json) match {
+ case None => MapFeatures(Scorable.NoSlug, json)
+ case Some(map) => {
+ if (map contains "title") {
+ new ScorableFeatures(Scorable.getString(map, "title"), sha1=key).toMapFeatures
+ } else {
+ MapFeatures(Scorable.NoSlug, json)
+ }
+ }
+ }
+ }
+}
+
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
new file mode 100644
index 0000000..9b9c633
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -0,0 +1,79 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+
+case class MapFeatures(slug : String, json : String)
+case class ReduceFeatures(json : String)
+case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
+
+abstract class Scorable {
+ def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] =
+ {
+ getFeaturesPipe(args)
+ .filter { entry => Scorable.isValidSlug(entry.slug) }
+ .groupBy { case MapFeatures(slug, json) => slug }
+ .map { tuple =>
+ val (slug : String, features : MapFeatures) = tuple
+ (slug, ReduceFeatures(features.json))
+ }
+ }
+
+ // abstract methods
+ def getSource(args : Args) : Source
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures]
+}
+
+object Scorable {
+ val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable
+
+ def isValidSlug(slug : String) : Boolean = {
+ slug != NoSlug
+ }
+
+ def jsonToMap(json : String) : Option[Map[String, Any]] = {
+ // https://stackoverflow.com/a/32717262/631051
+ val jsonObject = JSON.parseFull(json)
+ if (jsonObject == None) {
+ None
+ } else {
+ Some(jsonObject.get.asInstanceOf[Map[String, Any]])
+ }
+ }
+
+ def getStringOption(optionalMap : Option[Map[String, Any]], key : String) : Option[String] = {
+ optionalMap match {
+ case None => None
+ case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None
+ }
+ }
+
+ // Caller is responsible for ensuring that key is a String in map.
+ // TODO: Add and handle ClassCastException
+ def getString(map : Map[String, Any], key : String) : String = {
+ assert(map contains key)
+ map(key).asInstanceOf[String]
+ }
+
+ val MaxScore = 1000
+
+ def computeSimilarity(features1 : ReduceFeatures, features2 : ReduceFeatures) : Int = {
+ val json1 = jsonToMap(features1.json)
+ val json2 = jsonToMap(features2.json)
+ getStringOption(json1, "title") match {
+ case None => 0
+ case Some(title1) => {
+ getStringOption(json2, "title") match {
+ case None => 0
+ case Some(title2) =>
+ (StringUtilities.similarity(title1, title2) * MaxScore).toInt
+ }
+ }
+ }
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
new file mode 100644
index 0000000..8ed3369
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -0,0 +1,44 @@
+package sandcrawler
+
+import scala.util.parsing.json.JSONObject
+
+
+// Contains features needed to make slug and to score (in combination
+// with a second ScorableFeatures).
+class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
+
+ val slugBlacklist = Set( "abbreviations", "abstract", "acknowledgements",
+ "article", "authorreply", "authorsreply", "bookreview", "bookreviews",
+ "casereport", "commentary", "commentaryon", "commenton", "commentto",
+ "contents", "correspondence", "dedication", "editorialadvisoryboard",
+ "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue",
+ "lettertotheeditor", "listofabbreviations", "note", "overview", "preface",
+ "references", "results", "review", "reviewarticle", "summary", "title",
+ "name")
+
+ def toMap() : Map[String, Any] = {
+ Map("title" -> (if (title == null) "" else title),
+ "year" -> year,
+ "doi" -> (if (doi == null) "" else doi),
+ "sha1" -> (if (sha1 == null) "" else sha1))
+ }
+
+ override def toString() : String = {
+ JSONObject(toMap()).toString
+ }
+
+ def toSlug() : String = {
+ if (title == null) {
+ Scorable.NoSlug
+ } else {
+ val unaccented = StringUtilities.removeAccents(title)
+ // Remove punctuation
+ val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "")
+ if (slug.isEmpty || slug == null || (slugBlacklist contains slug)) Scorable.NoSlug else slug
+ }
+ }
+
+ def toMapFeatures = {
+ MapFeatures(toSlug, toString)
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
new file mode 100644
index 0000000..75d45e9
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -0,0 +1,57 @@
+package sandcrawler
+
+import cascading.pipe.Pipe
+import com.twitter.scalding.Args
+import com.twitter.scalding.TypedPipe
+import com.twitter.scalding.TypedTsv
+import parallelai.spyglass.base.JobBase
+
+class ScoreJob(args: Args) extends JobBase(args) {
+ // TODO: Instantiate any subclass of Scorable specified in args.
+ val sc1 : Scorable = new GrobidScorable()
+ val sc2 : Scorable = new CrossrefScorable()
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
+
+ pipe1.join(pipe2).map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(features1, features2),
+ features1.json,
+ features2.json)
+ }
+ //TypedTsv doesn't work over case classes.
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
+}
+
+/*
+// Ugly hack to get non-String information into ScoreJob above.
+object ScoreJob {
+ var scorable1 : Option[Scorable] = None
+ var scorable2 : Option[Scorable] = None
+
+ def setScorable1(s : Scorable) {
+ scorable1 = Some(s)
+ }
+
+ def getScorable1() : Scorable = {
+ scorable1 match {
+ case Some(s) => s
+ case None => null
+ }
+ }
+
+ def setScorable2(s: Scorable) {
+ scorable2 = Some(s)
+ }
+
+ def getScorable2() : Scorable = {
+ scorable2 match {
+ case Some(s) => s
+ case None => null
+ }
+ }
+}
+ */
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
new file mode 100644
index 0000000..2745875
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -0,0 +1,76 @@
+package sandcrawler
+
+import java.text.Normalizer
+import java.util.regex.Pattern
+
+object StringUtilities {
+ // bnewbold: I propose that we:
+ // 1. keep only \p{Ideographic}, \p{Alphabetic}, and \p{Digit}
+ // 2. strip accents
+ // 3. "lower-case" (unicode-aware)
+ // 4. do any final custom/manual mappings
+ //
+ // We should check (test) that null bytes are handled, in addition to other
+ // more obvious characters
+
+ // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934
+ def removeAccents(s : String) : String = {
+ val replacements = Map(
+ '\u0141' -> 'L',
+ '\u0142' -> 'l', // Letter ell
+ '\u00d8' -> 'O',
+ '\u00f8' -> 'o'
+ )
+ val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD))
+ for (i <- 0 to sb.length - 1) {
+ for (key <- replacements.keys) {
+ if (sb(i) == key) {
+ sb.deleteCharAt(i);
+ sb.insert(i, replacements(key))
+ }
+ }
+ }
+ val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+")
+ pattern.matcher(sb).replaceAll("")
+ }
+
+ // Source: https://stackoverflow.com/a/30076541/631051
+ def removePunctuation(s: String) : String = {
+ s.replaceAll("""[\p{Punct}]""", "")
+ }
+
+ // Adapted from: https://stackoverflow.com/a/16018452/631051
+ def similarity(s1a : String, s2a : String) : Double = {
+ val (s1, s2) = (removeAccents(removePunctuation(s1a)),
+ removeAccents(removePunctuation(s2a)))
+ val longer : String = if (s1.length > s2.length) s1 else s2
+ val shorter : String = if (s1.length > s2.length) s2 else s1
+ if (longer.length == 0) {
+ // Both strings are empty.
+ 1
+ } else {
+ (longer.length - stringDistance(longer, shorter)) / longer.length.toDouble
+ }
+ }
+
+ // Source: https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/
+ def stringDistance(s1: String, s2: String): Int = {
+ val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]()
+ def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c)
+ def sd(s1: List[Char], s2: List[Char]): Int = {
+ if (!memo.contains((s1, s2))) {
+ memo((s1,s2)) = (s1, s2) match {
+ case (_, Nil) => s1.length
+ case (Nil, _) => s2.length
+ case (c1::t1, c2::t2) =>
+ min( sd(t1,s2) + 1, sd(s1,t2) + 1,
+ sd(t1,t2) + (if (c1==c2) 0 else 1) )
+ }
+ }
+ memo((s1,s2))
+ }
+
+ sd( s1.toList, s2.toList )
+ }
+}
+