From e906399e4bb54bfe5a2124bd13aa78733bcac03b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 21 May 2018 18:51:46 -0700 Subject: update HBaseRowCountJob based on Simple example --- .../main/scala/sandcrawler/HBaseRowCountJob.scala | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 734abaa..23c4764 100644 --- a/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scald-mvp/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -2,22 +2,23 @@ package sandcrawler import com.twitter.scalding._ import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions, HBaseConstants} +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import cascading.tuple.Fields -class HBaseRowCountJob(args: Args) extends Job(args) { +class HBaseRowCountJob(args: Args) extends Job(args) with HBasePipeConversions { // For now doesn't actually count, just dumps a "word count" + val output = args("output") + val hbs = new HBaseSource( "wbgrp-journal-extract-0-qa", // HBase Table Name "mtrcs-zk1.us.archive.org:2181", // HBase Zookeeper server (to get runtime config info; can be array?) - 'key, // ... then a list of column names - sourceMode = HBaseConstants.SourceMode.SCAN_ALL) -/* + new Fields("key"), + List("column_family"), + sourceMode = SourceMode.SCAN_ALL) .read - .map { word => (word, 1L) } - .sumByKey - .write(TypedTsv[(String, Long)](args("output"))) - // The compiler will enforce the type coming out of the sumByKey is the same as the type we have for our sink - .flatMap { line => line.split("\\s+") } -*/ + .debug + .fromBytesWritable(new Fields("key")) + .write(Tsv(output format "get_list")) } -- cgit v1.2.3