aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala58
1 files changed, 40 insertions, 18 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 12660e8..1360af0 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -8,75 +8,97 @@ import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
// key is SHA1
- val grobidSource = HBaseBuilder.build(
- args("grobid-table"),
- args("zookeeper-hosts"),
- List("grobid0:tei_json"),
- sourceMode = SourceMode.SCAN_ALL)
-
+ val grobidSource = HBaseCrossrefScore.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
val grobidPipe = grobidSource
.read
.map('tei_json -> 'slug) {
- json : String => HBaseCrossrefScore.grobidToSlug(json)}
+ json : ImmutableBytesWritable => {
+ HBaseCrossrefScore.grobidToSlug(json.toString) match {
+ case Some(slug) => slug
+ case None => "nothing"
+ }
+ }
+ }
+ .debug
+ .map('key -> 'sha1) { sha1 : String => sha1 }
- val crossrefSource = TextLine(args("input"))
+ val crossrefSource = TextLine(args("crossref-input"))
val crossrefPipe = crossrefSource
.read
.map('line -> 'slug) {
json : String => HBaseCrossrefScore.crossrefToSlug(json)}
-
-/*
- statusPipe.groupBy { identity }
- .size
.debug
- .write(TypedTsv[(Long,Long)](args("output")))
- */
+
+ val innerJoinPipe = grobidPipe.joinWithSmaller('slug -> 'slug, crossrefPipe)
+ innerJoinPipe
+ .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) {
+ x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)}
+ .write(TypedTsv[(String, String, String)](args("output")))
}
object HBaseCrossrefScore {
+ def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build(
+ hbaseTable, // HBase Table Name
+ zookeeperHosts, // HBase Zookeeper server (to get runtime config info; can be array?)
+ List("grobid0:tei_json"),
+ SourceMode.SCAN_ALL)
+
+ def performJoin(grobidJson : String, crossRefJson : String, sha1 : String) : (String, String, String) = {
+ (sha1, "1.2.3.4", "100")
+ }
+
def jsonToMap(json : String) : Map[String, Any] = {
// https://stackoverflow.com/a/32717262/631051
val jsonObject = JSON.parseFull(json)
if (jsonObject == None) {
// Empty map for malformed JSON
- Map[String, Any]()
+ Map[String, Any]("foo" -> json)
} else {
jsonObject.get.asInstanceOf[Map[String, Any]]
}
}
-
def grobidToSlug(json : String) : Option[String] = {
+ throw new Exception(json)
val map = jsonToMap(json)
if (map contains "title") {
titleToSlug(map("title").asInstanceOf[String])
} else {
- None
+ Some("grobidToSlug None: " + map("foo"))
}
}
def crossrefToSlug(json : String) : Option[String] = {
val map = jsonToMap(json)
if (map contains "title") {
+ // TODO: Don't ignore titles after the first.
titleToSlug(map("title").asInstanceOf[List[String]](0))
} else {
- None
+ Some("crossRefToSlug None")
}
}
def titleToSlug(title : String) : Option[String] = {
+ Some(title)
+ /*
val slug = title.split(":")(0).toLowerCase()
+ println("title: " + title + ", slug: " + slug)
if (slug.isEmpty) {
None
} else {
Some(slug)
}
+ */
}
}