Diffstat (limited to 'scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala | 18
1 file changed, 10 insertions, 8 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index 375d155..befb037 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -1,16 +1,17 @@
 package sandcrawler
 
+import java.util.Properties
+
 import cascading.property.AppProps
 import cascading.tuple.Fields
 import com.twitter.scalding._
 import com.twitter.scalding.typed.TDsl._
-import java.util.Properties
-import parallelai.spyglass.base.JobBase
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
-import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBasePipeConversions
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import com.twitter.scalding.Args
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import parallelai.spyglass.hbase.HBaseSource
 
 class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
 
@@ -19,9 +20,10 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver
   HBaseBuilder.parseColSpec(colSpec)
   val Col: String = colSpec.split(":")(1)
 
-  val source : TypedPipe[Long] = HBaseCountJob.getHBaseSource(args("hbase-table"),
-    args("zookeeper-hosts"),
-    colSpec)
+  val source : TypedPipe[Long] = HBaseCountJob.getHBaseSource(
+    args("hbase-table"),
+    args("zookeeper-hosts"),
+    colSpec)
     .read
     .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
     .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
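
For context, the hunk ends mid-expression: `source` is a `TypedPipe[Long]` of raw status codes decoded from the column named by `colSpec`. Below is a minimal sketch of how such a pipe is typically reduced to per-code counts and written out with Scalding's typed API. It is not taken from this commit: the plain `Job` base class, the fake `TextLine` input, and the `input`/`output` argument names are illustrative assumptions standing in for the HBase-backed source and Spyglass `JobBase` used by the real job.

package sandcrawler

import com.twitter.scalding._

// Hypothetical continuation of the counting step, kept self-contained by
// replacing the HBase source with a text file of numeric status codes.
class StatusCountSketch(args: Args) extends Job(args) {

  // Stand-in for the TypedPipe[Long] built in HBaseStatusCountJob.
  val source: TypedPipe[Long] =
    TypedPipe.from(TextLine(args("input"))).map(_.trim.toLong)

  source
    .groupBy(identity)   // key each record by its status code
    .size                // rows per status code -> (code, count)
    .write(TypedTsv[(Long, Long)](args("output")))
}

The `.groupBy(identity).size` idiom is the usual Scalding way to turn a pipe of values into (value, count) pairs; whether the real job writes to TSV or some other sink is not visible in this diff.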