aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
diff options
context:
space:
mode:
authorSaad Rashid <saad373@gmail.com>2014-02-21 15:23:48 +0000
committerSaad Rashid <saad373@gmail.com>2014-02-21 15:23:48 +0000
commitd09d29b7f301d8eca613dff0e62f658bcee536bc (patch)
treefddeb9fec0b948584dfaa1af3d129300b99c8e0f /src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
parente068f01c8e16d52eaf572b26d32730b119c325b5 (diff)
downloadSpyGlass-d09d29b7f301d8eca613dff0e62f658bcee536bc.tar.gz
SpyGlass-d09d29b7f301d8eca613dff0e62f658bcee536bc.zip
Added Local Schema and Memory Tap in HBaseSource and JDBCSource to fix
the failing unit tests.
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala22
1 files changed, 20 insertions, 2 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 09f4579..2472eda 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -12,6 +12,11 @@ import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
+import com.twitter.scalding.Test
+import com.twitter.scalding.MemoryTap
+import java.io.InputStream
+import java.io.OutputStream
+import java.util.Properties
case class JDBCSource(
tableName: String = "tableName",
@@ -32,8 +37,10 @@ case class JDBCSource(
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
- def localScheme = new NullScheme(fields, fields)
-
+ //def localScheme = new NullScheme(fields, fields)
+ type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
+ def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val jdbcScheme = hdfsScheme match {
case jdbc: JDBCScheme => jdbc
@@ -53,6 +60,17 @@ case class JDBCSource(
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
}
+ case testMode @ Test(buffer) => readOrWrite match {
+
+ case Read => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ }
case _ => createEmptyTap(readOrWrite)(mode)
}
}