aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala73
1 files changed, 49 insertions, 24 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 7e10c43..714af36 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -3,6 +3,7 @@ package sandcrawler
import java.util.Arrays
import java.util.Properties
+import scala.math
import scala.util.parsing.json.JSON
import cascading.tuple.Fields
@@ -17,11 +18,9 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
-class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
- HBasePipeConversions {
+class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
- /*
// key is SHA1
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
@@ -29,13 +28,14 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
.fromBytesWritable(new Fields("key", "tei_json"))
- .debug // Should be 4 tuples for mocked data
+ // .debug // Should be 4 tuples for mocked data
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
+ // TODO: Consider passing forward only a subset of JSON.
HBaseCrossrefScore.grobidToSlug(json) match {
- case Some(slug) => (slug, key, json)
- case None => (NoTitle, key, json)
+ case Some(slug) => (slug, key, json)
+ case None => (NoTitle, key, json)
}
}
.filter { entry =>
@@ -46,15 +46,12 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidGroup = grobidPipe
.groupBy { case (slug, key, json) => slug }
- */
val crossrefSource = TextLine(args("crossref-input"))
- val crossrefPipe : TypedPipe[String] = crossrefSource
+ val crossrefPipe : TypedPipe[(String, String)] = crossrefSource
.read
- .debug // Should be 4 tuples for mocked data
+ // .debug // Should be 4 tuples for mocked data
.toTypedPipe[String]('line)
- /*
- .map{line : String => (line, "foo")}
.map{ json : String =>
HBaseCrossrefScore.crossrefToSlug(json) match {
case Some(slug) => (slug, json)
@@ -65,26 +62,21 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val (slug, json) = entry
slug != NoTitle
}
- */
- .write(TypedTsv[String](args("output")))
-
- /*
val crossrefGroup = crossrefPipe
.groupBy { case (slug, json) => slug }
- // TODO: Figure out which is smaller.
- val theJoin : CoGrouped[String, ((String, String, String), (String, String))] =
+ val theJoin : CoGrouped[String, ((String, String, String), (String, String))] =
grobidGroup.join(crossrefGroup)
theJoin.map{ entry =>
- val (slug : String,
- ((slug0: String, sha1 : String, grobidJson : String),
- (slug1 : String, crossrefJson : String))) = entry
- // TODO: For now, output it all.
- (slug, slug0, slug1, sha1, grobidJson, crossrefJson)}
- .write(TypedTsv[(String, String, String, String, String, String)](args("output")))
- */
+ val (slug : String,
+ ((slug0: String, sha1 : String, grobidJson : String),
+ (slug1 : String, crossrefJson : String))) = entry
+ HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)}
+ .debug
+ // Output: score, sha1, doi, grobid title, crossref title
+ .write(TypedTsv[(Int, String, String, String, String)](args("output")))
}
@@ -137,4 +129,37 @@ object HBaseCrossrefScore {
Some(slug)
}
}
+
+ val FullTitleMatch = 100
+ val TitleLeftMatchBase = 50
+ val MaxTitleLeftMatch = 80
+ val SlugMatch = 25
+
+ def computeSimilarity(gTitle : String, cTitle : String) : Int = {
+ assert(titleToSlug(gTitle) == titleToSlug(cTitle))
+ if (gTitle == cTitle) {
+ FullTitleMatch
+ } else if (gTitle.startsWith(cTitle) || cTitle.startsWith(gTitle)) {
+ math.min(TitleLeftMatchBase + math.abs(gTitle.length - cTitle.length),
+ MaxTitleLeftMatch)
+ } else {
+ SlugMatch
+ }
+ }
+
+ def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) :
+ // (score, sha1, doi, grobidTitle, crossrefTitle)
+ (Int, String, String, String, String) = {
+ // JSON has already been validated in previous stages.
+ val grobid = jsonToMap(grobidJson)
+ val crossref = jsonToMap(crossrefJson)
+
+ val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase()
+ val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase()
+ (computeSimilarity(grobidTitle, crossrefTitle),
+ sha1,
+ crossref("DOI").asInstanceOf[String],
+ "'" + grobidTitle + "'",
+ "'" + crossrefTitle + "'")
+ }
}