diff options
Diffstat (limited to 'scalding')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 28 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala | 10 |
2 files changed, 21 insertions, 17 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 1360af0..56eb91e 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -1,5 +1,6 @@ package sandcrawler +import java.util.Arrays import java.util.Properties import scala.util.parsing.json.JSON @@ -20,19 +21,22 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), args("zookeeper-hosts")) - val grobidPipe = grobidSource + val grobidPipe : TypedPipe[(String, String, String)] = grobidSource .read - .map('tei_json -> 'slug) { - json : ImmutableBytesWritable => { - HBaseCrossrefScore.grobidToSlug(json.toString) match { - case Some(slug) => slug - case None => "nothing" - } + .fromBytesWritable(new Fields("key", "tei_json")) + .debug + .toTypedPipe[(String, String)]('key, 'tei_json) + .map { entry => + val (key, json) = (entry._1, entry._2) + HBaseCrossrefScore.grobidToSlug(json) match { + case Some(slug) => (key, json, slug) + case None => (key, json, "none") } } - .debug - .map('key -> 'sha1) { sha1 : String => sha1 } + .write(TypedTsv[(String, String, String)](args("output"))) +/* + .map('key -> 'sha1) { sha1 : String => sha1 } val crossrefSource = TextLine(args("crossref-input")) val crossrefPipe = crossrefSource .read @@ -45,6 +49,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) { x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)} .write(TypedTsv[(String, String, String)](args("output"))) + */ } object HBaseCrossrefScore { @@ -70,7 +75,6 @@ object HBaseCrossrefScore { } def grobidToSlug(json : String) : Option[String] = { - throw new Exception(json) val map = jsonToMap(json) if (map contains "title") { titleToSlug(map("title").asInstanceOf[String]) @@ -90,15 +94,11 @@ object HBaseCrossrefScore { } def titleToSlug(title : String) : Option[String] = { - Some(title) - /* val slug = title.split(":")(0).toLowerCase() - println("title: " + title + ", slug: " + slug) if (slug.isEmpty) { None } else { Some(slug) } - */ } } diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala index f52c5b4..0d681b9 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala @@ -178,10 +178,14 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions { .source(TextLine(input), List(( "0" -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), "1" -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))) - .sink[Tuple](TypedTsv[(String, String, String)](output)) { + .sink[(String, String, String)](TypedTsv[(String, String, String)](output)) { outputBuffer => - it("should return a 2-element list.") { - assert(outputBuffer.size === 2) + it("should return a 4-element list.") { + assert(outputBuffer.size === 4) + } + it("should return the right slugs.") { + val (sha1, json, slug) = outputBuffer(0) + assert(slug == "title1") } } .run |