diff options
Diffstat (limited to 'scalding/src')
5 files changed, 43 insertions, 26 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala index b271def..19df99d 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala @@ -1,9 +1,12 @@ package sandcrawler +import scala._ + +import cascading.tap.SinkMode import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBaseSource -import scala._ object HBaseBuilder { // map from column families to column names @@ -39,8 +42,8 @@ object HBaseBuilder { val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0)) val families = groupMap.keys.toList val groupedColNames : List[List[String]] = families map {fam => { - val cols = {groupMap(fam).map(v => v.split(":")(1))} - cols}} + val cols = {groupMap(fam).map(v => v.split(":")(1))} + cols}} (families, groupedColNames.map({fields => new Fields(fields : _*)})) } @@ -48,4 +51,9 @@ object HBaseBuilder { val (families, fields) = parseColSpecs(colSpecs) new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList) } + + def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = { + val (families, fields) = parseColSpecs(colSpecs) + new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode) + } } diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala index 22e4e86..b12e723 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala @@ -1,9 +1,10 @@ package sandcrawler +import java.util.Properties + import cascading.property.AppProps import cascading.tuple.Fields import com.twitter.scalding._ -import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions @@ -14,9 +15,10 @@ class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBas HBaseBuilder.parseColSpec(colSpec) val Col: String = colSpec.split(":")(1) - HBaseCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts"), - colSpec) + HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + colSpec) .read .fromBytesWritable(Symbol(Col)) .debug diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 6def218..4c3de33 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -1,9 +1,10 @@ package sandcrawler +import java.util.Properties + import cascading.property.AppProps import cascading.tuple.Fields import com.twitter.scalding._ -import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions @@ -13,8 +14,9 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio val output = args("output") - HBaseRowCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts")) + HBaseRowCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) .read .debug .groupAll { _.size('count) } @@ -31,5 +33,4 @@ object HBaseRowCountJob { List("file:size"), SourceMode.SCAN_ALL) } - } diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index b1dab0e..fd0b4e2 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -1,18 +1,24 @@ package sandcrawler -import com.twitter.scalding.Args +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val source = HBaseCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts"), - "grobid0:status_code") + val source = HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + "grobid0:status_code") val statusPipe : TypedPipe[Long] = source .read diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index 11ab1d0..d7689cd 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -48,18 +48,18 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { .arg("debug", "true") .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .sink[Tuple](TypedTsv[(Long, Long)](output)) { - outputBuffer => - it("should return a 2-element list.") { - assert(outputBuffer.size === 2) - } + .sink[Tuple](TypedTsv[(Long, Long)](output)) { + outputBuffer => + it("should return a 2-element list.") { + assert(outputBuffer.size === 2) + } - // Convert List[Tuple] to Map[Long, Long]. - val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap - it("should have the appropriate number of each status type") { - assert(counts(statusType1) == statusType1Count) - assert(counts(statusType2) == statusType2Count) - } + // Convert List[Tuple] to Map[Long, Long]. + val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap + it("should have the appropriate number of each status type") { + assert(counts(statusType1) == statusType1Count) + assert(counts(statusType2) == statusType2Count) + } } .run .finish |