package sandcrawler

import cascading.tuple.Fields
import cascading.tuple.Tuple
import com.twitter.scalding.JobTest
import com.twitter.scalding.Tsv
import com.twitter.scalding.TupleConversions
import com.twitter.scalding.TypedTsv
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.junit.runner.RunWith
import org.scalatest.FunSpec
import org.scalatest.junit.JUnitRunner
import org.slf4j.LoggerFactory
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBaseSource

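/**
 * JobTest harness for UnGrobidedDumpJob: feeds a mocked GROBID-status source and a mocked
 * file-row source, then checks that only rows without a GROBID status end up in the dump.
 */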
@RunWith(classOf[JUnitRunner])
class UnGrobidedDumpJobTest extends FunSpec with TupleConversions {

  val output = "/tmp/testOutput"
  val (testTable, testHost) = ("test-table", "dummy-host:2181")

  val log = LoggerFactory.getLogger(this.getClass.getName)

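  // A sample GROBID status code, serialized to the byte form used by the mocked HBase rows.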
  val statusCode: Long = 200
  val statusBytes = Bytes.toBytes(statusCode)

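  // Six row keys that already have a GROBID status code recorded.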
  val sampleDataGrobid: List[List[Array[Byte]]] = List(
    ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusBytes),
    ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusBytes),
    ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusBytes),
    ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusBytes),
    ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusBytes),
    ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusBytes))
      .map(pair => List(Bytes.toBytes(pair._1), pair._2))

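  // Eight file rows (key, content JSON, mimetype, CDX JSON); the first two keys do not appear
  // in sampleDataGrobid above, so they are the "un-grobided" rows the job should dump.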
  val sampleDataFile: List[List[Array[Byte]]] = List(
    ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
    ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
    ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
    ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
    ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
    ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
    ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
    ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""))
      .map(row => List(Bytes.toBytes(row._1),
                       Bytes.toBytes(row._2),
                       Bytes.toBytes(row._3),
                       Bytes.toBytes(row._4)))

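  // Wire the mocked HBase sources into the job and inspect its TSV output.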
  JobTest("sandcrawler.UnGrobidedDumpJob")
    .arg("test", "")
    .arg("app.conf.path", "app.conf")
    .arg("output", output)
    .arg("hbase-table", testTable)
    .arg("zookeeper-hosts", testHost)
    .arg("debug", "true")
    .source[Tuple](UnGrobidedDumpJob.getHBaseColSource(testTable, testHost),
      sampleDataGrobid.map(l => new Tuple(l.map(s => new ImmutableBytesWritable(s)): _*)))
    .source[Tuple](UnGrobidedDumpJob.getHBaseKeySource(testTable, testHost),
      sampleDataFile.map(l => new Tuple(l.map(s => new ImmutableBytesWritable(s)): _*)))
    .sink[Tuple](TypedTsv[(String, String, String, String)](output)) {
      outputBuffer =>
      it("should return correct-length list.") {
        assert(outputBuffer.size === 2)
      }
    }
    .run
    .finish
}