Diffstat (limited to 'scalding/src/main')
 scalding/src/main/scala/sandcrawler/HBaseBuilder.scala        | 14
 scalding/src/main/scala/sandcrawler/HBaseCountJob.scala       | 10
 scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala    |  9
 scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala | 14
 4 files changed, 32 insertions(+), 15 deletions(-)
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
index b271def..19df99d 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
@@ -1,9 +1,12 @@
package sandcrawler
+import scala._
+
+import cascading.tap.SinkMode
import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBaseSource
-import scala._
object HBaseBuilder {
// map from column families to column names
@@ -39,8 +42,8 @@ object HBaseBuilder {
val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0))
val families = groupMap.keys.toList
val groupedColNames : List[List[String]] = families map {fam => {
-    val cols = {groupMap(fam).map(v => v.split(":")(1))}
-    cols}}
+      val cols = {groupMap(fam).map(v => v.split(":")(1))}
+      cols}}
(families, groupedColNames.map({fields => new Fields(fields : _*)}))
}
@@ -48,4 +51,9 @@ object HBaseBuilder {
val (families, fields) = parseColSpecs(colSpecs)
new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList)
}
+
+ def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = {
+ val (families, fields) = parseColSpecs(colSpecs)
+ new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode)
+ }
}
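
The new buildSink mirrors the existing getHBaseSource helper; note that its keyList parameter is accepted but not forwarded to the HBaseSource constructor in this version. A minimal usage sketch follows (table and server names are placeholders, not from this commit):

    import cascading.tap.SinkMode
    import parallelai.spyglass.hbase.HBaseSource

    // Hypothetical call site; table/server/colSpec values are assumptions.
    val sink: HBaseSource = HBaseBuilder.buildSink(
      "example-table",                 // HBase table to write into
      "zk1.example.org",               // zookeeper quorum host(s)
      List("file:size", "file:mime"),  // columns, as family:qualifier specs
      SinkMode.UPDATE)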
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
index 22e4e86..b12e723 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
@@ -1,9 +1,10 @@
package sandcrawler
+import java.util.Properties
+
import cascading.property.AppProps
import cascading.tuple.Fields
import com.twitter.scalding._
-import java.util.Properties
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
@@ -14,9 +15,10 @@ class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBas
HBaseBuilder.parseColSpec(colSpec)
val Col: String = colSpec.split(":")(1)
- HBaseCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"),
- colSpec)
+ HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ colSpec)
.read
.fromBytesWritable(Symbol(Col))
.debug
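
Since HBaseCountJob takes the column spec as a constructor argument, concrete jobs can be defined as thin subclasses. A sketch, where the colSpec string is an assumption for illustration:

    import com.twitter.scalding.Args

    // Hypothetical concrete job counting a single column; "file:mime"
    // is an illustrative colSpec, not defined by this commit.
    class HBaseMimeCountJob(args: Args) extends HBaseCountJob(args, "file:mime")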
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index 6def218..4c3de33 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -1,9 +1,10 @@
package sandcrawler
+import java.util.Properties
+
import cascading.property.AppProps
import cascading.tuple.Fields
import com.twitter.scalding._
-import java.util.Properties
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
@@ -13,8 +14,9 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
val output = args("output")
- HBaseRowCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"))
+ HBaseRowCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
.read
.debug
.groupAll { _.size('count) }
@@ -31,5 +33,4 @@ object HBaseRowCountJob {
List("file:size"),
SourceMode.SCAN_ALL)
}
-
}
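
The groupAll { _.size('count) } step above is the fields-API idiom for a global count; roughly the same thing in the typed API looks like the sketch below (the in-memory pipe is for illustration only):

    import com.twitter.scalding.typed.TypedPipe

    // Count every element of a pipe into a single Long, mirroring what
    // groupAll { _.size('count) } does in the fields API.
    val rows: TypedPipe[String] = TypedPipe.from(Seq("a", "b", "c"))
    val total: TypedPipe[Long] = rows.groupAll.size.values  // yields 3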
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index b1dab0e..fd0b4e2 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -1,18 +1,24 @@
package sandcrawler
-import com.twitter.scalding.Args
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
- val source = HBaseCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"),
- "grobid0:status_code")
+ val source = HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ "grobid0:status_code")
val statusPipe : TypedPipe[Long] = source
.read
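
The pipeline is truncated here, but the imports of Bytes and ImmutableBytesWritable suggest the job decodes HBase cell values to Long before counting. Given statusPipe's TypedPipe[Long] type, a per-status tally could look like the following sketch (an assumption about the job's intent, not the commit's actual code):

    import com.twitter.scalding.typed.TypedPipe

    // Hypothetical helper: count occurrences of each status code,
    // yielding (statusCode, count) pairs.
    def countByStatus(statuses: TypedPipe[Long]): TypedPipe[(Long, Long)] =
      statuses.groupBy(identity).size.toTypedPipe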