aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala20
1 files changed, 15 insertions, 5 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index c214e99..bbae8f6 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -7,9 +7,7 @@ import com.twitter.scalding.AccessMode
import com.twitter.scalding.Hdfs
import com.twitter.scalding.Mode
import com.twitter.scalding.Read
-import com.twitter.scalding.Source
import com.twitter.scalding.Write
-
import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
@@ -18,6 +16,8 @@ import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import com.twitter.scalding.Source
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -47,11 +47,11 @@ case class HBaseSource(
val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
internalScheme.setInputSplitTye(inputSplitType)
- override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+ val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
- override def localScheme = new NullScheme(allFields, allFields)
+ def localScheme = new NullScheme(allFields, allFields)
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val hBaseScheme = hdfsScheme match {
@@ -92,7 +92,17 @@ case class HBaseSource(
hbt.asInstanceOf[Tap[_,_,_]]
}
}
- case _ => super.createTap(readOrWrite)(mode)
+ case _ => createEmptyTap(readOrWrite)(mode)
}
}
+
+
+
+ def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
+ mode match {
+ case _ => {
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+ }
+ }
+ }
}