diff options
Diffstat (limited to 'src/main/scala/parallelai')
3 files changed, 94 insertions, 33 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 09ad19d..dc87a4b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -20,6 +20,10 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import com.twitter.scalding.Source import com.twitter.scalding.TestMode import com.twitter.scalding.Test +import com.twitter.scalding.MemoryTap +import java.io.InputStream +import java.io.OutputStream +import java.util.Properties object Conversions { implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -45,7 +49,7 @@ case class HBaseSource( sinkMode: SinkMode = SinkMode.UPDATE, inputSplitType: SplitType = SplitType.GRANULAR ) extends Source { - + val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) internalScheme.setInputSplitTye(inputSplitType) @@ -53,8 +57,11 @@ case class HBaseSource( // To enable local mode testing val allFields = keyFields.append(valueFields.toArray) - def localScheme = new NullScheme(allFields, allFields) + //def localScheme = new NullScheme(allFields, allFields) + type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] + def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val hBaseScheme = hdfsScheme match { case hbase: HBaseScheme => hbase @@ -94,36 +101,33 @@ case class HBaseSource( hbt.asInstanceOf[Tap[_,_,_]] } } - case testMode @ Test(_) => readOrWrite match { - case Read => { - val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) - - sourceMode match { - case SourceMode.SCAN_RANGE => { - - hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList) - } - case SourceMode.SCAN_ALL => { - hbt.setHBaseScanAllParms(useSalt, prefixList) - } - case SourceMode.GET_LIST => { - if( keyList == null ) - throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) - - hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList) - } - case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) + /**case Test(buffers) => { + /* + * There MUST have already been a registered sink or source in the Test mode. + * to access this. You must explicitly name each of your test sources in your + * JobTest. + */ + val buffer = + if (readOrWrite == Write) { + val buf = buffers(this) + //Make sure we wipe it out: + buf.clear() + buf + } else { + // if the source is also used as a sink, we don't want its contents to get modified + buffers(this).clone() } - - hbt.setInputSplitType(inputSplitType) - + // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields + new MemoryTap[InputStream, OutputStream](localScheme, buffer) + }*/ + case testMode @ Test(buffer) => readOrWrite match { + + case Read => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) hbt.asInstanceOf[Tap[_,_,_]] } case Write => { - val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode) - - hbt.setUseSaltInSink(useSalt) - + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) hbt.asInstanceOf[Tap[_,_,_]] } } @@ -139,5 +143,43 @@ case class HBaseSource( throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) } } - } + } + + /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = { + mode match { + case Test(buffers) => { + /* + * There MUST have already been a registered sink or source in the Test mode. + * to access this. You must explicitly name each of your test sources in your + * JobTest. + */ + val buffer = + if (readOrWrite == Write) { + val buf = buffers(this) + //Make sure we wipe it out: + buf.clear() + buf + } else { + // if the source is also used as a sink, we don't want its contents to get modified + buffers(this).clone() + } + // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields + new MemoryTap[InputStream, OutputStream](localScheme, buffer) + } + case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match { + case Read => { + val buffer = buffers(this) + val fields = hdfsScheme.getSourceFields + (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]] + } + case Write => { + val path = hdfsTest.getWritePathFor(this) + castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE)) + } + } + case _ => { + throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) + } + } + } */ } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala index 2921df8..6e56c52 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala @@ -20,8 +20,8 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC val output = args("output") - val properties = new Properties() - AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] ); +// val properties = new Properties() +// AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] ); val hbs = new HBaseSource( "table_name", @@ -31,6 +31,7 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC List(new Fields("column_name1", "column_name2")), sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")) .read + .debug .fromBytesWritable(new Fields("key", "column_name1", "column_name2")) .write(Tsv(output format "get_list")) diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 09f4579..2472eda 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -12,6 +12,11 @@ import cascading.tuple.Fields import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf +import com.twitter.scalding.Test +import com.twitter.scalding.MemoryTap +import java.io.InputStream +import java.io.OutputStream +import java.util.Properties case class JDBCSource( tableName: String = "tableName", @@ -32,8 +37,10 @@ case class JDBCSource( .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] // To enable local mode testing - def localScheme = new NullScheme(fields, fields) - + //def localScheme = new NullScheme(fields, fields) + type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] + def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (fields, fields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val jdbcScheme = hdfsScheme match { case jdbc: JDBCScheme => jdbc @@ -53,6 +60,17 @@ case class JDBCSource( jdbcTap.asInstanceOf[Tap[_,_,_]] } } + case testMode @ Test(buffer) => readOrWrite match { + + case Read => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + case Write => { + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) + hbt.asInstanceOf[Tap[_,_,_]] + } + } case _ => createEmptyTap(readOrWrite)(mode) } } |