diff options
Diffstat (limited to 'scalding')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 12 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala | 9 |
2 files changed, 10 insertions, 11 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index ac633e4..bcb6156 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -17,7 +17,6 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource - class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable @@ -29,6 +28,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val grobidPipe : TypedPipe[(String, String, String)] = grobidSource .read .fromBytesWritable(new Fields("key", "tei_json")) + .debug .toTypedPipe[(String, String)]('key, 'tei_json) .map { entry => val (key, json) = (entry._1, entry._2) @@ -41,24 +41,24 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val (slug, _, _) = entry slug != NoTitle } + .debug + .write(TypedTsv[(String, String, String)](args("output"))) + + /* val grobidGroup = grobidPipe .groupBy { case (slug, key, json) => slug } -// .debug - val crossrefSource = TextLine(args("crossref-input")) val crossrefPipe : TypedPipe[(String, String)] = crossrefSource .read .toTypedPipe[String]('line) .map{ json : String => -// val (offset, json) = entry HBaseCrossrefScore.crossrefToSlug(json) match { case Some(slug) => (slug, json) case None => (NoTitle, json) } } - .debug .filter { entry => val (slug, json) = entry slug != NoTitle @@ -77,7 +77,7 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with // TODO: For now, output it all. (slug, slug0, slug1, sha1, grobidJson, crossrefJson)} .write(TypedTsv[(String, String, String, String, String, String)](args("output"))) - + */ } diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala index dc96003..96c7770 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala @@ -178,18 +178,17 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers { .arg("debug", "true") .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost), grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .source(TextLine(input), List(( + .source(TextLine(input), List( CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"), CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"), - CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))) - .sink[(String, String, String, String, String, - String)](TypedTsv[(String, String, String, String, String, String)](output)) { + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))) + .sink[(String, String, String)](TypedTsv[(String, String, String)](output)) { outputBuffer => - /* it should "return a 3-element list" in { outputBuffer should have length 3 } + /* it should "return the right first entry" in { val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) slug shouldBe "title1" |