From d09d29b7f301d8eca613dff0e62f658bcee536bc Mon Sep 17 00:00:00 2001 From: Saad Rashid Date: Fri, 21 Feb 2014 15:23:48 +0000 Subject: Added Local Schema and Memory Tap in HBaseSource and JDBCSource to fix the failing unit tests. --- .../parallelai/spyglass/hbase/HBaseSource.scala | 100 +++++++++++++++------ .../hbase/example/SimpleHBaseSourceExample.scala | 5 +- .../parallelai/spyglass/jdbc/JDBCSource.scala | 22 ++++- .../example/SimpleHBaseSourceExampleTest.scala | 1 + .../jdbc/example/JdbcSourceExampleTest.scala | 2 +- 5 files changed, 96 insertions(+), 34 deletions(-) diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 09ad19d..dc87a4b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -20,6 +20,10 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import com.twitter.scalding.Source import com.twitter.scalding.TestMode import com.twitter.scalding.Test +import com.twitter.scalding.MemoryTap +import java.io.InputStream +import java.io.OutputStream +import java.util.Properties object Conversions { implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -45,7 +49,7 @@ case class HBaseSource( sinkMode: SinkMode = SinkMode.UPDATE, inputSplitType: SplitType = SplitType.GRANULAR ) extends Source { - + val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) internalScheme.setInputSplitTye(inputSplitType) @@ -53,8 +57,11 @@ case class HBaseSource( // To enable local mode testing val allFields = keyFields.append(valueFields.toArray) - def localScheme = new NullScheme(allFields, allFields) + //def localScheme = new NullScheme(allFields, allFields) + type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] + def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val hBaseScheme = hdfsScheme match { case hbase: HBaseScheme => hbase @@ -94,36 +101,33 @@ case class HBaseSource( hbt.asInstanceOf[Tap[_,_,_]] } } - case testMode @ Test(_) => readOrWrite match { - case Read => { - val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) - - sourceMode match { - case SourceMode.SCAN_RANGE => { - - hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList) - } - case SourceMode.SCAN_ALL => { - hbt.setHBaseScanAllParms(useSalt, prefixList) - } - case SourceMode.GET_LIST => { - if( keyList == null ) - throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) - - hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList) - } - case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) + /**case Test(buffers) => { + /* + * There MUST have already been a registered sink or source in the Test mode. + * to access this. You must explicitly name each of your test sources in your + * JobTest. + */ + val buffer = + if (readOrWrite == Write) { + val buf = buffers(this) + //Make sure we wipe it out: + buf.clear() + buf + } else { + // if the source is also used as a sink, we don't want its contents to get modified + buffers(this).clone() } - - hbt.setInputSplitType(inputSplitType) - + // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields + new MemoryTap[InputStream, OutputStream](localScheme, buffer) + }*/ + case testMode @ Test(buffer) => readOrWrite match { + + case Read => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) hbt.asInstanceOf[Tap[_,_,_]] } case Write => { - val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode) - - hbt.setUseSaltInSink(useSalt) - + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) hbt.asInstanceOf[Tap[_,_,_]] } } @@ -139,5 +143,43 @@ case class HBaseSource( throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) } } - } + } + + /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = { + mode match { + case Test(buffers) => { + /* + * There MUST have already been a registered sink or source in the Test mode. + * to access this. You must explicitly name each of your test sources in your + * JobTest. + */ + val buffer = + if (readOrWrite == Write) { + val buf = buffers(this) + //Make sure we wipe it out: + buf.clear() + buf + } else { + // if the source is also used as a sink, we don't want its contents to get modified + buffers(this).clone() + } + // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields + new MemoryTap[InputStream, OutputStream](localScheme, buffer) + } + case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match { + case Read => { + val buffer = buffers(this) + val fields = hdfsScheme.getSourceFields + (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]] + } + case Write => { + val path = hdfsTest.getWritePathFor(this) + castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE)) + } + } + case _ => { + throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) + } + } + } */ } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala index 2921df8..6e56c52 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala @@ -20,8 +20,8 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC val output = args("output") - val properties = new Properties() - AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] ); +// val properties = new Properties() +// AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] ); val hbs = new HBaseSource( "table_name", @@ -31,6 +31,7 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC List(new Fields("column_name1", "column_name2")), sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")) .read + .debug .fromBytesWritable(new Fields("key", "column_name1", "column_name2")) .write(Tsv(output format "get_list")) diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 09f4579..2472eda 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -12,6 +12,11 @@ import cascading.tuple.Fields import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf +import com.twitter.scalding.Test +import com.twitter.scalding.MemoryTap +import java.io.InputStream +import java.io.OutputStream +import java.util.Properties case class JDBCSource( tableName: String = "tableName", @@ -32,8 +37,10 @@ case class JDBCSource( .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] // To enable local mode testing - def localScheme = new NullScheme(fields, fields) - + //def localScheme = new NullScheme(fields, fields) + type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] + def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val jdbcScheme = hdfsScheme match { case jdbc: JDBCScheme => jdbc @@ -53,6 +60,17 @@ case class JDBCSource( jdbcTap.asInstanceOf[Tap[_,_,_]] } } + case testMode @ Test(buffer) => readOrWrite match { + + case Read => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + case Write => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + } case _ => createEmptyTap(readOrWrite)(mode) } } diff --git a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala index f25e769..30342da 100644 --- a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala +++ b/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala @@ -48,6 +48,7 @@ class SimpleHBaseSourceExampleTest extends FunSpec with TupleConversions { log.debug("Output => " + outputBuffer) it("should return the test data provided.") { + println("outputBuffer.size => " + outputBuffer.size) assert(outputBuffer.size === 3) } } diff --git a/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala index 98f275d..b268fa1 100644 --- a/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala +++ b/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala @@ -24,7 +24,7 @@ class JdbcSourceExampleTest extends FunSpec with TupleConversions { ) JobTest("parallelai.spyglass.jdbc.example.JdbcSourceExample") - .arg("local", "") + .arg("test", "") .arg("app.conf.path", "app.conf") .arg("output", output) .arg("debug", "true") -- cgit v1.2.3