diff options
Diffstat (limited to 'scalding')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 73 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala | 31 |
2 files changed, 71 insertions, 33 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 7e10c43..714af36 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -3,6 +3,7 @@ package sandcrawler import java.util.Arrays import java.util.Properties +import scala.math import scala.util.parsing.json.JSON import cascading.tuple.Fields @@ -17,11 +18,9 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource -class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with - HBasePipeConversions { +class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable - /* // key is SHA1 val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), @@ -29,13 +28,14 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val grobidPipe : TypedPipe[(String, String, String)] = grobidSource .read .fromBytesWritable(new Fields("key", "tei_json")) - .debug // Should be 4 tuples for mocked data + // .debug // Should be 4 tuples for mocked data .toTypedPipe[(String, String)]('key, 'tei_json) .map { entry => val (key, json) = (entry._1, entry._2) + // TODO: Consider passing forward only a subset of JSON. HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => (slug, key, json) - case None => (NoTitle, key, json) + case Some(slug) => (slug, key, json) + case None => (NoTitle, key, json) } } .filter { entry => @@ -46,15 +46,12 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val grobidGroup = grobidPipe .groupBy { case (slug, key, json) => slug } - */ val crossrefSource = TextLine(args("crossref-input")) - val crossrefPipe : TypedPipe[String] = crossrefSource + val crossrefPipe : TypedPipe[(String, String)] = crossrefSource .read - .debug // Should be 4 tuples for mocked data + // .debug // Should be 4 tuples for mocked data .toTypedPipe[String]('line) - /* - .map{line : String => (line, "foo")} .map{ json : String => HBaseCrossrefScore.crossrefToSlug(json) match { case Some(slug) => (slug, json) @@ -65,26 +62,21 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val (slug, json) = entry slug != NoTitle } - */ - .write(TypedTsv[String](args("output"))) - - /* val crossrefGroup = crossrefPipe .groupBy { case (slug, json) => slug } - // TODO: Figure out which is smaller. - val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = + val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = grobidGroup.join(crossrefGroup) theJoin.map{ entry => - val (slug : String, - ((slug0: String, sha1 : String, grobidJson : String), - (slug1 : String, crossrefJson : String))) = entry - // TODO: For now, output it all. - (slug, slug0, slug1, sha1, grobidJson, crossrefJson)} - .write(TypedTsv[(String, String, String, String, String, String)](args("output"))) - */ + val (slug : String, + ((slug0: String, sha1 : String, grobidJson : String), + (slug1 : String, crossrefJson : String))) = entry + HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} + .debug + // Output: score, sha1, doi, grobid title, crossref title + .write(TypedTsv[(Int, String, String, String, String)](args("output"))) } @@ -137,4 +129,37 @@ object HBaseCrossrefScore { Some(slug) } } + + val FullTitleMatch = 100 + val TitleLeftMatchBase = 50 + val MaxTitleLeftMatch = 80 + val SlugMatch = 25 + + def computeSimilarity(gTitle : String, cTitle : String) : Int = { + assert(titleToSlug(gTitle) == titleToSlug(cTitle)) + if (gTitle == cTitle) { + FullTitleMatch + } else if (gTitle.startsWith(cTitle) || cTitle.startsWith(gTitle)) { + math.min(TitleLeftMatchBase + math.abs(gTitle.length - cTitle.length), + MaxTitleLeftMatch) + } else { + SlugMatch + } + } + + def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) : + // (score, sha1, doi, grobidTitle, crossrefTitle) + (Int, String, String, String, String) = { + // JSON has already been validated in previous stages. + val grobid = jsonToMap(grobidJson) + val crossref = jsonToMap(crossrefJson) + + val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase() + val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase() + (computeSimilarity(grobidTitle, crossrefTitle), + sha1, + crossref("DOI").asInstanceOf[String], + "'" + grobidTitle + "'", + "'" + crossrefTitle + "'") + } } diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala index bd9dcd3..e6211a2 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala @@ -163,10 +163,14 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers { val (testTable, testHost) = ("test-table", "dummy-host:2181") val grobidSampleData = List( - List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title1"))), - List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title2: TNG"))), - List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title3: The Sequel"))), - List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), Bytes.toBytes(MalformedGrobidString))) + List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 1"))), + List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 2: TNG"))), + List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), + Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"))), + List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), + Bytes.toBytes(MalformedGrobidString))) JobTest("sandcrawler.HBaseCrossrefScoreJob") .arg("test", "") @@ -180,18 +184,27 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers { grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) .source(TextLine(input), List( 0 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), - 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"), - 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"), + 1 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2").replace("<<DOI>>", "DOI-0.5"), + 2 -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), 3 -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) - .sink[String](TypedTsv[String](output)) { + .sink[(Int, String, String, String, String)](TypedTsv[(Int, + String, String, String, String)](output)) { + // Grobid titles: + // "Title 1", "Title 2: TNG", "Title 3: The Sequel" + // crossref slugs: + // "Title 1: TNG", "Title 1: TNG 2", "Title 1: TNG 3", "Title 2 Rebooted" + // Join should have 3 "Title 1" slugs and 1 "Title 2" slug outputBuffer => it should "return a 4-element list" in { - outputBuffer should have length 3 + outputBuffer should have length 4 } + /* it should "return the right first entry" in { val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) - slug shouldBe "title1" + slug shouldBe "title 1" + slug shouldBe slug0 + slug shouldBe slug1 sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8") grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8") } |