From d09d29b7f301d8eca613dff0e62f658bcee536bc Mon Sep 17 00:00:00 2001 From: Saad Rashid Date: Fri, 21 Feb 2014 15:23:48 +0000 Subject: Added Local Schema and Memory Tap in HBaseSource and JDBCSource to fix the failing unit tests. --- .../parallelai/spyglass/jdbc/JDBCSource.scala | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) (limited to 'src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala') diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 09f4579..2472eda 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -12,6 +12,11 @@ import cascading.tuple.Fields import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf +import com.twitter.scalding.Test +import com.twitter.scalding.MemoryTap +import java.io.InputStream +import java.io.OutputStream +import java.util.Properties case class JDBCSource( tableName: String = "tableName", @@ -32,8 +37,10 @@ case class JDBCSource( .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] // To enable local mode testing - def localScheme = new NullScheme(fields, fields) - + //def localScheme = new NullScheme(fields, fields) + type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] + def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val jdbcScheme = hdfsScheme match { case jdbc: JDBCScheme => jdbc @@ -53,6 +60,17 @@ case class JDBCSource( jdbcTap.asInstanceOf[Tap[_,_,_]] } } + case testMode @ Test(buffer) => readOrWrite match { + + case Read => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + case Write => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + } case _ => createEmptyTap(readOrWrite)(mode) } } -- cgit v1.2.3