aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-09 21:01:08 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-09 21:01:08 -0700
commit2528dd4afdf2e1a3419dbf354011f1ecc25c77a5 (patch)
treeef896169b3056b29422b74aa859cf1ba2809f298 /scalding/src/main/scala
parent28c0518379d226ac25597c2840c5c81bd8551487 (diff)
downloadsandcrawler-2528dd4afdf2e1a3419dbf354011f1ecc25c77a5.tar.gz
sandcrawler-2528dd4afdf2e1a3419dbf354011f1ecc25c77a5.zip
WIP
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala3
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala5
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala218
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala5
4 files changed, 226 insertions, 5 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index 249c9ab..9842122 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -8,8 +8,9 @@ import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
+import TDsl._
-class CrossrefScorable extends Scorable {
+class CrossrefScorable extends Scorable with HBasePipeConversions {
// TODO: Generalize args so there can be multiple Grobid pipes in one job.
def getSource(args : Args) : Source = {
TextLine(args("crossref-input"))
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 5c6b140..51e40f9 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -8,6 +8,7 @@ import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
+import TDsl._
class GrobidScorable extends Scorable with HBasePipeConversions {
def getSource(args : Args) : Source = {
@@ -15,10 +16,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
}
- def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = {
pipe
.fromBytesWritable(new Fields("key", "tei_json"))
- .toTypedPipe[(String, String)](new Fields("key", "tei_json"))
+ .toTypedPipe[(String, String)](new Fields('key, 'tei_json))
.map { entry =>
val (key : String, json : String) = (entry._1, entry._2)
GrobidScorable.grobidToSlug(json) match {
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
new file mode 100644
index 0000000..725474d
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -0,0 +1,218 @@
+package sandcrawler
+
+import java.text.Normalizer
+import java.util.Arrays
+import java.util.Properties
+import java.util.regex.Pattern
+
+import scala.math
+import scala.util.parsing.json.JSON
+
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.CoGrouped
+import com.twitter.scalding.typed.Grouped
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+ val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
+
+ // key is SHA1
+ val grobidSource = HBaseCrossrefScore.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+
+ val temp : cascading.pipe.Pipe = grobidSource
+ .read
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ val grobidPipe : TypedPipe[(String, String, String)] = temp
+ // .debug // Should be 4 tuples for mocked data
+ .toTypedPipe[(String, String)]('key, 'tei_json)
+ .map { entry =>
+ val (key, json) = (entry._1, entry._2)
+ // TODO: Consider passing forward only a subset of JSON.
+ HBaseCrossrefScore.grobidToSlug(json) match {
+ case Some(slug) => (slug, key, json)
+ case None => (NoTitle, key, json)
+ }
+ }
+ .filter { entry =>
+ val (slug, _, _) = entry
+ slug != NoTitle
+ }
+// .debug // SHould be 3 tuples for mocked data
+
+ val grobidGroup = grobidPipe
+ .groupBy { case (slug, key, json) => slug }
+
+ val crossrefSource = TextLine(args("crossref-input"))
+ val temp2 : cascading.pipe.Pipe = crossrefSource.read
+ val crossrefPipe : TypedPipe[(String, String)] = temp2
+ // .debug // Should be 4 tuples for mocked data
+ .toTypedPipe[String]('line)
+ .map{ json : String =>
+ HBaseCrossrefScore.crossrefToSlug(json) match {
+ case Some(slug) => (slug, json)
+ case None => (NoTitle, json)
+ }
+ }
+ .filter { entry =>
+ val (slug, json) = entry
+ slug != NoTitle
+ }
+
+ val crossrefGroup = crossrefPipe
+ .groupBy { case (slug, json) => slug }
+
+ val theJoin : CoGrouped[String, ((String, String, String), (String, String))] =
+ grobidGroup.join(crossrefGroup)
+
+ theJoin.map{ entry =>
+ val (slug : String,
+ ((slug0: String, sha1 : String, grobidJson : String),
+ (slug1 : String, crossrefJson : String))) = entry
+ HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)}
+ // Output: score, sha1, doi, grobid title, crossref title
+ .write(TypedTsv[(Int, String, String, String, String)](args("output")))
+
+}
+
+object HBaseCrossrefScore {
+ def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build(
+ hbaseTable, // HBase Table Name
+ zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?)
+ List("grobid0:tei_json"),
+ SourceMode.SCAN_ALL)
+
+ def jsonToMap(json : String) : Option[Map[String, Any]] = {
+ // https://stackoverflow.com/a/32717262/631051
+ val jsonObject = JSON.parseFull(json)
+ if (jsonObject == None) {
+ None
+ } else {
+ Some(jsonObject.get.asInstanceOf[Map[String, Any]])
+ }
+ }
+
+ def grobidToSlug(json : String) : Option[String] = {
+ jsonToMap(json) match {
+ case None => None
+ case Some(map) => {
+ if (map contains "title") {
+ titleToSlug(map("title").asInstanceOf[String])
+ } else {
+ None
+ }
+ }
+ }
+ }
+
+ def crossrefToSlug(json : String) : Option[String] = {
+ jsonToMap(json) match {
+ case None => None
+ case Some(map) => {
+ if (map contains "title") {
+ // TODO: Don't ignore titles after the first.
+ titleToSlug(map("title").asInstanceOf[List[String]](0))
+ } else {
+ None
+ }
+ }
+ }
+ }
+
+ def titleToSlug(title : String) : Option[String] = {
+ val slug = removeAccents(title).split(":")(0).toLowerCase()
+ if (slug.isEmpty) {
+ None
+ } else {
+ Some(slug)
+ }
+ }
+
+ val MaxScore = 1000
+
+ def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) :
+ // (score, sha1, doi, grobidTitle, crossrefTitle)
+ (Int, String, String, String, String) = {
+ jsonToMap(grobidJson) match {
+ case None => (0, "", "", "", "") // This can't happen, because grobidJson already validated in earlier stage
+ case Some(grobid) => {
+ val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase()
+
+ jsonToMap(crossrefJson) match {
+ case None => (0, "", "", "", "") // This can't happen, because crossrefJson already validated in earlier stage
+ case Some(crossref) => {
+ val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase()
+
+ (similarity(removeAccents(grobidTitle), removeAccents(crossrefTitle)),
+ sha1,
+ crossref("DOI").asInstanceOf[String],
+ "'" + grobidTitle + "'",
+ "'" + crossrefTitle + "'")
+ }
+ }
+ }
+ }
+ }
+
+ // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934
+ def removeAccents(s : String) : String = {
+ val replacements = Map(
+ '\u0141' -> 'L',
+ '\u0142' -> 'l', // Letter ell
+ '\u00d8' -> 'O',
+ '\u00f8' -> 'o'
+ )
+ val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD))
+ for (i <- 0 to sb.length - 1) {
+ for (key <- replacements.keys) {
+ if (sb(i) == key) {
+ sb.deleteCharAt(i);
+ sb.insert(i, replacements(key))
+ }
+ }
+ }
+ val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+")
+ pattern.matcher(sb).replaceAll("")
+ }
+
+ // Adapted from: https://stackoverflow.com/a/16018452/631051
+ def similarity(s1 : String, s2 : String) : Int = {
+ val longer : String = if (s1.length > s2.length) s1 else s2
+ val shorter : String = if (s1.length > s2.length) s2 else s1
+ if (longer.length == 0) {
+ // Both strings are empty.
+ MaxScore
+ } else {
+ (longer.length - stringDistance(longer, shorter)) * MaxScore / longer.length
+ }
+ }
+
+ // Source: // https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/
+ def stringDistance(s1: String, s2: String): Int = {
+ val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]()
+ def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c)
+ def sd(s1: List[Char], s2: List[Char]): Int = {
+ if (!memo.contains((s1, s2))) {
+ memo((s1,s2)) = (s1, s2) match {
+ case (_, Nil) => s1.length
+ case (Nil, _) => s2.length
+ case (c1::t1, c2::t2) =>
+ min( sd(t1,s2) + 1, sd(s1,t2) + 1,
+ sd(t1,t2) + (if (c1==c2) 0 else 1) )
+ }
+ }
+ memo((s1,s2))
+ }
+
+ sd( s1.toList, s2.toList )
+ }
+}
+
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 92b61bc..bd03d57 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -6,13 +6,14 @@ import scala.util.parsing.json.JSON
import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
+import TDsl._
case class MapFeatures(slug : String, json : String)
case class ReduceFeatures(json : String)
case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(pipe : Pipe) : TypedPipe[(String, ReduceFeatures)] =
+ def getInputPipe(pipe : cascading.pipe.Pipe) : TypedPipe[(String, ReduceFeatures)] =
{
getFeaturesPipe(pipe)
.filter { entry => Scorable.isValidSlug(entry.slug) }
@@ -25,7 +26,7 @@ abstract class Scorable {
// abstract methods
def getSource(args : Args) : Source
- def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures]
}
object Scorable {