aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-06-01 14:54:17 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-06-01 14:54:17 -0700
commit59c2827bcd682a6c78714438ef8d7aaca2ccf13b (patch)
tree7685d226d56bc2c939665b9be6ebf3f94e2a7d63 /scalding
parent8e3a5ba6f3bdfad32e9bfefeebcb85bbe254cbc9 (diff)
downloadsandcrawler-59c2827bcd682a6c78714438ef8d7aaca2ccf13b.tar.gz
sandcrawler-59c2827bcd682a6c78714438ef8d7aaca2ccf13b.zip
Factored common code out of HBaseRowCountJob and its test into a new companion object.
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala14
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala14
2 files changed, 12 insertions, 16 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index d47fe60..79ebbb1 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -16,7 +16,15 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
val output = args("output")
- val hbs = new HBaseSource(
+ HBaseRowCountJob.getHBaseSource
+ .read
+ .debug
+ .groupAll { _.size('count) }
+ .write(Tsv(output))
+}
+
+object HBaseRowCountJob {
+ def getHBaseSource = new HBaseSource(
//"table_name",
//"quorum_name:2181",
"wbgrp-journal-extract-0-qa", // HBase Table Name
@@ -25,8 +33,4 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
List("file"),
List(new Fields("size", "mimetype")),
sourceMode = SourceMode.SCAN_ALL)
- .read
- .debug
- .groupAll { _.size('count) }
- .write(Tsv(output))
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
index 94b3740..abb017c 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
@@ -12,6 +12,7 @@ import scala._
import com.twitter.scalding.Tsv
import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import sandcrawler.HBaseRowCountJob
/**
* Example of how to define tests for HBaseSource
@@ -39,18 +40,9 @@ class HBaseRowCountTest extends FunSpec with TupleConversions {
.arg("app.conf.path", "app.conf")
.arg("output", output)
.arg("debug", "true")
- .source[Tuple](
- new HBaseSource(
- //"table_name",
- //"quorum_name:2181",
- "wbgrp-journal-extract-0-qa",
- "mtrcs-zk1.us.archive.org:2181",
- new Fields("key"),
- List("file"),
- List(new Fields("size", "mimetype")),
- sourceMode = SourceMode.SCAN_ALL),
+ .source[Tuple](HBaseRowCountJob.getHBaseSource,
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
- .sink[Tuple](Tsv(output)) {
+ .sink[Tuple](Tsv(output)) {
outputBuffer =>
it("should return the test data provided.") {