diff options
Diffstat (limited to 'scalding/src/main')
4 files changed, 27 insertions, 20 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala index b271def..b4ade24 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala @@ -39,8 +39,8 @@ object HBaseBuilder {      val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0))      val families = groupMap.keys.toList      val groupedColNames : List[List[String]] = families map {fam => { -        val cols = {groupMap(fam).map(v => v.split(":")(1))} -        cols}} +      val cols = {groupMap(fam).map(v => v.split(":")(1))} +      cols}}      (families, groupedColNames.map({fields => new Fields(fields : _*)}))    } diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala index 1ebc261..b12e723 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala @@ -1,21 +1,24 @@  package sandcrawler +import java.util.Properties +  import cascading.property.AppProps  import cascading.tuple.Fields  import com.twitter.scalding._ -import java.util.Properties  import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}  import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource  class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBasePipeConversions {    val output = args("output")    HBaseBuilder.parseColSpec(colSpec)    val Col: String = colSpec.split(":")(1) -  HBaseCountJob.getHBaseSource(args("hbase-table"), -                               args("zookeeper-hosts"), -                               colSpec) +  HBaseCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    colSpec)      .read      
.fromBytesWritable(Symbol(Col))      .debug diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index ba3b9cd..4c3de33 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -1,19 +1,22 @@  package sandcrawler +import java.util.Properties +  import cascading.property.AppProps  import cascading.tuple.Fields  import com.twitter.scalding._ -import java.util.Properties  import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}  import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource  class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {    val output = args("output") -  HBaseRowCountJob.getHBaseSource(args("hbase-table"), -                                  args("zookeeper-hosts")) +  HBaseRowCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"))      .read      .debug      .groupAll { _.size('count) } @@ -30,5 +33,4 @@ object HBaseRowCountJob {        List("file:size"),        SourceMode.SCAN_ALL)    } -  } diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index 375d155..befb037 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -1,16 +1,17 @@  package sandcrawler +import java.util.Properties +  import cascading.property.AppProps  import cascading.tuple.Fields  import com.twitter.scalding._  import com.twitter.scalding.typed.TDsl._ -import java.util.Properties -import parallelai.spyglass.base.JobBase +import org.apache.hadoop.hbase.io.ImmutableBytesWritable  import org.apache.hadoop.hbase.util.Bytes -import 
parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} +import parallelai.spyglass.hbase.HBasePipeConversions  import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import com.twitter.scalding.Args -import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import parallelai.spyglass.hbase.HBaseSource  class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { @@ -19,9 +20,10 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver    HBaseBuilder.parseColSpec(colSpec)    val Col: String = colSpec.split(":")(1) -  val source : TypedPipe[Long] = HBaseCountJob.getHBaseSource(args("hbase-table"), -                                                              args("zookeeper-hosts"), -                                                              colSpec) +  val source : TypedPipe[Long] = HBaseCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    colSpec)      .read      .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)      .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } | 
