aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/jdbc
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc')
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala22
1 files changed, 20 insertions, 2 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 09f4579..2472eda 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -12,6 +12,11 @@ import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
+import com.twitter.scalding.Test
+import com.twitter.scalding.MemoryTap
+import java.io.InputStream
+import java.io.OutputStream
+import java.util.Properties
case class JDBCSource(
tableName: String = "tableName",
@@ -32,8 +37,10 @@ case class JDBCSource(
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
- def localScheme = new NullScheme(fields, fields)
-
+ //def localScheme = new NullScheme(fields, fields)
+ type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
+ def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val jdbcScheme = hdfsScheme match {
case jdbc: JDBCScheme => jdbc
@@ -53,6 +60,17 @@ case class JDBCSource(
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
}
+ case testMode @ Test(buffer) => readOrWrite match {
+
+ case Read => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ }
case _ => createEmptyTap(readOrWrite)(mode)
}
}