aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala100
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala5
2 files changed, 74 insertions, 31 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 09ad19d..dc87a4b 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -20,6 +20,10 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import com.twitter.scalding.Source
import com.twitter.scalding.TestMode
import com.twitter.scalding.Test
+import com.twitter.scalding.MemoryTap
+import java.io.InputStream
+import java.io.OutputStream
+import java.util.Properties
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -45,7 +49,7 @@ case class HBaseSource(
sinkMode: SinkMode = SinkMode.UPDATE,
inputSplitType: SplitType = SplitType.GRANULAR
) extends Source {
-
+
val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
internalScheme.setInputSplitTye(inputSplitType)
@@ -53,8 +57,11 @@ case class HBaseSource(
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
- def localScheme = new NullScheme(allFields, allFields)
+ //def localScheme = new NullScheme(allFields, allFields)
+ type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
+ def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val hBaseScheme = hdfsScheme match {
case hbase: HBaseScheme => hbase
@@ -94,36 +101,33 @@ case class HBaseSource(
hbt.asInstanceOf[Tap[_,_,_]]
}
}
- case testMode @ Test(_) => readOrWrite match {
- case Read => {
- val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
-
- sourceMode match {
- case SourceMode.SCAN_RANGE => {
-
- hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
- }
- case SourceMode.SCAN_ALL => {
- hbt.setHBaseScanAllParms(useSalt, prefixList)
- }
- case SourceMode.GET_LIST => {
- if( keyList == null )
- throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
-
- hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
- }
- case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
+ /**case Test(buffers) => {
+ /*
+ * There MUST have already been a registered sink or source in the Test mode.
+ * to access this. You must explicitly name each of your test sources in your
+ * JobTest.
+ */
+ val buffer =
+ if (readOrWrite == Write) {
+ val buf = buffers(this)
+ //Make sure we wipe it out:
+ buf.clear()
+ buf
+ } else {
+ // if the source is also used as a sink, we don't want its contents to get modified
+ buffers(this).clone()
}
-
- hbt.setInputSplitType(inputSplitType)
-
+ // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
+ new MemoryTap[InputStream, OutputStream](localScheme, buffer)
+ }*/
+ case testMode @ Test(buffer) => readOrWrite match {
+
+ case Read => {
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
hbt.asInstanceOf[Tap[_,_,_]]
}
case Write => {
- val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
-
- hbt.setUseSaltInSink(useSalt)
-
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
hbt.asInstanceOf[Tap[_,_,_]]
}
}
@@ -139,5 +143,43 @@ case class HBaseSource(
throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
}
}
- }
+ }
+
+ /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
+ mode match {
+ case Test(buffers) => {
+ /*
+ * There MUST have already been a registered sink or source in the Test mode.
+ * to access this. You must explicitly name each of your test sources in your
+ * JobTest.
+ */
+ val buffer =
+ if (readOrWrite == Write) {
+ val buf = buffers(this)
+ //Make sure we wipe it out:
+ buf.clear()
+ buf
+ } else {
+ // if the source is also used as a sink, we don't want its contents to get modified
+ buffers(this).clone()
+ }
+ // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
+ new MemoryTap[InputStream, OutputStream](localScheme, buffer)
+ }
+ case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match {
+ case Read => {
+ val buffer = buffers(this)
+ val fields = hdfsScheme.getSourceFields
+ (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]]
+ }
+ case Write => {
+ val path = hdfsTest.getWritePathFor(this)
+ castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE))
+ }
+ }
+ case _ => {
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+ }
+ }
+ } */
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
index 2921df8..6e56c52 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
@@ -20,8 +20,8 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC
val output = args("output")
- val properties = new Properties()
- AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] );
+// val properties = new Properties()
+// AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] );
val hbs = new HBaseSource(
"table_name",
@@ -31,6 +31,7 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC
List(new Fields("column_name1", "column_name2")),
sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
.read
+ .debug
.fromBytesWritable(new Fields("key", "column_name1", "column_name2"))
.write(Tsv(output format "get_list"))