aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala15
1 files changed, 10 insertions, 5 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 7ff7860..c214e99 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -10,7 +10,7 @@ import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
import cascading.tap.Tap
@@ -40,11 +40,14 @@ case class HBaseSource(
versions: Int = 1,
useSalt: Boolean = false,
prefixList: String = null,
- sinkMode: SinkMode = SinkMode.UPDATE
+ sinkMode: SinkMode = SinkMode.UPDATE,
+ inputSplitType: SplitType = SplitType.GRANULAR
) extends Source {
-
- override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
- .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
+ internalScheme.setInputSplitTye(inputSplitType)
+
+ override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
@@ -76,6 +79,8 @@ case class HBaseSource(
}
case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
}
+
+ hbt.setInputSplitType(inputSplitType)
hbt.asInstanceOf[Tap[_,_,_]]
}