diff options
Diffstat (limited to 'scalding/src/main')
4 files changed, 27 insertions, 17 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala index fd04f2e..431860c 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala @@ -1,11 +1,12 @@  package sandcrawler -import cascading.tuple.Fields -import parallelai.spyglass.base.JobBase +import scala._ +  import cascading.tap.SinkMode +import cascading.tuple.Fields  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBaseSource -import scala._ +import parallelai.spyglass.base.JobBase  object HBaseBuilder {    // map from column families to column names @@ -41,8 +42,8 @@ object HBaseBuilder {      val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0))      val families = groupMap.keys.toList      val groupedColNames : List[List[String]] = families map {fam => { -        val cols = {groupMap(fam).map(v => v.split(":")(1))} -        cols}} +      val cols = {groupMap(fam).map(v => v.split(":")(1))} +      cols}}      (families, groupedColNames.map({fields => new Fields(fields : _*)}))    } diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala index 22e4e86..b12e723 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala @@ -1,9 +1,10 @@  package sandcrawler +import java.util.Properties +  import cascading.property.AppProps  import cascading.tuple.Fields  import com.twitter.scalding._ -import java.util.Properties  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions @@ -14,9 +15,10 @@ class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBas    HBaseBuilder.parseColSpec(colSpec)    val Col: String = colSpec.split(":")(1) -  HBaseCountJob.getHBaseSource(args("hbase-table"), -                               args("zookeeper-hosts"), -                               colSpec) +  HBaseCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    colSpec)      .read      .fromBytesWritable(Symbol(Col))      .debug diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 6def218..4c3de33 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -1,9 +1,10 @@  package sandcrawler +import java.util.Properties +  import cascading.property.AppProps  import cascading.tuple.Fields  import com.twitter.scalding._ -import java.util.Properties  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions @@ -13,8 +14,9 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio    val output = args("output") -  HBaseRowCountJob.getHBaseSource(args("hbase-table"), -                                  args("zookeeper-hosts")) +  HBaseRowCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"))      .read      .debug      .groupAll { _.size('count) } @@ -31,5 +33,4 @@ object HBaseRowCountJob {        List("file:size"),        SourceMode.SCAN_ALL)    } -  } diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index b1dab0e..fd0b4e2 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -1,18 +1,24 @@  package sandcrawler -import com.twitter.scalding.Args +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields  import com.twitter.scalding._  import com.twitter.scalding.typed.TDsl._  import org.apache.hadoop.hbase.io.ImmutableBytesWritable  import org.apache.hadoop.hbase.util.Bytes  import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource  class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { -  val source = HBaseCountJob.getHBaseSource(args("hbase-table"), -                                            args("zookeeper-hosts"), -                                            "grobid0:status_code") +  val source = HBaseCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    "grobid0:status_code")    val statusPipe : TypedPipe[Long] = source      .read | 
