diff options
author | galarragas <galarragas@gmail.com> | 2014-02-24 15:21:28 +0000 |
---|---|---|
committer | galarragas <galarragas@gmail.com> | 2014-02-24 15:21:28 +0000 |
commit | 42249fad9938f5d643c53fe8d65375beed8a4de4 (patch) | |
tree | 14339e6cbd1a65a493deed10a2287e0adbfbd80c /src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala | |
parent | d09d29b7f301d8eca613dff0e62f658bcee536bc (diff) | |
download | SpyGlass-42249fad9938f5d643c53fe8d65375beed8a4de4.tar.gz SpyGlass-42249fad9938f5d643c53fe8d65375beed8a4de4.zip |
Code review. Removed some commented code and some warning
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala | 53 |
1 files changed, 5 insertions, 48 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index dc87a4b..957258c 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -57,7 +57,6 @@ case class HBaseSource( // To enable local mode testing val allFields = keyFields.append(valueFields.toArray) - //def localScheme = new NullScheme(allFields, allFields) type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields) @@ -68,7 +67,7 @@ case class HBaseSource( case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme") } mode match { - case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Hdfs(_, _) => readOrWrite match { case Read => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) @@ -120,14 +119,14 @@ case class HBaseSource( // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields new MemoryTap[InputStream, OutputStream](localScheme, buffer) }*/ - case testMode @ Test(buffer) => readOrWrite match { + case Test(buffer) => readOrWrite match { case Read => { - val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) hbt.asInstanceOf[Tap[_,_,_]] } case Write => { - val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) hbt.asInstanceOf[Tap[_,_,_]] } } @@ -138,48 +137,6 @@ case class HBaseSource( def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = { - mode match { - case _ => { - throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) - } - } + throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) } - - /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = { - mode match { - case Test(buffers) => { - /* - * There MUST have already been a registered sink or source in the Test mode. - * to access this. You must explicitly name each of your test sources in your - * JobTest. - */ - val buffer = - if (readOrWrite == Write) { - val buf = buffers(this) - //Make sure we wipe it out: - buf.clear() - buf - } else { - // if the source is also used as a sink, we don't want its contents to get modified - buffers(this).clone() - } - // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields - new MemoryTap[InputStream, OutputStream](localScheme, buffer) - } - case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match { - case Read => { - val buffer = buffers(this) - val fields = hdfsScheme.getSourceFields - (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]] - } - case Write => { - val path = hdfsTest.getWritePathFor(this) - castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE)) - } - } - case _ => { - throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) - } - } - } */ } |