aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-06 14:16:19 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-06 14:16:19 -0700
commitb1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d (patch)
tree2d0a4367abd95331b941d7dd919c16c5cecec6e9 /scalding
parent81dbd0e05653682dccb8bc74b39067b4ee7ac1f2 (diff)
downloadsandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.tar.gz
sandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.zip
Partly refactored HBaseCrossrefScoreJob. Everything compiles.
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala115
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala20
-rw-r--r--scalding/src/main/scala/sandcrawler/StringUtilities.scala59
3 files changed, 194 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
new file mode 100644
index 0000000..8e0c560
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -0,0 +1,115 @@
+import scala.math
+import scala.util.parsing.json.JSON
+
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+
+case class MapFeatures(val key : String, slug : String, json : String)
+case class ReduceFeatures(json : String)
+case class ReduceOutput(val score : Int, json1 : String, json2 : String)
+
+abstract class Scorable {
+ def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] =
+ {
+ getFeaturesPipe(args)
+ .filter { entry => Scorable.isValidSlug(entry.slug) }
+ .groupBy { case MapFeatures(key, slug, json) => slug }
+ .map { tuple =>
+ val (slug : String, features : MapFeatures) = tuple
+ (slug, ReduceFeatures(features.json))
+ }
+ }
+
+ // abstract method
+ def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures]
+}
+
+object Scorable {
+ val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable
+
+ def isValidSlug(slug : String) = {
+ slug != NoSlug
+ }
+
+ def jsonToMap(json : String) : Option[Map[String, Any]] = {
+ // https://stackoverflow.com/a/32717262/631051
+ val jsonObject = JSON.parseFull(json)
+ if (jsonObject == None) {
+ None
+ } else {
+ Some(jsonObject.get.asInstanceOf[Map[String, Any]])
+ }
+ }
+
+ /*
+ def grobidToSlug(json : String) : Option[String] = {
+ jsonToMap(json) match {
+ case None => None
+ case Some(map) => {
+ if (map contains "title") {
+ titleToSlug(getString(map, "title"))
+ } else {
+ None
+ }
+ }
+ }
+ }
+
+ def crossrefToSlug(json : String) : Option[String] = {
+ jsonToMap(json) match {
+ case None => None
+ case Some(map) => {
+ if (map contains "title") {
+ // TODO: Stop ignoring secondary titles
+ titleToSlug(map("title").asInstanceOf[List[String]](0))
+ } else {
+ None
+ }
+ }
+ }
+ }
+ */
+
+ def titleToSlug(title : String) : String = {
+ val slug = StringUtilities.removeAccents(title).split(":")(0).toLowerCase()
+ if (slug.isEmpty) {
+ NoSlug
+ } else {
+ slug
+ }
+ }
+
+ def getStringOption(optionalMap : Option[Map[String, Any]], key : String)
+ : Option[String] = {
+ optionalMap match {
+ case None => None
+ case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None
+ }
+ }
+
+ // Caller is responsible for ensuring that key is in map.
+ def getString(map : Map[String, String], key : String) : String = {
+ assert(map contains key)
+ map(key).asInstanceOf[String]
+ }
+
+ val MaxScore = 1000
+
+ def computeOutput(feature1 : ReduceFeatures, feature2 : ReduceFeatures) :
+ ReduceOutput = {
+ val json1 = jsonToMap(feature1.json)
+ val json2 = jsonToMap(feature2.json)
+ getStringOption(json1, "title") match {
+ case None => ReduceOutput(0, "No title", feature1.json)
+ case Some(title1) => {
+ getStringOption(json2, "title") match {
+ case None => ReduceOutput(0, "No title", feature2.json)
+ case Some(title2) =>
+ ReduceOutput(
+ (StringUtilities.similarity(title1, title2) * MaxScore).toInt,
+ feature1.json, feature2.json)
+ }
+ }
+ }
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
new file mode 100644
index 0000000..8d4d957
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -0,0 +1,20 @@
+import java.text.Normalizer
+
+import scala.math
+import scala.util.parsing.json.JSON
+
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions {
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
+
+ pipe1.join(pipe2).map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ Scorable.computeOutput(features1, features2)
+ }
+ .write(TypedTsv[ReduceOutput](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
new file mode 100644
index 0000000..290b03f
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -0,0 +1,59 @@
+import java.text.Normalizer
+import java.util.regex.Pattern
+
+object StringUtilities {
+ // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934
+ def removeAccents(s : String) : String = {
+ val replacements = Map(
+ '\u0141' -> 'L',
+ '\u0142' -> 'l', // Letter ell
+ '\u00d8' -> 'O',
+ '\u00f8' -> 'o'
+ )
+ val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD))
+ for (i <- 0 to sb.length - 1) {
+ for (key <- replacements.keys) {
+ if (sb(i) == key) {
+ sb.deleteCharAt(i);
+ sb.insert(i, replacements(key))
+ }
+ }
+ }
+ val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+")
+ pattern.matcher(sb).replaceAll("")
+ }
+
+ // Adapted from: https://stackoverflow.com/a/16018452/631051
+ def similarity(s1a : String, s2a : String) : Double = {
+ val (s1, s2) = (removeAccents(s1a), removeAccents(s2a))
+ val longer : String = if (s1.length > s2.length) s1 else s2
+ val shorter : String = if (s1.length > s2.length) s2 else s1
+ if (longer.length == 0) {
+ // Both strings are empty.
+ 1
+ } else {
+ (longer.length - stringDistance(longer, shorter)) / longer.length.toDouble
+ }
+ }
+
+ // Source: https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/
+ def stringDistance(s1: String, s2: String): Int = {
+ val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]()
+ def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c)
+ def sd(s1: List[Char], s2: List[Char]): Int = {
+ if (!memo.contains((s1, s2))) {
+ memo((s1,s2)) = (s1, s2) match {
+ case (_, Nil) => s1.length
+ case (Nil, _) => s2.length
+ case (c1::t1, c2::t2) =>
+ min( sd(t1,s2) + 1, sd(s1,t2) + 1,
+ sd(t1,t2) + (if (c1==c2) 0 else 1) )
+ }
+ }
+ memo((s1,s2))
+ }
+
+ sd( s1.toList, s2.toList )
+ }
+}
+