aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala23
1 files changed, 12 insertions, 11 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 39a076e..d6795aa 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -10,15 +10,12 @@ import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
-import parallelai.spyglass.hbase.HBaseScheme;
-import parallelai.spyglass.hbase.HBaseTap;
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-import cascading.scheme.Scheme
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
import cascading.tap.Tap
import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
-import scala.compat.Platform
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
@@ -29,13 +26,13 @@ object Conversions {
implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s))
}
-class HBaseSource(
+case class HBaseSource(
tableName: String = null,
quorumNames: String = "localhost",
keyFields: Fields = null,
- familyNames: Array[String] = null,
- valueFields: Array[Fields] = null,
- timestamp: Long = Platform.currentTime,
+ familyNames: List[String] = null,
+ valueFields: List[Fields] = null,
+ timestamp: Long = 0L,
sourceMode: SourceMode = SourceMode.SCAN_ALL,
startKey: String = null,
stopKey: String = null,
@@ -45,9 +42,13 @@ class HBaseSource(
prefixList: String = null
) extends Source {
- override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields)
+ override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+ // To enable local mode testing
+ val allFields = keyFields.append(valueFields.toArray)
+ override def localScheme = new NullScheme(allFields, allFields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val hBaseScheme = hdfsScheme match {
case hbase: HBaseScheme => hbase
@@ -80,7 +81,7 @@ class HBaseSource(
case Write => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
- hbt.setUseSaltInSink(useSalt);
+ hbt.setUseSaltInSink(useSalt)
hbt.asInstanceOf[Tap[_,_,_]]
}