aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala53
1 files changed, 5 insertions, 48 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index dc87a4b..957258c 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -57,7 +57,6 @@ case class HBaseSource(
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
- //def localScheme = new NullScheme(allFields, allFields)
type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields)
@@ -68,7 +67,7 @@ case class HBaseSource(
case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
}
mode match {
- case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Hdfs(_, _) => readOrWrite match {
case Read => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
@@ -120,14 +119,14 @@ case class HBaseSource(
// TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
new MemoryTap[InputStream, OutputStream](localScheme, buffer)
}*/
- case testMode @ Test(buffer) => readOrWrite match {
+ case Test(buffer) => readOrWrite match {
case Read => {
- val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
hbt.asInstanceOf[Tap[_,_,_]]
}
case Write => {
- val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
hbt.asInstanceOf[Tap[_,_,_]]
}
}
@@ -138,48 +137,6 @@ case class HBaseSource(
def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
- mode match {
- case _ => {
- throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
- }
- }
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
}
-
- /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
- mode match {
- case Test(buffers) => {
- /*
- * There MUST have already been a registered sink or source in the Test mode.
- * to access this. You must explicitly name each of your test sources in your
- * JobTest.
- */
- val buffer =
- if (readOrWrite == Write) {
- val buf = buffers(this)
- //Make sure we wipe it out:
- buf.clear()
- buf
- } else {
- // if the source is also used as a sink, we don't want its contents to get modified
- buffers(this).clone()
- }
- // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
- new MemoryTap[InputStream, OutputStream](localScheme, buffer)
- }
- case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match {
- case Read => {
- val buffer = buffers(this)
- val fields = hdfsScheme.getSourceFields
- (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]]
- }
- case Write => {
- val path = hdfsTest.getWritePathFor(this)
- castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE))
- }
- }
- case _ => {
- throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
- }
- }
- } */
}