diff options
Diffstat (limited to 'scalding')
3 files changed, 39 insertions, 20 deletions
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 5ba7d58..c319fe6 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -11,6 +11,8 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { + val StatusOK = 200 + def getSource(args : Args) : Source = { // TODO: Generalize args so there can be multiple grobid pipes in one job. GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) @@ -19,15 +21,17 @@ class GrobidScorable extends Scorable with HBasePipeConversions { def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { getSource(args) .read - .fromBytesWritable(new Fields("key", "tei_json")) - .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .fromBytesWritable(new Fields("key", "tei_json", "status_code")) + .toTypedPipe[(String, String, Int)](new Fields("key", "tei_json", "status_code")) + // TODO: Should I combine next two stages for efficiency? + .collect { case (key, json, StatusOK) => (key, json) } .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } } } object GrobidScorable { def getHBaseSource(table : String, host : String) : HBaseSource = { - HBaseBuilder.build(table, host, List("grobid0:tei_json"), SourceMode.SCAN_ALL) + HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL) } def jsonToMapFeatures(key : String, json : String) : MapFeatures = { diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index d7689cd..8a71f31 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -25,7 +25,7 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { val statusType1Bytes = Bytes.toBytes(statusType1) val statusType2Bytes = Bytes.toBytes(statusType2) - val sampleData = List( + val sampleData : List[List[Array[Byte]]] = List( List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), statusType1Bytes), List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), statusType1Bytes), List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), statusType2Bytes), diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala index f0b411f..e72eb7a 100644 --- a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -9,7 +9,7 @@ import org.scalatest._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode class ScoreJobTest extends FlatSpec with Matchers { - val GrobidString = """ + val JsonString = """ { "title": "<<TITLE>>", "authors": [ @@ -54,9 +54,9 @@ class ScoreJobTest extends FlatSpec with Matchers { "annex": null } """ - val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") - val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle") - val MalformedGrobidString = GrobidString.replace("}", "") + val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") + val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") + val MalformedJsonString = JsonString.replace("}", "") val CrossrefString = """ @@ -124,21 +124,36 @@ class ScoreJobTest extends FlatSpec with Matchers { val input = "/tmp/testInput" val (testTable, testHost) = ("test-table", "dummy-host:2181") - val Sha1Strings = List( + val Sha1Strings : List[String] = List( "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", - "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56") + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", + "sha1:93187A85273589347598473894839443", + "sha1:024937534094897039547e9824382943") - val GrobidStrings = List( - GrobidString.replace("<<TITLE>>", "Title 1"), - GrobidString.replace("<<TITLE>>", "Title 2: TNG"), - GrobidString.replace("<<TITLE>>", "Title 3: The Sequel"), - MalformedGrobidString) + val JsonStrings : List[String] = List( + JsonString.replace("<<TITLE>>", "Title 1"), + JsonString.replace("<<TITLE>>", "Title 2: TNG"), + JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 1"), + MalformedJsonString, + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 2") + ) - val GrobidSampleData = (Sha1Strings zip GrobidStrings) - .map{case(s, g) => - List(Bytes.toBytes(s), Bytes.toBytes(g))} + val Ok = Bytes.toBytes("200") + val Bad = Bytes.toBytes("404") + + val SampleData : List[List[Array[Byte]]] = List( + List(Bytes.toBytes(Sha1Strings(0)), Bytes.toBytes(JsonStrings(0)), Ok), + List(Bytes.toBytes(Sha1Strings(1)), Bytes.toBytes(JsonStrings(1)), Ok), + List(Bytes.toBytes(Sha1Strings(2)), Bytes.toBytes(JsonStrings(2)), Ok), + List(Bytes.toBytes(Sha1Strings(3)), Bytes.toBytes(JsonStrings(3)), Bad), + List(Bytes.toBytes(Sha1Strings(4)), Bytes.toBytes(JsonStrings(4)), Ok), + List(Bytes.toBytes(Sha1Strings(5)), Bytes.toBytes(JsonStrings(5)), Bad) + ) JobTest("sandcrawler.ScoreJob") .arg("test", "") @@ -149,7 +164,7 @@ class ScoreJobTest extends FlatSpec with Matchers { .arg("crossref-input", input) .arg("debug", "true") .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), - GrobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + SampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) .source(TextLine(input), List( 0 -> CrossrefStrings(0), 1 -> CrossrefStrings(1), @@ -181,7 +196,7 @@ class ScoreJobTest extends FlatSpec with Matchers { def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) = { val mf1 : MapFeatures = GrobidScorable.jsonToMapFeatures( Sha1Strings(grobidIndex), - GrobidStrings(grobidIndex)) + JsonStrings(grobidIndex)) val mf2 : MapFeatures = CrossrefScorable.jsonToMapFeatures( CrossrefStrings(crossrefIndex)) val score = Scorable.computeSimilarity( |