aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala73
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala31
2 files changed, 71 insertions, 33 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 7e10c43..714af36 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -3,6 +3,7 @@ package sandcrawler
import java.util.Arrays
import java.util.Properties
+import scala.math
import scala.util.parsing.json.JSON
import cascading.tuple.Fields
@@ -17,11 +18,9 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
-class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
- HBasePipeConversions {
+class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
- /*
// key is SHA1
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
@@ -29,13 +28,14 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
.fromBytesWritable(new Fields("key", "tei_json"))
- .debug // Should be 4 tuples for mocked data
+ // .debug // Should be 4 tuples for mocked data
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
+ // TODO: Consider passing forward only a subset of JSON.
HBaseCrossrefScore.grobidToSlug(json) match {
- case Some(slug) => (slug, key, json)
- case None => (NoTitle, key, json)
+ case Some(slug) => (slug, key, json)
+ case None => (NoTitle, key, json)
}
}
.filter { entry =>
@@ -46,15 +46,12 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val grobidGroup = grobidPipe
.groupBy { case (slug, key, json) => slug }
- */
val crossrefSource = TextLine(args("crossref-input"))
- val crossrefPipe : TypedPipe[String] = crossrefSource
+ val crossrefPipe : TypedPipe[(String, String)] = crossrefSource
.read
- .debug // Should be 4 tuples for mocked data
+ // .debug // Should be 4 tuples for mocked data
.toTypedPipe[String]('line)
- /*
- .map{line : String => (line, "foo")}
.map{ json : String =>
HBaseCrossrefScore.crossrefToSlug(json) match {
case Some(slug) => (slug, json)
@@ -65,26 +62,21 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with
val (slug, json) = entry
slug != NoTitle
}
- */
- .write(TypedTsv[String](args("output")))
-
- /*
val crossrefGroup = crossrefPipe
.groupBy { case (slug, json) => slug }
- // TODO: Figure out which is smaller.
- val theJoin : CoGrouped[String, ((String, String, String), (String, String))] =
+ val theJoin : CoGrouped[String, ((String, String, String), (String, String))] =
grobidGroup.join(crossrefGroup)
theJoin.map{ entry =>
- val (slug : String,
- ((slug0: String, sha1 : String, grobidJson : String),
- (slug1 : String, crossrefJson : String))) = entry
- // TODO: For now, output it all.
- (slug, slug0, slug1, sha1, grobidJson, crossrefJson)}
- .write(TypedTsv[(String, String, String, String, String, String)](args("output")))
- */
+ val (slug : String,
+ ((slug0: String, sha1 : String, grobidJson : String),
+ (slug1 : String, crossrefJson : String))) = entry
+ HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)}
+ .debug
+ // Output: score, sha1, doi, grobid title, crossref title
+ .write(TypedTsv[(Int, String, String, String, String)](args("output")))
}
@@ -137,4 +129,37 @@ object HBaseCrossrefScore {
Some(slug)
}
}
+
+ val FullTitleMatch = 100
+ val TitleLeftMatchBase = 50
+ val MaxTitleLeftMatch = 80
+ val SlugMatch = 25
+
+ def computeSimilarity(gTitle : String, cTitle : String) : Int = {
+ assert(titleToSlug(gTitle) == titleToSlug(cTitle))
+ if (gTitle == cTitle) {
+ FullTitleMatch
+ } else if (gTitle.startsWith(cTitle) || cTitle.startsWith(gTitle)) {
+ math.min(TitleLeftMatchBase + math.abs(gTitle.length - cTitle.length),
+ MaxTitleLeftMatch)
+ } else {
+ SlugMatch
+ }
+ }
+
+ def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) :
+ // (score, sha1, doi, grobidTitle, crossrefTitle)
+ (Int, String, String, String, String) = {
+ // JSON has already been validated in previous stages.
+ val grobid = jsonToMap(grobidJson)
+ val crossref = jsonToMap(crossrefJson)
+
+ val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase()
+ val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase()
+ (computeSimilarity(grobidTitle, crossrefTitle),
+ sha1,
+ crossref("DOI").asInstanceOf[String],
+ "'" + grobidTitle + "'",
+ "'" + crossrefTitle + "'")
+ }
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
index bd9dcd3..e6211a2 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala
@@ -163,10 +163,14 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
val (testTable, testHost) = ("test-table", "dummy-host:2181")
val grobidSampleData = List(
- List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title1"))),
- List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title2: TNG"))),
- List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title3: The Sequel"))),
- List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), Bytes.toBytes(MalformedGrobidString)))
+ List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"),
+ Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))),
+ List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
+ Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))),
+ List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"),
+ Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))),
+ List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"),
+ Bytes.toBytes(MalformedGrobidString)))
JobTest("sandcrawler.HBaseCrossrefScoreJob")
.arg("test", "")
@@ -180,18 +184,27 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {
grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
.source(TextLine(input), List(
0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"),
- 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"),
- 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"),
+ 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"),
+ 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))
- .sink[String](TypedTsv[String](output)) {
+ .sink[(Int, String, String, String, String)](TypedTsv[(Int,
+ String, String, String, String)](output)) {
+ // Grobid titles:
+ // "Title 1", "Title 2: TNG", "Title 3: The Sequel"
+ // crossref slugs:
+ // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted"
+ // Join should have 3 "Title 1" slugs and 1 "Title 2" slug
outputBuffer =>
it should "return a 4-element list" in {
- outputBuffer should have length 3
+ outputBuffer should have length 4
}
+
/*
it should "return the right first entry" in {
val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0)
- slug shouldBe "title1"
+ slug shouldBe "title 1"
+ slug shouldBe slug0
+ slug shouldBe slug1
sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8")
grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8")
}