diff options
| author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-09 21:01:08 -0700 | 
|---|---|---|
| committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-09 21:01:08 -0700 | 
| commit | 2528dd4afdf2e1a3419dbf354011f1ecc25c77a5 (patch) | |
| tree | ef896169b3056b29422b74aa859cf1ba2809f298 /scalding/src/main/scala | |
| parent | 28c0518379d226ac25597c2840c5c81bd8551487 (diff) | |
| download | sandcrawler-2528dd4afdf2e1a3419dbf354011f1ecc25c77a5.tar.gz sandcrawler-2528dd4afdf2e1a3419dbf354011f1ecc25c77a5.zip | |
WIP
Diffstat (limited to 'scalding/src/main/scala')
4 files changed, 226 insertions, 5 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 249c9ab..9842122 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -8,8 +8,9 @@ import com.twitter.scalding.typed.TDsl._  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions  import parallelai.spyglass.hbase.HBaseSource +import TDsl._ -class CrossrefScorable extends Scorable { +class CrossrefScorable extends Scorable with HBasePipeConversions {    // TODO: Generalize args so there can be multiple Grobid pipes in one job.    def getSource(args : Args) : Source = {      TextLine(args("crossref-input")) diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 5c6b140..51e40f9 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -8,6 +8,7 @@ import com.twitter.scalding.typed.TDsl._  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions  import parallelai.spyglass.hbase.HBaseSource +import TDsl._  class GrobidScorable extends Scorable with HBasePipeConversions {    def getSource(args : Args) : Source = { @@ -15,10 +16,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {      GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))    } -  def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { +  def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = {      pipe        .fromBytesWritable(new Fields("key", "tei_json")) -      .toTypedPipe[(String, String)](new Fields("key", "tei_json")) +      .toTypedPipe[(String, String)](new Fields('key, 'tei_json))        .map { entry =>          val (key : String, json : String) = (entry._1, entry._2)          GrobidScorable.grobidToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala new file mode 100644 index 0000000..725474d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -0,0 +1,218 @@ +package sandcrawler + +import java.text.Normalizer +import java.util.Arrays +import java.util.Properties +import java.util.regex.Pattern + +import scala.math +import scala.util.parsing.json.JSON + +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.CoGrouped +import com.twitter.scalding.typed.Grouped +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { +  val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable + +  // key is SHA1 +  val grobidSource = HBaseCrossrefScore.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts")) + +  val temp : cascading.pipe.Pipe = grobidSource +    .read +    .fromBytesWritable(new Fields("key", "tei_json")) +  val grobidPipe : TypedPipe[(String, String, String)] = temp +    //  .debug  // Should be 4 tuples for mocked data +    .toTypedPipe[(String, String)]('key, 'tei_json) +    .map { entry => +      val (key, json) = (entry._1, entry._2) +      // TODO: Consider passing forward only a subset of JSON. +      HBaseCrossrefScore.grobidToSlug(json) match { +        case Some(slug) => (slug, key, json) +        case None => (NoTitle, key, json) +      } +    } +    .filter { entry => +      val (slug, _, _) = entry +      slug != NoTitle +    } +//    .debug  // SHould be 3 tuples for mocked data + +  val grobidGroup = grobidPipe +    .groupBy { case (slug, key, json) => slug } + +  val crossrefSource = TextLine(args("crossref-input")) +  val temp2 : cascading.pipe.Pipe = crossrefSource.read +  val crossrefPipe : TypedPipe[(String, String)] = temp2 +    //    .debug // Should be 4 tuples for mocked data +    .toTypedPipe[String]('line) +    .map{ json : String => +      HBaseCrossrefScore.crossrefToSlug(json) match { +        case Some(slug) => (slug, json) +        case None => (NoTitle, json) +      } +    } +    .filter { entry => +      val (slug, json) = entry +      slug != NoTitle +    } + +  val crossrefGroup = crossrefPipe +  .groupBy { case (slug, json) => slug } + +  val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = +    grobidGroup.join(crossrefGroup) + +  theJoin.map{ entry => +    val (slug : String, +      ((slug0: String, sha1 : String, grobidJson : String), +        (slug1 : String, crossrefJson : String))) = entry +    HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} +    // Output: score, sha1, doi, grobid title, crossref title +    .write(TypedTsv[(Int, String, String, String, String)](args("output"))) + +} + +object HBaseCrossrefScore { +  def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( +    hbaseTable,      // HBase Table Name +    zookeeperHosts,  // HBase Zookeeper server (to get runtime config info; can be array?) +    List("grobid0:tei_json"), +    SourceMode.SCAN_ALL) + +  def jsonToMap(json : String) : Option[Map[String, Any]] = { +    // https://stackoverflow.com/a/32717262/631051 +    val jsonObject = JSON.parseFull(json) +    if (jsonObject == None) { +      None +    } else { +      Some(jsonObject.get.asInstanceOf[Map[String, Any]]) +    } +  } + +  def grobidToSlug(json : String) : Option[String] = { +    jsonToMap(json) match { +      case None => None +      case Some(map) => { +        if (map contains "title") { +          titleToSlug(map("title").asInstanceOf[String]) +        } else { +          None +        } +      } +    } +  } + +  def crossrefToSlug(json : String) : Option[String] = { +    jsonToMap(json) match { +      case None => None +      case Some(map) => { +        if (map contains "title") { +          // TODO: Don't ignore titles after the first. +          titleToSlug(map("title").asInstanceOf[List[String]](0)) +        } else { +          None +        } +      } +    } +  } + +  def titleToSlug(title : String) : Option[String] = { +    val slug = removeAccents(title).split(":")(0).toLowerCase() +    if (slug.isEmpty) { +      None +    } else { +      Some(slug) +    } +  } + +  val MaxScore = 1000 + +  def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) : +    // (score, sha1, doi, grobidTitle, crossrefTitle) +      (Int, String, String, String, String) = { +    jsonToMap(grobidJson) match { +      case None => (0, "", "", "", "")  // This can't happen, because grobidJson already validated in earlier stage +      case Some(grobid) => { +        val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase() + +        jsonToMap(crossrefJson) match { +          case None => (0, "", "", "", "")  // This can't happen, because crossrefJson already validated in earlier stage +          case Some(crossref) => { +            val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase() + +            (similarity(removeAccents(grobidTitle), removeAccents(crossrefTitle)), +              sha1, +              crossref("DOI").asInstanceOf[String], +              "'" + grobidTitle + "'", +              "'" + crossrefTitle + "'") +          } +        } +      } +    } +  } + +  // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 +  def removeAccents(s : String) : String = { +    val replacements = Map( +      '\u0141' -> 'L', +      '\u0142' -> 'l',  // Letter ell +      '\u00d8' -> 'O', +      '\u00f8' -> 'o' +    ) +    val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) +    for (i <- 0 to sb.length - 1) { +      for (key <- replacements.keys) { +        if (sb(i) == key) { +          sb.deleteCharAt(i); +          sb.insert(i, replacements(key)) +        } +      } +    } +    val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") +    pattern.matcher(sb).replaceAll("") +  } + +  // Adapted from: https://stackoverflow.com/a/16018452/631051 +  def similarity(s1 : String, s2 : String) : Int = { +    val longer : String = if (s1.length > s2.length) s1 else s2 +    val shorter : String = if (s1.length > s2.length) s2 else s1 +    if (longer.length == 0) { +      // Both strings are empty. +      MaxScore +    } else { +      (longer.length - stringDistance(longer, shorter)) * MaxScore / longer.length +    } +  } + +  // Source: // https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ +  def stringDistance(s1: String, s2: String): Int = { +    val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() +    def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) +    def sd(s1: List[Char], s2: List[Char]): Int = { +      if (!memo.contains((s1, s2))) { +        memo((s1,s2)) = (s1, s2) match { +          case (_, Nil) => s1.length +          case (Nil, _) => s2.length +          case (c1::t1, c2::t2)  => +            min( sd(t1,s2) + 1, sd(s1,t2) + 1, +              sd(t1,t2) + (if (c1==c2) 0 else 1) ) +        } +      } +      memo((s1,s2)) +    } + +    sd( s1.toList, s2.toList ) +  } +} + diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 92b61bc..bd03d57 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -6,13 +6,14 @@ import scala.util.parsing.json.JSON  import cascading.flow.FlowDef  import com.twitter.scalding._  import com.twitter.scalding.typed.TDsl._ +import TDsl._  case class MapFeatures(slug : String, json : String)  case class ReduceFeatures(json : String)  case class ReduceOutput(val slug : String,  score : Int, json1 : String, json2 : String)  abstract class Scorable { -  def getInputPipe(pipe : Pipe) : TypedPipe[(String, ReduceFeatures)] = +  def getInputPipe(pipe : cascading.pipe.Pipe) : TypedPipe[(String, ReduceFeatures)] =    {      getFeaturesPipe(pipe)        .filter { entry => Scorable.isValidSlug(entry.slug) } @@ -25,7 +26,7 @@ abstract class Scorable {    // abstract methods    def getSource(args : Args) : Source -  def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] +  def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures]  }  object Scorable { | 
