diff options
Diffstat (limited to 'scalding')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 65 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala | 46 |
2 files changed, 68 insertions, 43 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 7b7deec..ac633e4 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -7,6 +7,8 @@ import scala.util.parsing.json.JSON import cascading.tuple.Fields import com.twitter.scalding._ +import com.twitter.scalding.typed.CoGrouped +import com.twitter.scalding.typed.Grouped import com.twitter.scalding.typed.TDsl._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes @@ -15,6 +17,7 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource + class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable @@ -26,36 +29,56 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with val grobidPipe : TypedPipe[(String, String, String)] = grobidSource .read .fromBytesWritable(new Fields("key", "tei_json")) - .debug .toTypedPipe[(String, String)]('key, 'tei_json) .map { entry => val (key, json) = (entry._1, entry._2) HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => (key, json, slug) - case None => (key, json, NoTitle) + case Some(slug) => (slug, key, json) + case None => (NoTitle, key, json) } } .filter { entry => - val (_, _, slug) = entry - slug != NoTitle && slug.length > 0 + val (slug, _, _) = entry + slug != NoTitle } - .write(TypedTsv[(String, String, String)](args("output"))) -/* - .map('key -> 'sha1) { sha1 : String => sha1 } + val grobidGroup = grobidPipe + .groupBy { case (slug, key, json) => slug } +// .debug + + val crossrefSource = TextLine(args("crossref-input")) - val crossrefPipe = crossrefSource + val crossrefPipe : TypedPipe[(String, String)] = crossrefSource .read - .map('line -> 'slug) { - json : String => HBaseCrossrefScore.crossrefToSlug(json)} - .debug - - val innerJoinPipe = grobidPipe.joinWithSmaller('slug -> 'slug, crossrefPipe) - innerJoinPipe - .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) { - x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)} - .write(TypedTsv[(String, String, String)](args("output"))) - */ + .toTypedPipe[String]('line) + .map{ json : String => +// val (offset, json) = entry + HBaseCrossrefScore.crossrefToSlug(json) match { + case Some(slug) => (slug, json) + case None => (NoTitle, json) + } + } + .debug + .filter { entry => + val (slug, json) = entry + slug != NoTitle + } + val crossrefGroup = crossrefPipe + .groupBy { case (slug, json) => slug } + + // TODO: Figure out which is smaller. + val theJoin : CoGrouped[String, ((String, String, String), (String, String))] = + grobidGroup.join(crossrefGroup) + + theJoin.map{ entry => + val (slug : String, + ((slug0: String, sha1 : String, grobidJson : String), + (slug1 : String, crossrefJson : String))) = entry + // TODO: For now, output it all. + (slug, slug0, slug1, sha1, grobidJson, crossrefJson)} + .write(TypedTsv[(String, String, String, String, String, String)](args("output"))) + + } object HBaseCrossrefScore { @@ -74,7 +97,7 @@ object HBaseCrossrefScore { val jsonObject = JSON.parseFull(json) if (jsonObject == None) { // Empty map for malformed JSON - Map[String, Any]("foo" -> json) + Map[String, Any]("malformed json" -> json) } else { jsonObject.get.asInstanceOf[Map[String, Any]] } @@ -95,7 +118,7 @@ object HBaseCrossrefScore { // TODO: Don't ignore titles after the first. titleToSlug(map("title").asInstanceOf[List[String]](0)) } else { - None + Some(map.keys.mkString(",")) } } diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala index 9402c0a..dc96003 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala @@ -8,7 +8,7 @@ import org.apache.hadoop.hbase.util.Bytes import org.scalatest._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode -class HBaseCrossrefScoreTest extends FunSpec with TupleConversions { +class HBaseCrossrefScoreTest extends FlatSpec with Matchers { val GrobidString = """ { "title": "<<TITLE>>", @@ -113,7 +113,9 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions { val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") val MalformedCrossrefString = CrossrefString.replace("}", "") -/* + + // Unit tests + "titleToSlug()" should "extract the parts of titles before a colon" in { val slug = HBaseCrossrefScore.titleToSlug("HELLO:there") slug should contain ("hello") @@ -125,7 +127,7 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions { } "grobidToSlug()" should "get the right slug for a grobid json string" in { - val slug = HBaseCrossrefScore.grobidToSlug(GrobidString) + val slug = HBaseCrossrefScore.grobidToSlug(GrobidStringWithTitle) slug should contain ("dummy example file") } @@ -140,8 +142,8 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions { } "crossrefToSlug()" should "get the right slug for a crossref json string" in { - val slug = HBaseCrossrefScore.crossrefToSlug(CrossrefString) - slug should contain ("les ferments lactiques") + val slug = HBaseCrossrefScore.crossrefToSlug(CrossrefStringWithTitle) + slug should contain ("sometitle") } it should "return None if given json string without title" in { @@ -153,8 +155,9 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions { val slug = HBaseCrossrefScore.grobidToSlug(MalformedCrossrefString) slug shouldBe None } - */ - + + // Pipeline tests + val output = "/tmp/testOutput" val input = "/tmp/testInput" val (testTable, testHost) = ("test-table", "dummy-host:2181") @@ -176,23 +179,22 @@ class HBaseCrossrefScoreTest extends FunSpec with TupleConversions { .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost), grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) .source(TextLine(input), List(( - "0" -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), - "1" -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))) - .sink[(String, String, String)](TypedTsv[(String, String, String)](output)) { + CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.5"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0.75"), + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))) + .sink[(String, String, String, String, String, + String)](TypedTsv[(String, String, String, String, String, String)](output)) { outputBuffer => - it("should return a 3-element list.") { - assert(outputBuffer.size === 3) - } - it("should return the right first entry.") { - val (sha1, json, slug0) = outputBuffer(0) - assert(sha1 == new String(grobidSampleData(0)(0), "UTF-8")) - assert(json == new String(grobidSampleData(0)(1), "UTF-8")) - assert(slug0 == "title1") - } /* - it("should return the right last slug.") { - val (_, _, slug3) = outputBuffer(3) - assert(slug3 == "foo") + it should "return a 3-element list" in { + outputBuffer should have length 3 + } + it should "return the right first entry" in { + val (slug, slug0, slug1, sha1, grobidJson, crossrefJson) = outputBuffer(0) + slug shouldBe "title1" + sha1 shouldBe new String(grobidSampleData(0)(0), "UTF-8") + grobidJson shouldBe new String(grobidSampleData(0)(1), "UTF-8") } */ } |