diff options
Diffstat (limited to 'scalding/src/main')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 58 |
1 files changed, 40 insertions, 18 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 12660e8..1360af0 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -8,75 +8,97 @@ import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { // key is SHA1 - val grobidSource = HBaseBuilder.build( - args("grobid-table"), - args("zookeeper-hosts"), - List("grobid0:tei_json"), - sourceMode = SourceMode.SCAN_ALL) - + val grobidSource = HBaseCrossrefScore.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) val grobidPipe = grobidSource .read .map('tei_json -> 'slug) { - json : String => HBaseCrossrefScore.grobidToSlug(json)} + json : ImmutableBytesWritable => { + HBaseCrossrefScore.grobidToSlug(json.toString) match { + case Some(slug) => slug + case None => "nothing" + } + } + } + .debug + .map('key -> 'sha1) { sha1 : String => sha1 } - val crossrefSource = TextLine(args("input")) + val crossrefSource = TextLine(args("crossref-input")) val crossrefPipe = crossrefSource .read .map('line -> 'slug) { json : String => HBaseCrossrefScore.crossrefToSlug(json)} - -/* - statusPipe.groupBy { identity } - .size .debug - .write(TypedTsv[(Long,Long)](args("output"))) - */ + + val innerJoinPipe = grobidPipe.joinWithSmaller('slug -> 'slug, crossrefPipe) + innerJoinPipe + .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) { + x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)} + .write(TypedTsv[(String, String, String)](args("output"))) } object HBaseCrossrefScore { + def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( + hbaseTable, // HBase Table Name + zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?) + List("grobid0:tei_json"), + SourceMode.SCAN_ALL) + + def performJoin(grobidJson : String, crossRefJson : String, sha1 : String) : (String, String, String) = { + (sha1, "1.2.3.4", "100") + } + def jsonToMap(json : String) : Map[String, Any] = { // https://stackoverflow.com/a/32717262/631051 val jsonObject = JSON.parseFull(json) if (jsonObject == None) { // Empty map for malformed JSON - Map[String, Any]() + Map[String, Any]("foo" -> json) } else { jsonObject.get.asInstanceOf[Map[String, Any]] } } - def grobidToSlug(json : String) : Option[String] = { + throw new Exception(json) val map = jsonToMap(json) if (map contains "title") { titleToSlug(map("title").asInstanceOf[String]) } else { - None + Some("grobidToSlug None: " + map("foo")) } } def crossrefToSlug(json : String) : Option[String] = { val map = jsonToMap(json) if (map contains "title") { + // TODO: Don't ignore titles after the first. titleToSlug(map("title").asInstanceOf[List[String]](0)) } else { - None + Some("crossRefToSlug None") } } def titleToSlug(title : String) : Option[String] = { + Some(title) + /* val slug = title.split(":")(0).toLowerCase() + println("title: " + title + ", slug: " + slug) if (slug.isEmpty) { None } else { Some(slug) } + */ } } |