diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc')
-rw-r--r-- | src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala | 22 |
1 files changed, 20 insertions, 2 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 09f4579..2472eda 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -12,6 +12,11 @@ import cascading.tuple.Fields import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf +import com.twitter.scalding.Test +import com.twitter.scalding.MemoryTap +import java.io.InputStream +import java.io.OutputStream +import java.util.Properties case class JDBCSource( tableName: String = "tableName", @@ -32,8 +37,10 @@ case class JDBCSource( .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] // To enable local mode testing - def localScheme = new NullScheme(fields, fields) - + //def localScheme = new NullScheme(fields, fields) + type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] + def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val jdbcScheme = hdfsScheme match { case jdbc: JDBCScheme => jdbc @@ -53,6 +60,17 @@ case class JDBCSource( jdbcTap.asInstanceOf[Tap[_,_,_]] } } + case testMode @ Test(buffer) => readOrWrite match { + + case Read => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + case Write => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + } case _ => createEmptyTap(readOrWrite)(mode) } } |