aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-07-25 10:46:04 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-07-25 10:46:04 -0700
commit4c5dbdf964da9ca29246b0f8eadec6daae1d3ffb (patch)
tree016edba956ee1af0af2a249225db78df843ad8d0 /scalding/src/main
parenta950d5d5c61fb77b2ba83703ef853ef951ac94af (diff)
downloadsandcrawler-4c5dbdf964da9ca29246b0f8eadec6daae1d3ffb.tar.gz
sandcrawler-4c5dbdf964da9ca29246b0f8eadec6daae1d3ffb.zip
Figured out string conversion. Tests pass. Still WIP.
Diffstat (limited to 'scalding/src/main')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala28
1 files changed, 14 insertions, 14 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 1360af0..56eb91e 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -1,5 +1,6 @@
package sandcrawler
+import java.util.Arrays
import java.util.Properties
import scala.util.parsing.json.JSON
@@ -20,19 +21,22 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
args("zookeeper-hosts"))
- val grobidPipe = grobidSource
+ val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
.read
- .map('tei_json -> 'slug) {
- json : ImmutableBytesWritable => {
- HBaseCrossrefScore.grobidToSlug(json.toString) match {
- case Some(slug) => slug
- case None => "nothing"
- }
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ .debug
+ .toTypedPipe[(String, String)]('key, 'tei_json)
+ .map { entry =>
+ val (key, json) = (entry._1, entry._2)
+ HBaseCrossrefScore.grobidToSlug(json) match {
+ case Some(slug) => (key, json, slug)
+ case None => (key, json, "none")
}
}
- .debug
- .map('key -> 'sha1) { sha1 : String => sha1 }
+ .write(TypedTsv[(String, String, String)](args("output")))
+/*
+ .map('key -> 'sha1) { sha1 : String => sha1 }
val crossrefSource = TextLine(args("crossref-input"))
val crossrefPipe = crossrefSource
.read
@@ -45,6 +49,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
.mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) {
x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)}
.write(TypedTsv[(String, String, String)](args("output")))
+ */
}
object HBaseCrossrefScore {
@@ -70,7 +75,6 @@ object HBaseCrossrefScore {
}
def grobidToSlug(json : String) : Option[String] = {
- throw new Exception(json)
val map = jsonToMap(json)
if (map contains "title") {
titleToSlug(map("title").asInstanceOf[String])
@@ -90,15 +94,11 @@ object HBaseCrossrefScore {
}
def titleToSlug(title : String) : Option[String] = {
- Some(title)
- /*
val slug = title.split(":")(0).toLowerCase()
- println("title: " + title + ", slug: " + slug)
if (slug.isEmpty) {
None
} else {
Some(slug)
}
- */
}
}