aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala14
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala14
2 files changed, 12 insertions, 16 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index d47fe60..79ebbb1 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -16,7 +16,15 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
val output = args("output")
- val hbs = new HBaseSource(
+ HBaseRowCountJob.getHBaseSource
+ .read
+ .debug
+ .groupAll { _.size('count) }
+ .write(Tsv(output))
+}
+
+object HBaseRowCountJob {
+ def getHBaseSource = new HBaseSource(
//"table_name",
//"quorum_name:2181",
"wbgrp-journal-extract-0-qa", // HBase Table Name
@@ -25,8 +33,4 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
List("file"),
List(new Fields("size", "mimetype")),
sourceMode = SourceMode.SCAN_ALL)
- .read
- .debug
- .groupAll { _.size('count) }
- .write(Tsv(output))
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
index 94b3740..abb017c 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
@@ -12,6 +12,7 @@ import scala._
import com.twitter.scalding.Tsv
import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import sandcrawler.HBaseRowCountJob
/**
* Example of how to define tests for HBaseSource
@@ -39,18 +40,9 @@ class HBaseRowCountTest extends FunSpec with TupleConversions {
.arg("app.conf.path", "app.conf")
.arg("output", output)
.arg("debug", "true")
- .source[Tuple](
- new HBaseSource(
- //"table_name",
- //"quorum_name:2181",
- "wbgrp-journal-extract-0-qa",
- "mtrcs-zk1.us.archive.org:2181",
- new Fields("key"),
- List("file"),
- List(new Fields("size", "mimetype")),
- sourceMode = SourceMode.SCAN_ALL),
+ .source[Tuple](HBaseRowCountJob.getHBaseSource,
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
- .sink[Tuple](Tsv(output)) {
+ .sink[Tuple](Tsv(output)) {
outputBuffer =>
it("should return the test data provided.") {