diff options
Diffstat (limited to 'scalding/src/main/scala')
4 files changed, 27 insertions, 17 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala index fd04f2e..431860c 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala @@ -1,11 +1,12 @@ package sandcrawler -import cascading.tuple.Fields -import parallelai.spyglass.base.JobBase +import scala._ + import cascading.tap.SinkMode +import cascading.tuple.Fields import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBaseSource -import scala._ +import parallelai.spyglass.base.JobBase object HBaseBuilder { // map from column families to column names @@ -41,8 +42,8 @@ object HBaseBuilder { val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0)) val families = groupMap.keys.toList val groupedColNames : List[List[String]] = families map {fam => { - val cols = {groupMap(fam).map(v => v.split(":")(1))} - cols}} + val cols = {groupMap(fam).map(v => v.split(":")(1))} + cols}} (families, groupedColNames.map({fields => new Fields(fields : _*)})) } diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala index 22e4e86..b12e723 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala @@ -1,9 +1,10 @@ package sandcrawler +import java.util.Properties + import cascading.property.AppProps import cascading.tuple.Fields import com.twitter.scalding._ -import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions @@ -14,9 +15,10 @@ class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBas HBaseBuilder.parseColSpec(colSpec) val Col: String = colSpec.split(":")(1) - HBaseCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts"), - colSpec) + HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + colSpec) .read .fromBytesWritable(Symbol(Col)) .debug diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 6def218..4c3de33 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -1,9 +1,10 @@ package sandcrawler +import java.util.Properties + import cascading.property.AppProps import cascading.tuple.Fields import com.twitter.scalding._ -import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions @@ -13,8 +14,9 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio val output = args("output") - HBaseRowCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts")) + HBaseRowCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) .read .debug .groupAll { _.size('count) } @@ -31,5 +33,4 @@ object HBaseRowCountJob { List("file:size"), SourceMode.SCAN_ALL) } - } diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index b1dab0e..fd0b4e2 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -1,18 +1,24 @@ package sandcrawler -import com.twitter.scalding.Args +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val source = HBaseCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts"), - "grobid0:status_code") + val source = HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + "grobid0:status_code") val statusPipe : TypedPipe[Long] = source .read |