aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2018-05-29 11:01:14 -0700
committerBryan Newbold <bnewbold@archive.org>2018-05-29 11:01:14 -0700
commitc14676635f39dd1bc0345e4df2d1fa06c298bfd7 (patch)
tree4fcdfc2aa27f56973b68a9cad32f4a0120b9a756 /scalding
parent62bfc9d5f40ccd690b4b070d64d046fb6b1e3e8a (diff)
downloadsandcrawler-c14676635f39dd1bc0345e4df2d1fa06c298bfd7.tar.gz
sandcrawler-c14676635f39dd1bc0345e4df2d1fa06c298bfd7.zip
HBaseRowCountJob actually counts rows
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala13
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala8
2 files changed, 8 insertions, 13 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index 5df6b2e..98da239 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -27,15 +27,6 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
sourceMode = SourceMode.GET_LIST, keyList = List("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"))
.read
.debug
- .fromBytesWritable(new Fields("key"))
- .write(Tsv(output format "get_list"))
-
- /*
- List("column_family"),
- sourceMode = SourceMode.SCAN_ALL)
- .read
- .debug
- .fromBytesWritable(new Fields("key"))
- .write(Tsv(output format "get_list"))
- */
+ .groupAll { _.size('count) }
+ .write(Tsv(output))
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
index 598f45d..ac7cf18 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
@@ -46,11 +46,15 @@ class HBaseRowCountTest extends FunSpec with TupleConversions {
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
.sink[Tuple](Tsv(output format "get_list")) {
outputBuffer =>
- log.debug("Output => " + outputBuffer)
it("should return the test data provided.") {
println("outputBuffer.size => " + outputBuffer.size)
- assert(outputBuffer.size === 2)
+ assert(outputBuffer.size === 1)
+ }
+
+ it("should return the correct count") {
+ println("raw output => " + outputBuffer)
+ assert(outputBuffer(0).getObject(0) === 2)
}
}
.run