aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseBuilder.scala14
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCountJob.scala10
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala9
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala14
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala22
5 files changed, 43 insertions, 26 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
index b271def..19df99d 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
@@ -1,9 +1,12 @@
package sandcrawler
+import scala._
+
+import cascading.tap.SinkMode
import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBaseSource
-import scala._
object HBaseBuilder {
// map from column families to column names
@@ -39,8 +42,8 @@ object HBaseBuilder {
val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0))
val families = groupMap.keys.toList
val groupedColNames : List[List[String]] = families map {fam => {
- val cols = {groupMap(fam).map(v => v.split(":")(1))}
- cols}}
+ val cols = {groupMap(fam).map(v => v.split(":")(1))}
+ cols}}
(families, groupedColNames.map({fields => new Fields(fields : _*)}))
}
@@ -48,4 +51,9 @@ object HBaseBuilder {
val (families, fields) = parseColSpecs(colSpecs)
new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList)
}
+
+ def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = {
+ val (families, fields) = parseColSpecs(colSpecs)
+ new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode)
+ }
}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
index 22e4e86..b12e723 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
@@ -1,9 +1,10 @@
package sandcrawler
+import java.util.Properties
+
import cascading.property.AppProps
import cascading.tuple.Fields
import com.twitter.scalding._
-import java.util.Properties
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
@@ -14,9 +15,10 @@ class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBas
HBaseBuilder.parseColSpec(colSpec)
val Col: String = colSpec.split(":")(1)
- HBaseCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"),
- colSpec)
+ HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ colSpec)
.read
.fromBytesWritable(Symbol(Col))
.debug
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index 6def218..4c3de33 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -1,9 +1,10 @@
package sandcrawler
+import java.util.Properties
+
import cascading.property.AppProps
import cascading.tuple.Fields
import com.twitter.scalding._
-import java.util.Properties
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
@@ -13,8 +14,9 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
val output = args("output")
- HBaseRowCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"))
+ HBaseRowCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
.read
.debug
.groupAll { _.size('count) }
@@ -31,5 +33,4 @@ object HBaseRowCountJob {
List("file:size"),
SourceMode.SCAN_ALL)
}
-
}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index b1dab0e..fd0b4e2 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -1,18 +1,24 @@
package sandcrawler
-import com.twitter.scalding.Args
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
- val source = HBaseCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"),
- "grobid0:status_code")
+ val source = HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ "grobid0:status_code")
val statusPipe : TypedPipe[Long] = source
.read
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 11ab1d0..d7689cd 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -48,18 +48,18 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
.arg("debug", "true")
.source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .sink[Tuple](TypedTsv[(Long, Long)](output)) {
- outputBuffer =>
- it("should return a 2-element list.") {
- assert(outputBuffer.size === 2)
- }
+ .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+ outputBuffer =>
+ it("should return a 2-element list.") {
+ assert(outputBuffer.size === 2)
+ }
- // Convert List[Tuple] to Map[Long, Long].
- val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
- it("should have the appropriate number of each status type") {
- assert(counts(statusType1) == statusType1Count)
- assert(counts(statusType2) == statusType2Count)
- }
+ // Convert List[Tuple] to Map[Long, Long].
+ val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
+ it("should have the appropriate number of each status type") {
+ assert(counts(statusType1) == statusType1Count)
+ assert(counts(statusType2) == statusType2Count)
+ }
}
.run
.finish