aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala23
1 files changed, 16 insertions, 7 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index e46ef50..39a076e 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -9,6 +9,10 @@ import com.twitter.scalding.Mode
import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
+
+import parallelai.spyglass.hbase.HBaseScheme;
+import parallelai.spyglass.hbase.HBaseTap;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.scheme.Scheme
import cascading.tap.SinkMode
import cascading.tap.Tap
@@ -17,7 +21,6 @@ import org.apache.hadoop.mapred.RecordReader
import scala.compat.Platform
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -36,7 +39,10 @@ class HBaseSource(
sourceMode: SourceMode = SourceMode.SCAN_ALL,
startKey: String = null,
stopKey: String = null,
- keyList: List[String] = null
+ keyList: List[String] = null,
+ versions: Int = 1,
+ useSalt: Boolean = false,
+ prefixList: String = null
) extends Source {
override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields)
@@ -51,19 +57,20 @@ class HBaseSource(
case hdfsMode @ Hdfs(_, _) => readOrWrite match {
case Read => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
-
+
sourceMode match {
case SourceMode.SCAN_RANGE => {
- hbt.setHBaseRangeParms(startKey, stopKey)
+
+ hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
}
case SourceMode.SCAN_ALL => {
- hbt.setHBaseScanAllParms()
+ hbt.setHBaseScanAllParms(useSalt, prefixList)
}
case SourceMode.GET_LIST => {
- if( keyList == null )
+ if( keyList == null )
throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
- hbt.setHBaseListParms(keyList.toArray[String])
+ hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
}
case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
}
@@ -73,6 +80,8 @@ class HBaseSource(
case Write => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
+ hbt.setUseSaltInSink(useSalt);
+
hbt.asInstanceOf[Tap[_,_,_]]
}
}