diff options
Diffstat (limited to 'scalding/src')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala | 18 | ||||
| -rw-r--r-- | scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala | 124 | 
2 files changed, 142 insertions, 0 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala new file mode 100644 index 0000000..9a8d701 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -0,0 +1,18 @@ + +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class GrobidScorableDumpJob(args: Args) extends JobBase(args) { + +  val sc1 : Scorable = new GrobidScorable() +  val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) + +  pipe1 +    .map { case (slug, features) => (slug, features.json) } +    .write(TypedTsv[(String, String)](args("output"))) +} diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala new file mode 100644 index 0000000..12e13dc --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala @@ -0,0 +1,124 @@ + +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class GrobidScorableDumpJobTest extends FlatSpec with Matchers { +  //scalastyle:off +  val JsonString = """ +{ +  "title": "<<TITLE>>", +  "authors": [ +    {"name": "Brewster Kahle"}, +    {"name": "J Doe"} +  ], +  "journal": { +    "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", +    "eissn": null, +    "issn": null, +    "issue": null, +    "publisher": null, +    "volume": null +  }, +  "date": "2000", +  "doi": null, +  "citations": [ +    { "authors": [{"name": "A Seaperson"}], +      "date": "2001", +      "id": "b0", +      "index": 0, +      "issue": null, +      "journal": "Letters in the Alphabet", +      "publisher": null, +      "title": "Everything is Wonderful", +      "url": null, +      "volume": "20"}, +    { "authors": [], +      "date": "2011-03-28", +      "id": "b1", +      "index": 1, +      "issue": null, +      "journal": "The Dictionary", +      "publisher": null, +      "title": "All about Facts", +      "url": null, +      "volume": "14"} +  ], +  "abstract": "Everything you ever wanted to know about nothing", +  "body": "Introduction \nEverything starts somewhere, as somebody [1]  once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", +  "acknowledgement": null, +  "annex": null +} +""" +  // scalastyle:on +  val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") +  val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") +  val MalformedJsonString = JsonString.replace("}", "") + +  //  Pipeline tests +  val output = "/tmp/testOutput" +  val input = "/tmp/testInput" +  val (testTable, testHost) = ("test-table", "dummy-host:2181") + +  val Sha1Strings : List[String] = List( +    "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q",  // good +    "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU",  // good +    "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT",  // good +    "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56",  // bad status +    "sha1:93187A85273589347598473894839443",  // malformed +    "sha1:024937534094897039547e9824382943")  // bad status + +  val JsonStrings : List[String] = List( +    JsonString.replace("<<TITLE>>", "Title 1"), +    JsonString.replace("<<TITLE>>", "Title 2: TNG"), +    JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), +    // This will have bad status. +    JsonString.replace("<<TITLE>>", "Title 1"), +    MalformedJsonString, +    // This will have bad status. +    JsonString.replace("<<TITLE>>", "Title 2") +  ) + +  // bnewbold: status codes aren't strings, they are uint64 +  val Ok : Long = 200 +  val Bad : Long = 400 +  val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad) + +  val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) +    .zipped +    .toList +    .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } +    .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + +  // scalastyle:off null +  // Add example of lines without GROBID data +  val SampleData = SampleDataHead :+ new Tuple( +    new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) +  // scalastyle:on null + +  JobTest("sandcrawler.GrobidScorableDumpJob") +    .arg("test", "") +    .arg("app.conf.path", "app.conf") +    .arg("output", output) +    .arg("hbase-table", testTable) +    .arg("zookeeper-hosts", testHost) +    .arg("debug", "true") +    .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) +    .sink[(String, String)](TypedTsv[(String, String)](output)) { +      outputBuffer => +      "The pipeline" should "return correct-length list" in { +        outputBuffer should have length 3 +      } +    } +    .run +    .finish +} | 
