aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala35
1 files changed, 35 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index bbae8f6..09ad19d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -18,6 +18,8 @@ import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import com.twitter.scalding.Source
+import com.twitter.scalding.TestMode
+import com.twitter.scalding.Test
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -92,6 +94,39 @@ case class HBaseSource(
hbt.asInstanceOf[Tap[_,_,_]]
}
}
+ case testMode @ Test(_) => readOrWrite match {
+ case Read => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
+
+ sourceMode match {
+ case SourceMode.SCAN_RANGE => {
+
+ hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
+ }
+ case SourceMode.SCAN_ALL => {
+ hbt.setHBaseScanAllParms(useSalt, prefixList)
+ }
+ case SourceMode.GET_LIST => {
+ if( keyList == null )
+ throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
+
+ hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
+ }
+ case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
+ }
+
+ hbt.setInputSplitType(inputSplitType)
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
+
+ hbt.setUseSaltInSink(useSalt)
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ }
case _ => createEmptyTap(readOrWrite)(mode)
}
}