From 20a18b4388f0cd06bec0b43d083150f6e1bb2c5e Mon Sep 17 00:00:00 2001 From: Gracia Fernandez Date: Thu, 4 Jul 2013 16:49:09 +0100 Subject: Changed HBaseSource and JDBCSource to allow testing with JobTest. Samples of tests included. --- .../spyglass/hbase/HBaseConversions.scala | 4 +- .../parallelai/spyglass/hbase/HBaseSource.scala | 23 +- .../spyglass/hbase/example/HBaseExample.scala | 28 +- .../hbase/example/SimpleHBaseSourceExample.scala | 32 ++ .../spyglass/hbase/testing/HBaseSaltTester.scala | 38 +- .../hbase/testing/HBaseSourceShouldRead.scala | 406 +++++++++++---------- 6 files changed, 294 insertions(+), 237 deletions(-) create mode 100644 src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala (limited to 'src/main/scala/parallelai/spyglass/hbase') diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala index 6035688..b6d5742 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala @@ -36,8 +36,8 @@ class HBasePipeWrapper (pipe: Pipe) { } } } - } - + } + // def fromBytesWritable : Pipe = { // asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) => // p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 39a076e..d6795aa 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -10,15 +10,12 @@ import com.twitter.scalding.Read import com.twitter.scalding.Source import com.twitter.scalding.Write -import parallelai.spyglass.hbase.HBaseScheme; -import parallelai.spyglass.hbase.HBaseTap; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -import cascading.scheme.Scheme +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import cascading.scheme.{NullScheme, Scheme} import cascading.tap.SinkMode import cascading.tap.Tap import cascading.tuple.Fields import org.apache.hadoop.mapred.RecordReader -import scala.compat.Platform import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf @@ -29,13 +26,13 @@ object Conversions { implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s)) } -class HBaseSource( +case class HBaseSource( tableName: String = null, quorumNames: String = "localhost", keyFields: Fields = null, - familyNames: Array[String] = null, - valueFields: Array[Fields] = null, - timestamp: Long = Platform.currentTime, + familyNames: List[String] = null, + valueFields: List[Fields] = null, + timestamp: Long = 0L, sourceMode: SourceMode = SourceMode.SCAN_ALL, startKey: String = null, stopKey: String = null, @@ -45,9 +42,13 @@ class HBaseSource( prefixList: String = null ) extends Source { - override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) + override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + // To enable local mode testing + val allFields = keyFields.append(valueFields.toArray) + override def localScheme = new NullScheme(allFields, allFields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val hBaseScheme = hdfsScheme match { case hbase: HBaseScheme => hbase @@ -80,7 +81,7 @@ class HBaseSource( case Write => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) - hbt.setUseSaltInSink(useSalt); + hbt.setUseSaltInSink(useSalt) hbt.asInstanceOf[Tap[_,_,_]] } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index 1ce9072..eccd653 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -19,13 +19,13 @@ class HBaseExample(args: Args) extends JobBase(args) { val isDebug: Boolean = args("debug").toBoolean - if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG) + if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) val output = args("output") println(output) - val jobConf = getJobConf + val jobConf = getJobConf() val quorumNames = args("quorum") @@ -38,20 +38,20 @@ class HBaseExample(args: Args) extends JobBase(args) { val connection = HConnectionManager.getConnection(conf) val maxThreads = conf.getInt("hbase.htable.threads.max", 1) - conf.set("hbase.zookeeper.quorum", quorumNames); + conf.set("hbase.zookeeper.quorum", quorumNames) val htable = new HTable(HBaseConfiguration.create(conf), tableName) } - val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet") + val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet") val hbs2 = new HBaseSource( "table_name", "quorum_name:2181", 'key, - Array("column_family"), - Array('column_name), + List("column_family"), + List('column_name), sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) .read .write(Tsv(output.format("get_list"))) @@ -60,8 +60,8 @@ class HBaseExample(args: Args) extends JobBase(args) { "table_name", "quorum_name:2181", 'key, - Array("column_family"), - Array('column_name), + List("column_family"), + List('column_name), sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") .read .write(Tsv(output.format("scan_all"))) @@ -70,8 +70,8 @@ class HBaseExample(args: Args) extends JobBase(args) { "table_name", "quorum_name:2181", 'key, - Array("column_family"), - Array('column_name), + List("column_family"), + List('column_name), sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") .read .write(Tsv(output.format("scan_range_to_end"))) @@ -80,8 +80,8 @@ class HBaseExample(args: Args) extends JobBase(args) { "table_name", "quorum_name:2181", 'key, - Array("column_family"), - Array('column_name), + List("column_family"), + List('column_name), sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") .read .write(Tsv(output.format("scan_range_from_start"))) @@ -90,8 +90,8 @@ class HBaseExample(args: Args) extends JobBase(args) { "table_name", "quorum_name:2181", 'key, - Array("column_family"), - Array('column_name), + List("column_family"), + List('column_name), sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") .read .write(Tsv(output.format("scan_range_between"))) diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala new file mode 100644 index 0000000..7ba2788 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala @@ -0,0 +1,32 @@ +package parallelai.spyglass.hbase.example + +import com.twitter.scalding.{Tsv, Args} +import parallelai.spyglass.base.JobBase +import org.apache.log4j.{Level, Logger} +import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource} +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import cascading.tuple.Fields + +/** + * Simple example of HBaseSource usage + */ +class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions { + + val isDebug: Boolean = args("debug").toBoolean + + if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) + + val output = args("output") + + val hbs = new HBaseSource( + "table_name", + "quorum_name:2181", + new Fields("key"), + List("column_family"), + List(new Fields("column_name1", "column_name2")), + sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")) + .read + .fromBytesWritable(new Fields("key", "column_name1", "column_name2")) + .write(Tsv(output format "get_list")) + + } diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index 2ca3f32..f774648 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -1,23 +1,21 @@ package parallelai.spyglass.hbase.testing import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode import com.twitter.scalding.Args import parallelai.spyglass.hbase.HBaseSource -import com.twitter.scalding.Tsv import cascading.tuple.Fields import com.twitter.scalding.TextLine import org.apache.log4j.Logger import org.apache.log4j.Level import parallelai.spyglass.hbase.HBasePipeConversions -import cascading.pipe.Pipe class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { val isDebug = args.getOrElse("debug", "false").toBoolean - if( isDebug ) { Logger.getRootLogger().setLevel(Level.DEBUG) } + if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } val TABLE_SCHEMA = List('key, 'salted, 'unsalted) @@ -26,44 +24,44 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val quorum = args("quorum") val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sourceMode = SourceMode.SCAN_ALL ).read .fromBytesWritable( TABLE_SCHEMA ) .write(TextLine("saltTesting/ScanAllNoSalt01")) val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read .fromBytesWritable( TABLE_SCHEMA ) .write(TextLine("saltTesting/ScanAllPlusSalt01")) val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read .fromBytesWritable(TABLE_SCHEMA ) .write(TextLine("saltTesting/ScanRangeNoSalt01")) val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read .fromBytesWritable(TABLE_SCHEMA ) .write(TextLine("saltTesting/ScanRangePlusSalt01")) val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read .fromBytesWritable(TABLE_SCHEMA ) .write(TextLine("saltTesting/GetListNoSalt01")) val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read .fromBytesWritable(TABLE_SCHEMA ) @@ -71,16 +69,16 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val hbase07 = new HBaseSource( "_TEST.SALT.03", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix ) .read .fromBytesWritable( TABLE_SCHEMA ) .write(TextLine("saltTesting/ScanRangePlusSalt10")) .toBytesWritable( TABLE_SCHEMA ) .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), useSalt = true )) // val hbase08 = diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala index 536f843..10104bf 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala @@ -55,20 +55,22 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon var testName01 = "Scan_Test_01_Huge_Key_Range" println("---- Running : " + testName01) // Get everything from HBase testing table into a Pipe - val hbase01 = new HBaseSource( tableName1, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00") - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase01 = new HBaseSource(tableName1, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00") + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // Calculate expected result for Test 01 var list01 = List(("2000-01-01 10:00:10", "1"), @@ -79,60 +81,66 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon val testName02 = "Scan_Test_02_Borders_Range" println("---- Running : " + testName02) // Get everything from HBase testing table into a Pipe - val hbase02 = new HBaseSource( tableName1, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00") - .read - .fromBytesWritable(TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase02 = new HBaseSource(tableName1, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00") + .read + .fromBytesWritable(TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // Calculate expected result for Test 02 var list02 = List(("2000-01-01 10:00:10", "1"), ("2000-01-01 10:05:00", "2"), ("2000-01-01 10:10:00", "3")) // -------------------- Test 03 -------------------- val testName03 = "Scan_Test_03_Inner_Range" // Get everything from HBase testing table into a Pipe - val hbase03 = new HBaseSource( tableName1, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00") - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } - + val hbase03 = new HBaseSource(tableName1, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00") + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } + // Calculate expected result for Test 03 var list03 = List(("2000-01-01 10:05:00", "2")) // -------------------- Test 04 -------------------- val testName04 = "Scan_Test_04_Out_Of_Range_And_Unordered" // Get everything from HBase testing table into a Pipe - val hbase04 = new HBaseSource( tableName1, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000") - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase04 = new HBaseSource(tableName1, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000") + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // -------------------- Test 0 - TODO scan multiple versions .. -------------------- // val testName04 = "Scan_Test_04_One_Version" @@ -160,42 +168,46 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon // -------------------- Test 05 -------------------- val testName05 = "Get_Test_01_One_Existing_Some_Nonexisting_Keys_1_Versions" // Get everything from HBase testing table into a Pipe - val hbase05 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"), - versions = 1 ) - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase05 = new HBaseSource(tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"), + versions = 1) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // Calculate expected result for Test 04 var list05 = List(("2000-01-01 11:00:00", "6")) // -------------------- Test 6 -------------------- val testName06 = "Get_Test_02_One_Existing_Some_Nonexisting_Keys_2_Versions" - val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"), - versions = 2 ) - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"), + versions = 2) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // Calculate expected result for Test 05 var list06 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2")) @@ -203,21 +215,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon // -------------------- Test 7 -------------------- val testName07 = "Get_Test_03_One_Existing_Some_Nonexisting_Keys_3_Versions" // Get everything from HBase testing table into a Pipe - val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"), - versions = 3 ) - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"), + versions = 3) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // Calculate expected result for Test 07 var list07 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) @@ -225,21 +239,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon // -------------------- Test 08 -------------------- val testName08 = "Get_Test_04_One_Existing_Some_Nonexisting_Keys_4_Versions" // Get everything from HBase testing table into a Pipe - val hbase08 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"), - versions = 4 ) - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase08 = new HBaseSource(tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"), + versions = 4) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } var list08 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),("2000-01-01 10:00:00","1"), ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) @@ -247,21 +263,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon // -------------------- Test 09 -------------------- val testName09 = "Get_Test_05_Get_Same_Key_Multiple_Times_4_versions" // Get everything from HBase testing table into a Pipe - val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"), - versions = 4 ) - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"), + versions = 4) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } var list09 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) @@ -273,21 +291,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon var bigList = ((bigList1 ::: List("2000-01-01 11:00:00")) ::: bigList2) ::: List("2000-01-01 10:00:00") // Get everything from HBase testing table into a Pipe - val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = bigList, - versions = 2 ) // - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = bigList, + versions = 2) // + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } var list10 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"), @@ -297,58 +317,64 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon // -------------------- Test 11 -------------------- val testName11 = "Get_Test_07_EmptyList" // Get everything from HBase testing table into a Pipe - val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = List(), - versions = 1 ) // - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = List(), + versions = 1) // + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // -------------------- Test 11 -------------------- val testName12 = "Get_Test_08_Three_Nonexistingkeys_1_Versions" // Get everything from HBase testing table into a Pipe - val hbase12 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"), - versions = 1 ) - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase12 = new HBaseSource(tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"), + versions = 1) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } // --------------------- TEST 13 ----------------------------- val testName13 = "Some " - val hbase13 = new HBaseSource( tableName2, hbaseHost, 'key, - Array("data"), - Array('column1), - sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey="", useSalt = true ) - .read - .fromBytesWritable( - TABLE_01_SCHEMA) - .groupAll { group => - group.toList[String]('key -> 'key) - group.toList[String]('column1 -> 'column1) - } - .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => - x._1 + " " + x._2 - } + val hbase13 = new HBaseSource(tableName2, hbaseHost, 'key, + List("data"), + List('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey = "", useSalt = true) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { + group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { + x: (String, String) => + x._1 + " " + x._2 + } var list13 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"), ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5") -- cgit v1.2.3