diff options
| author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-06 14:16:19 -0700 | 
|---|---|---|
| committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-06 14:16:19 -0700 | 
| commit | b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d (patch) | |
| tree | 2d0a4367abd95331b941d7dd919c16c5cecec6e9 /scalding/src/main/scala | |
| parent | 81dbd0e05653682dccb8bc74b39067b4ee7ac1f2 (diff) | |
| download | sandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.tar.gz sandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.zip | |
Partly refactored HBaseCrossrefScoreJob. Everything compiles.
Diffstat (limited to 'scalding/src/main/scala')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/Scorable.scala | 115 | ||||
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreJob.scala | 20 | ||||
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/StringUtilities.scala | 59 | 
3 files changed, 194 insertions, 0 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala new file mode 100644 index 0000000..8e0c560 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -0,0 +1,115 @@ +import scala.math +import scala.util.parsing.json.JSON + +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ + +case class MapFeatures(val key : String, slug : String, json : String) +case class ReduceFeatures(json : String) +case class ReduceOutput(val score : Int, json1 : String, json2 : String) + +abstract class Scorable { +  def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = +  { +    getFeaturesPipe(args) +      .filter { entry => Scorable.isValidSlug(entry.slug) } +      .groupBy { case MapFeatures(key, slug, json) => slug } +      .map { tuple => +        val (slug : String, features : MapFeatures) = tuple +        (slug, ReduceFeatures(features.json)) +      } +  } + +  // abstract method +  def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] +} + +object Scorable { +  val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable + +  def isValidSlug(slug : String) = { +    slug != NoSlug +  } + +  def jsonToMap(json : String) : Option[Map[String, Any]] = { +    // https://stackoverflow.com/a/32717262/631051 +    val jsonObject = JSON.parseFull(json) +    if (jsonObject == None) { +      None +    } else { +      Some(jsonObject.get.asInstanceOf[Map[String, Any]]) +    } +  } + +  /* +  def grobidToSlug(json : String) : Option[String] = { +    jsonToMap(json) match { +      case None => None +      case Some(map) => { +        if (map contains "title") { +          titleToSlug(getString(map, "title")) +        } else { +          None +        } +      } +    } +  } + +  def crossrefToSlug(json : String) : Option[String] = { +    jsonToMap(json) match { +      case None => None +      case Some(map) => { +        if (map contains "title") { +          // TODO: Stop ignoring secondary titles +          titleToSlug(map("title").asInstanceOf[List[String]](0)) +        } else { +          None +        } +      } +    } +  } +   */ + +  def titleToSlug(title : String) : String = { +    val slug = StringUtilities.removeAccents(title).split(":")(0).toLowerCase() +    if (slug.isEmpty) { +      NoSlug +    } else { +      slug +    } +  } + +  def getStringOption(optionalMap : Option[Map[String, Any]], key : String)  +      : Option[String] = { +    optionalMap match { +      case None => None +      case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None +    } +  } + +  // Caller is responsible for ensuring that key is in map. +  def getString(map : Map[String, String], key : String) : String = { +    assert(map contains key) +    map(key).asInstanceOf[String] +  } + +  val MaxScore = 1000 + +  def computeOutput(feature1 : ReduceFeatures, feature2 : ReduceFeatures) : +      ReduceOutput = { +    val json1 = jsonToMap(feature1.json) +    val json2 = jsonToMap(feature2.json) +    getStringOption(json1, "title") match { +      case None => ReduceOutput(0, "No title", feature1.json) +      case Some(title1) => { +        getStringOption(json2, "title") match { +          case None => ReduceOutput(0, "No title", feature2.json) +          case Some(title2) =>  +            ReduceOutput( +              (StringUtilities.similarity(title1, title2) * MaxScore).toInt, +              feature1.json, feature2.json) +        } +      } +    } +  } +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala new file mode 100644 index 0000000..8d4d957 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -0,0 +1,20 @@ +import java.text.Normalizer + +import scala.math +import scala.util.parsing.json.JSON + +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBasePipeConversions + +class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions { +  val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) +  val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) + +  pipe1.join(pipe2).map { entry => +    val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry +    Scorable.computeOutput(features1, features2) +  } +    .write(TypedTsv[ReduceOutput](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala new file mode 100644 index 0000000..290b03f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -0,0 +1,59 @@ +import java.text.Normalizer +import java.util.regex.Pattern + +object StringUtilities { +  // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 +  def removeAccents(s : String) : String = { +    val replacements = Map( +      '\u0141' -> 'L', +      '\u0142' -> 'l',  // Letter ell +      '\u00d8' -> 'O', +      '\u00f8' -> 'o' +    ) +    val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) +    for (i <- 0 to sb.length - 1) { +      for (key <- replacements.keys) { +        if (sb(i) == key) { +          sb.deleteCharAt(i); +          sb.insert(i, replacements(key)) +        } +      } +    } +    val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") +    pattern.matcher(sb).replaceAll("") +  } + +  // Adapted from: https://stackoverflow.com/a/16018452/631051 +  def similarity(s1a : String, s2a : String) : Double = { +    val (s1, s2) = (removeAccents(s1a), removeAccents(s2a)) +    val longer : String = if (s1.length > s2.length) s1 else s2 +    val shorter : String = if (s1.length > s2.length) s2 else s1 +    if (longer.length == 0) { +      // Both strings are empty. +      1 +    } else { +      (longer.length - stringDistance(longer, shorter)) / longer.length.toDouble +    } +  } + +  // Source: https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ +  def stringDistance(s1: String, s2: String): Int = { +    val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() +    def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) +    def sd(s1: List[Char], s2: List[Char]): Int = { +      if (!memo.contains((s1, s2))) { +        memo((s1,s2)) = (s1, s2) match { +          case (_, Nil) => s1.length +          case (Nil, _) => s2.length +          case (c1::t1, c2::t2)  => +            min( sd(t1,s2) + 1, sd(s1,t2) + 1, +              sd(t1,t2) + (if (c1==c2) 0 else 1) ) +        } +      } +      memo((s1,s2)) +    } + +    sd( s1.toList, s2.toList ) +  } +} + | 
