diff options
author | Saad Rashid <saad373@gmail.com> | 2014-02-21 15:23:48 +0000 |
---|---|---|
committer | Saad Rashid <saad373@gmail.com> | 2014-02-21 15:23:48 +0000 |
commit | d09d29b7f301d8eca613dff0e62f658bcee536bc (patch) | |
tree | fddeb9fec0b948584dfaa1af3d129300b99c8e0f /src/main/scala/parallelai/spyglass/jdbc | |
parent | e068f01c8e16d52eaf572b26d32730b119c325b5 (diff) | |
download | SpyGlass-d09d29b7f301d8eca613dff0e62f658bcee536bc.tar.gz SpyGlass-d09d29b7f301d8eca613dff0e62f658bcee536bc.zip |
Added Local Schema and Memory Tap in HBaseSource and JDBCSource to fix
the failing unit tests.
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc')
-rw-r--r-- | src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala | 22 |
1 files changed, 20 insertions, 2 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 09f4579..2472eda 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -12,6 +12,11 @@ import cascading.tuple.Fields import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf +import com.twitter.scalding.Test +import com.twitter.scalding.MemoryTap +import java.io.InputStream +import java.io.OutputStream +import java.util.Properties case class JDBCSource( tableName: String = "tableName", @@ -32,8 +37,10 @@ case class JDBCSource( .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] // To enable local mode testing - def localScheme = new NullScheme(fields, fields) - + //def localScheme = new NullScheme(fields, fields) + type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] + def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val jdbcScheme = hdfsScheme match { case jdbc: JDBCScheme => jdbc @@ -53,6 +60,17 @@ case class JDBCSource( jdbcTap.asInstanceOf[Tap[_,_,_]] } } + case testMode @ Test(buffer) => readOrWrite match { + + case Read => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + case Write => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + } case _ => createEmptyTap(readOrWrite)(mode) } } |