diff options
Diffstat (limited to 'scald-mvp')
| -rw-r--r-- | scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala | 21 | 
1 files changed, 11 insertions, 10 deletions
| diff --git a/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 734abaa..23c4764 100644 --- a/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -2,22 +2,23 @@ package sandcrawler  import com.twitter.scalding._  import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions, HBaseConstants} +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import cascading.tuple.Fields -class HBaseRowCountJob(args: Args) extends Job(args) { +class HBaseRowCountJob(args: Args) extends Job(args) with HBasePipeConversions {    // For now doesn't actually count, just dumps a "word count" +  val output = args("output") +    val hbs = new HBaseSource(      "wbgrp-journal-extract-0-qa",     // HBase Table Name      "mtrcs-zk1.us.archive.org:2181",  // HBase Zookeeper server (to get runtime config info; can be array?) -    'key,                             // ... then a list of column names -    sourceMode = HBaseConstants.SourceMode.SCAN_ALL) -/* +     new Fields("key"), +     List("column_family"), +    sourceMode = SourceMode.SCAN_ALL)      .read -    .map { word => (word, 1L) } -    .sumByKey -    .write(TypedTsv[(String, Long)](args("output"))) -    // The compiler will enforce the type coming out of the sumByKey is the same as the type we have for our sink -    .flatMap { line => line.split("\\s+") } -*/ +    .debug +    .fromBytesWritable(new Fields("key")) +    .write(Tsv(output format "get_list"))  } | 
