aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/HBaseBuilder.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseBuilder.scala14
1 files changed, 11 insertions, 3 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
index b271def..19df99d 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
@@ -1,9 +1,12 @@
package sandcrawler
+import scala._
+
+import cascading.tap.SinkMode
import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBaseSource
-import scala._
object HBaseBuilder {
// map from column families to column names
@@ -39,8 +42,8 @@ object HBaseBuilder {
val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0))
val families = groupMap.keys.toList
val groupedColNames : List[List[String]] = families map {fam => {
- val cols = {groupMap(fam).map(v => v.split(":")(1))}
- cols}}
+ val cols = {groupMap(fam).map(v => v.split(":")(1))}
+ cols}}
(families, groupedColNames.map({fields => new Fields(fields : _*)}))
}
@@ -48,4 +51,9 @@ object HBaseBuilder {
val (families, fields) = parseColSpecs(colSpecs)
new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList)
}
+
+ def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = {
+ val (families, fields) = parseColSpecs(colSpecs)
+ new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode)
+ }
}