aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSaad Rashid <saad373@gmail.com>2014-02-21 15:23:48 +0000
committerSaad Rashid <saad373@gmail.com>2014-02-21 15:23:48 +0000
commitd09d29b7f301d8eca613dff0e62f658bcee536bc (patch)
treefddeb9fec0b948584dfaa1af3d129300b99c8e0f
parente068f01c8e16d52eaf572b26d32730b119c325b5 (diff)
downloadSpyGlass-d09d29b7f301d8eca613dff0e62f658bcee536bc.tar.gz
SpyGlass-d09d29b7f301d8eca613dff0e62f658bcee536bc.zip
Added Local Schema and Memory Tap in HBaseSource and JDBCSource to fix
the failing unit tests.
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala100
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala5
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala22
-rw-r--r--src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala1
-rw-r--r--src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala2
5 files changed, 96 insertions, 34 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 09ad19d..dc87a4b 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -20,6 +20,10 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import com.twitter.scalding.Source
import com.twitter.scalding.TestMode
import com.twitter.scalding.Test
+import com.twitter.scalding.MemoryTap
+import java.io.InputStream
+import java.io.OutputStream
+import java.util.Properties
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -45,7 +49,7 @@ case class HBaseSource(
sinkMode: SinkMode = SinkMode.UPDATE,
inputSplitType: SplitType = SplitType.GRANULAR
) extends Source {
-
+
val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
internalScheme.setInputSplitTye(inputSplitType)
@@ -53,8 +57,11 @@ case class HBaseSource(
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
- def localScheme = new NullScheme(allFields, allFields)
+ //def localScheme = new NullScheme(allFields, allFields)
+ type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
+ def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val hBaseScheme = hdfsScheme match {
case hbase: HBaseScheme => hbase
@@ -94,36 +101,33 @@ case class HBaseSource(
hbt.asInstanceOf[Tap[_,_,_]]
}
}
- case testMode @ Test(_) => readOrWrite match {
- case Read => {
- val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
-
- sourceMode match {
- case SourceMode.SCAN_RANGE => {
-
- hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
- }
- case SourceMode.SCAN_ALL => {
- hbt.setHBaseScanAllParms(useSalt, prefixList)
- }
- case SourceMode.GET_LIST => {
- if( keyList == null )
- throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
-
- hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
- }
- case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
+ /**case Test(buffers) => {
+ /*
+ * There MUST have already been a registered sink or source in the Test mode.
+ * to access this. You must explicitly name each of your test sources in your
+ * JobTest.
+ */
+ val buffer =
+ if (readOrWrite == Write) {
+ val buf = buffers(this)
+ //Make sure we wipe it out:
+ buf.clear()
+ buf
+ } else {
+ // if the source is also used as a sink, we don't want its contents to get modified
+ buffers(this).clone()
}
-
- hbt.setInputSplitType(inputSplitType)
-
+ // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
+ new MemoryTap[InputStream, OutputStream](localScheme, buffer)
+ }*/
+ case testMode @ Test(buffer) => readOrWrite match {
+
+ case Read => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
hbt.asInstanceOf[Tap[_,_,_]]
}
case Write => {
- val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
-
- hbt.setUseSaltInSink(useSalt)
-
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
hbt.asInstanceOf[Tap[_,_,_]]
}
}
@@ -139,5 +143,43 @@ case class HBaseSource(
throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
}
}
- }
+ }
+
+ /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
+ mode match {
+ case Test(buffers) => {
+ /*
+ * There MUST have already been a registered sink or source in the Test mode.
+ * to access this. You must explicitly name each of your test sources in your
+ * JobTest.
+ */
+ val buffer =
+ if (readOrWrite == Write) {
+ val buf = buffers(this)
+ //Make sure we wipe it out:
+ buf.clear()
+ buf
+ } else {
+ // if the source is also used as a sink, we don't want its contents to get modified
+ buffers(this).clone()
+ }
+ // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
+ new MemoryTap[InputStream, OutputStream](localScheme, buffer)
+ }
+ case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match {
+ case Read => {
+ val buffer = buffers(this)
+ val fields = hdfsScheme.getSourceFields
+ (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]]
+ }
+ case Write => {
+ val path = hdfsTest.getWritePathFor(this)
+ castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE))
+ }
+ }
+ case _ => {
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+ }
+ }
+ } */
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
index 2921df8..6e56c52 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
@@ -20,8 +20,8 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC
val output = args("output")
- val properties = new Properties()
- AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] );
+// val properties = new Properties()
+// AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] );
val hbs = new HBaseSource(
"table_name",
@@ -31,6 +31,7 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC
List(new Fields("column_name1", "column_name2")),
sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
.read
+ .debug
.fromBytesWritable(new Fields("key", "column_name1", "column_name2"))
.write(Tsv(output format "get_list"))
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 09f4579..2472eda 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -12,6 +12,11 @@ import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
+import com.twitter.scalding.Test
+import com.twitter.scalding.MemoryTap
+import java.io.InputStream
+import java.io.OutputStream
+import java.util.Properties
case class JDBCSource(
tableName: String = "tableName",
@@ -32,8 +37,10 @@ case class JDBCSource(
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
- def localScheme = new NullScheme(fields, fields)
-
+ //def localScheme = new NullScheme(fields, fields)
+ type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
+ def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val jdbcScheme = hdfsScheme match {
case jdbc: JDBCScheme => jdbc
@@ -53,6 +60,17 @@ case class JDBCSource(
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
}
+ case testMode @ Test(buffer) => readOrWrite match {
+
+ case Read => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ }
case _ => createEmptyTap(readOrWrite)(mode)
}
}
diff --git a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
index f25e769..30342da 100644
--- a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
+++ b/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
@@ -48,6 +48,7 @@ class SimpleHBaseSourceExampleTest extends FunSpec with TupleConversions {
log.debug("Output => " + outputBuffer)
it("should return the test data provided.") {
+ println("outputBuffer.size => " + outputBuffer.size)
assert(outputBuffer.size === 3)
}
}
diff --git a/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala
index 98f275d..b268fa1 100644
--- a/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala
+++ b/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala
@@ -24,7 +24,7 @@ class JdbcSourceExampleTest extends FunSpec with TupleConversions {
)
JobTest("parallelai.spyglass.jdbc.example.JdbcSourceExample")
- .arg("local", "")
+ .arg("test", "")
.arg("app.conf.path", "app.conf")
.arg("output", output)
.arg("debug", "true")