aboutsummaryrefslogtreecommitdiffstats
path: root/scald-mvp/src
diff options
context:
space:
mode:
Diffstat (limited to 'scald-mvp/src')
-rw-r--r--scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala21
1 files changed, 11 insertions, 10 deletions
diff --git a/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index 734abaa..23c4764 100644
--- a/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -2,22 +2,23 @@ package sandcrawler
import com.twitter.scalding._
import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions, HBaseConstants}
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import cascading.tuple.Fields
-class HBaseRowCountJob(args: Args) extends Job(args) {
+class HBaseRowCountJob(args: Args) extends Job(args) with HBasePipeConversions {
// For now doesn't actually count, just dumps a "word count"
+ val output = args("output")
+
val hbs = new HBaseSource(
"wbgrp-journal-extract-0-qa", // HBase Table Name
"mtrcs-zk1.us.archive.org:2181", // HBase Zookeeper server (to get runtime config info; can be array?)
- 'key, // ... then a list of column names
- sourceMode = HBaseConstants.SourceMode.SCAN_ALL)
-/*
+ new Fields("key"),
+ List("column_family"),
+ sourceMode = SourceMode.SCAN_ALL)
.read
- .map { word => (word, 1L) }
- .sumByKey
- .write(TypedTsv[(String, Long)](args("output")))
- // The compiler will enforce the type coming out of the sumByKey is the same as the type we have for our sink
- .flatMap { line => line.split("\\s+") }
-*/
+ .debug
+ .fromBytesWritable(new Fields("key"))
+ .write(Tsv(output format "get_list"))
}