diff options
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 73 |
1 files changed, 49 insertions, 24 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 7e10c43..714af36 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -3,6 +3,7 @@ package sandcrawler import java.util.Arrays import java.util.Properties +import scala.math import scala.util.parsing.json.JSON import cascading.tuple.Fields @@ -17,11 +18,9 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource -class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with - HBasePipeConversions { +class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable - /* // key is SHA1 val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), @@ -29,13 +28,14 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val grobidPipe : TypedPipe[(String, String, String)] = grobidSource .read .fromBytesWritable(new Fields("key", "tei_json")) - .debug // Should be 4 tuples for mocked data + // .debug // Should be 4 tuples for mocked data .toTypedPipe[(String, String)]('key, 'tei_json) .map { entry => val (key, json) = (entry._1, entry._2) + // TODO: Consider passing forward only a subset of JSON. HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => (slug, key, json) - case None => (NoTitle, key, json) + case Some(slug) => (slug, key, json) + case None => (NoTitle, key, json) } } .filter { entry => @@ -46,15 +46,12 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val grobidGroup = grobidPipe .groupBy { case (slug, key, json) => slug } - */ val crossrefSource = TextLine(args("crossref-input")) - val crossrefPipe : TypedPipe[String] = crossrefSource + val crossrefPipe : TypedPipe[(String, String)] = crossrefSource .read - .debug // Should be 4 tuples for mocked data + // .debug // Should be 4 tuples for mocked data .toTypedPipe[String]('line) - /* - .map{line : String => (line, "foo")} .map{ json : String => HBaseCrossrefScore.crossrefToSlug(json) match { case Some(slug) => (slug, json) @@ -65,26 +62,21 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val (slug, json) = entry slug != NoTitle } - */ - .write(TypedTsv[String](args("output"))) - - /* val crossrefGroup = crossrefPipe .groupBy { case (slug, json) => slug } - // TODO: Figure out which is smaller. - val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = + val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = grobidGroup.join(crossrefGroup) theJoin.map{ entry => - val (slug : String, - ((slug0: String, sha1 : String, grobidJson : String), - (slug1 : String, crossrefJson : String))) = entry - // TODO: For now, output it all. - (slug, slug0, slug1, sha1, grobidJson, crossrefJson)} - .write(TypedTsv[(String, String, String, String, String, String)](args("output"))) - */ + val (slug : String, + ((slug0: String, sha1 : String, grobidJson : String), + (slug1 : String, crossrefJson : String))) = entry + HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} + .debug + // Output: score, sha1, doi, grobid title, crossref title + .write(TypedTsv[(Int, String, String, String, String)](args("output"))) } @@ -137,4 +129,37 @@ object HBaseCrossrefScore { Some(slug) } } + + val FullTitleMatch = 100 + val TitleLeftMatchBase = 50 + val MaxTitleLeftMatch = 80 + val SlugMatch = 25 + + def computeSimilarity(gTitle : String, cTitle : String) : Int = { + assert(titleToSlug(gTitle) == titleToSlug(cTitle)) + if (gTitle == cTitle) { + FullTitleMatch + } else if (gTitle.startsWith(cTitle) || cTitle.startsWith(gTitle)) { + math.min(TitleLeftMatchBase + math.abs(gTitle.length - cTitle.length), + MaxTitleLeftMatch) + } else { + SlugMatch + } + } + + def computeOutput(sha1 : String, grobidJson : String, crossrefJson : String) : + // (score, sha1, doi, grobidTitle, crossrefTitle) + (Int, String, String, String, String) = { + // JSON has already been validated in previous stages. + val grobid = jsonToMap(grobidJson) + val crossref = jsonToMap(crossrefJson) + + val grobidTitle = grobid("title").asInstanceOf[String].toLowerCase() + val crossrefTitle = crossref("title").asInstanceOf[List[String]](0).toLowerCase() + (computeSimilarity(grobidTitle, crossrefTitle), + sha1, + crossref("DOI").asInstanceOf[String], + "'" + grobidTitle + "'", + "'" + crossrefTitle + "'") + } } |