diff options
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 1360af0..56eb91e 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -1,5 +1,6 @@ package sandcrawler +import java.util.Arrays import java.util.Properties import scala.util.parsing.json.JSON @@ -20,19 +21,22 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), args("zookeeper-hosts")) - val grobidPipe = grobidSource + val grobidPipe : TypedPipe[(String, String, String)] = grobidSource .read - .map('tei_json -> 'slug) { - json : ImmutableBytesWritable => { - HBaseCrossrefScore.grobidToSlug(json.toString) match { - case Some(slug) => slug - case None => "nothing" - } + .fromBytesWritable(new Fields("key", "tei_json")) + .debug + .toTypedPipe[(String, String)]('key, 'tei_json) + .map { entry => + val (key, json) = (entry._1, entry._2) + HBaseCrossrefScore.grobidToSlug(json) match { + case Some(slug) => (key, json, slug) + case None => (key, json, "none") } } - .debug - .map('key -> 'sha1) { sha1 : String => sha1 } + .write(TypedTsv[(String, String, String)](args("output"))) +/* + .map('key -> 'sha1) { sha1 : String => sha1 } val crossrefSource = TextLine(args("crossref-input")) val crossrefPipe = crossrefSource .read @@ -45,6 +49,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) { x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)} .write(TypedTsv[(String, String, String)](args("output"))) + */ } object HBaseCrossrefScore { @@ -70,7 +75,6 @@ object HBaseCrossrefScore { } def grobidToSlug(json : String) : Option[String] = { - throw new Exception(json) val map = jsonToMap(json) if (map contains "title") { titleToSlug(map("title").asInstanceOf[String]) @@ -90,15 +94,11 @@ object HBaseCrossrefScore { } def titleToSlug(title : String) : Option[String] = { - Some(title) - /* val slug = title.split(":")(0).toLowerCase() - println("title: " + title + ", slug: " + slug) if (slug.isEmpty) { None } else { Some(slug) } - */ } } |