diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index e46ef50..39a076e 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -9,6 +9,10 @@ import com.twitter.scalding.Mode import com.twitter.scalding.Read import com.twitter.scalding.Source import com.twitter.scalding.Write + +import parallelai.spyglass.hbase.HBaseScheme; +import parallelai.spyglass.hbase.HBaseTap; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.scheme.Scheme import cascading.tap.SinkMode import cascading.tap.Tap @@ -17,7 +21,6 @@ import org.apache.hadoop.mapred.RecordReader import scala.compat.Platform import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf -import parallelai.spyglass.hbase.HBaseConstants.SourceMode object Conversions { implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -36,7 +39,10 @@ class HBaseSource( sourceMode: SourceMode = SourceMode.SCAN_ALL, startKey: String = null, stopKey: String = null, - keyList: List[String] = null + keyList: List[String] = null, + versions: Int = 1, + useSalt: Boolean = false, + prefixList: String = null ) extends Source { override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) @@ -51,19 +57,20 @@ class HBaseSource( case hdfsMode @ Hdfs(_, _) => readOrWrite match { case Read => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) - + sourceMode match { case SourceMode.SCAN_RANGE => { - hbt.setHBaseRangeParms(startKey, stopKey) + + hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList) } case SourceMode.SCAN_ALL => { - hbt.setHBaseScanAllParms() + hbt.setHBaseScanAllParms(useSalt, prefixList) } case SourceMode.GET_LIST => { - if( keyList == null ) + if( keyList == null ) throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) - hbt.setHBaseListParms(keyList.toArray[String]) + hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList) } case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) } @@ -73,6 +80,8 @@ class HBaseSource( case Write => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) + hbt.setUseSaltInSink(useSalt); + hbt.asInstanceOf[Tap[_,_,_]] } } |