From 6e21e0c68248a33875898b86a2be7a9cec7df3d4 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Thu, 6 Jun 2013 12:27:15 +0100 Subject: Added extensions to Read and Write mode. Added support for key prefixes --- .../spyglass/hbase/HBaseConversions.scala | 54 +++ .../parallelai/spyglass/hbase/HBaseSource.scala | 23 +- .../spyglass/hbase/example/HBaseExample.scala | 2 +- .../spyglass/hbase/testing/HBaseSaltTester.scala | 101 +++++ .../hbase/testing/HBaseSaltTesterRunner.scala | 18 + .../hbase/testing/HBaseSourceShouldRead.scala | 444 +++++++++++++++++++++ .../testing/HBaseSourceShouldReadRunner.scala | 10 + 7 files changed, 644 insertions(+), 8 deletions(-) create mode 100644 src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala create mode 100644 src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala create mode 100644 src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala create mode 100644 src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala create mode 100644 src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala (limited to 'src/main/scala/parallelai/spyglass/hbase') diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala new file mode 100644 index 0000000..21d90e8 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala @@ -0,0 +1,54 @@ +package parallelai.spyglass.hbase + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import com.twitter.scalding.Dsl._ +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding.RichPipe +import com.twitter.scalding.RichFields +import org.apache.hadoop.hbase.util.Bytes +import cascading.tuple.TupleEntry + +class HBasePipeWrapper (pipe: Pipe) { + def toBytesWritable(f: Fields): Pipe = { + asList(f) + .foldLeft(pipe){ (p, f) => { + p.map(f.toString -> f.toString){ from: String => { + new ImmutableBytesWritable(Bytes.toBytes(from)) + }} + }} + } + +// def toBytesWritable : Pipe = { +// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => { +// p.map(f.toString -> f.toString){ from: String => { +// new ImmutableBytesWritable(Bytes.toBytes(from)) +// }} +// }} +// } + + def fromBytesWritable(f: Fields): Pipe = { + asList(f) + .foldLeft(pipe) { (p, fld) => + p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { + Bytes.toString(from.get) + } + } + } + } + +// def fromBytesWritable : Pipe = { +// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) => +// p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { +// Bytes.toString(from.get) +// } +// } +// } +// } +} + +trait HBasePipeConversions { + implicit def pipeWrapper(pipe: Pipe) = new HBasePipeWrapper(pipe) +} + + diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index e46ef50..39a076e 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -9,6 +9,10 @@ import com.twitter.scalding.Mode import com.twitter.scalding.Read import com.twitter.scalding.Source import com.twitter.scalding.Write + +import parallelai.spyglass.hbase.HBaseScheme; +import parallelai.spyglass.hbase.HBaseTap; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.scheme.Scheme import cascading.tap.SinkMode import cascading.tap.Tap @@ -17,7 +21,6 @@ import org.apache.hadoop.mapred.RecordReader import scala.compat.Platform import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf -import parallelai.spyglass.hbase.HBaseConstants.SourceMode object Conversions { implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -36,7 +39,10 @@ class HBaseSource( sourceMode: SourceMode = SourceMode.SCAN_ALL, startKey: String = null, stopKey: String = null, - keyList: List[String] = null + keyList: List[String] = null, + versions: Int = 1, + useSalt: Boolean = false, + prefixList: String = null ) extends Source { override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) @@ -51,19 +57,20 @@ class HBaseSource( case hdfsMode @ Hdfs(_, _) => readOrWrite match { case Read => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) - + sourceMode match { case SourceMode.SCAN_RANGE => { - hbt.setHBaseRangeParms(startKey, stopKey) + + hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList) } case SourceMode.SCAN_ALL => { - hbt.setHBaseScanAllParms() + hbt.setHBaseScanAllParms(useSalt, prefixList) } case SourceMode.GET_LIST => { - if( keyList == null ) + if( keyList == null ) throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) - hbt.setHBaseListParms(keyList.toArray[String]) + hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList) } case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) } @@ -73,6 +80,8 @@ class HBaseSource( case Write => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) + hbt.setUseSaltInSink(useSalt); + hbt.asInstanceOf[Tap[_,_,_]] } } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index 4c86b07..1ce9072 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -27,7 +27,7 @@ class HBaseExample(args: Args) extends JobBase(args) { val jobConf = getJobConf - val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" + val quorumNames = args("quorum") case class HBaseTableStore( conf: Configuration, diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala new file mode 100644 index 0000000..d24f785 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -0,0 +1,101 @@ +package parallelai.spyglass.hbase.testing + +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +import com.twitter.scalding.Args +import parallelai.spyglass.hbase.HBaseSource +import com.twitter.scalding.Tsv +import cascading.tuple.Fields +import com.twitter.scalding.TextLine +import org.apache.log4j.Logger +import org.apache.log4j.Level +import parallelai.spyglass.hbase.HBasePipeConversions +import cascading.pipe.Pipe + +class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { + + val isDebug = args.getOrElse("debug", "false").toBoolean + + if( isDebug ) { Logger.getRootLogger().setLevel(Level.DEBUG) } + + val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + + val prefix = "0123456789" + +// val hbase01 = CommonFunctors.fromBytesWritable( +// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, +// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +// sourceMode = SourceMode.SCAN_ALL ).read, +// TABLE_SCHEMA ) +// .write(TextLine("saltTesting/ScanAllNoSalt01")) + +// val hbase02 = CommonFunctors.fromBytesWritable( +// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, +// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +// sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read, +// TABLE_SCHEMA ) +// .write(TextLine("saltTesting/ScanAllPlusSalt01")) + +// val hbase03 = CommonFunctors.fromBytesWritable( +// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, +// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +// sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read, +// TABLE_SCHEMA ) +// .write(TextLine("saltTesting/ScanRangeNoSalt01")) + +// val hbase04 = CommonFunctors.fromBytesWritable( +// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, +// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +// sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read, +// TABLE_SCHEMA ) +// .write(TextLine("saltTesting/ScanRangePlusSalt01")) + +// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, +// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +// sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read +// +// val hbase05 = CommonFunctors.fromBytesWritable( +// hbase05bytes, +// TABLE_SCHEMA ) +// .write(TextLine("saltTesting/GetListNoSalt01")) +// +// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, +// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +// sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read +// +// val hbase06 = CommonFunctors.fromBytesWritable( +// hbase06bytes, +// TABLE_SCHEMA ) +// .write(TextLine("saltTesting/GetListPlusSalt01")) + + val hbase07 = + new HBaseSource( "_TEST.SALT.03", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix ) + .read + .fromBytesWritable( TABLE_SCHEMA ) + .write(TextLine("saltTesting/ScanRangePlusSalt10")) + .toBytesWritable( TABLE_SCHEMA ) + .write(new HBaseSource( "_TEST.SALT.04", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + useSalt = true )) + +// val hbase08 = +// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, +// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +// sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix ) +// .read +// .fromBytesWritable('*) +// .write(TextLine("saltTesting/ScanRangePlusSalt03")) + +} \ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala new file mode 100644 index 0000000..af7d7d2 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -0,0 +1,18 @@ +package parallelai.spyglass.hbase.testing + +import parallelai.spyglass.base.JobRunner + +object HBaseSaltTesterRunner extends App { + +// if( args.length < 2 ) { throw new Exception("Not enough Args")} + + val appConfig = "/home/crajah/tmp/application.conf" + val libPath = "/home/crajah/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation" + + JobRunner.main(Array(classOf[HBaseSaltTester].getName, + "--hdfs", + "--app.conf.path", appConfig, + "--job.lib.path", libPath, + "--debug", "true" + )) +} \ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala new file mode 100644 index 0000000..69f8b60 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala @@ -0,0 +1,444 @@ +package parallelai.spyglass.hbase.testing + +import org.apache.log4j.Level +import org.apache.log4j.LogManager +import org.apache.log4j.Logger +import com.twitter.scalding.Args +import com.twitter.scalding.IterableSource +import com.twitter.scalding.Tsv +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import parallelai.spyglass.base.JobBase +import cascading.pipe.Pipe +import parallelai.spyglass.hbase.HBasePipeConversions + +/** + * This integration-test expects some HBase table to exist + * with specific data - see GenerateTestingHTables.java + * + * Keep in mind that currently: + * + No version support exists in Scans + * + GET_LIST is working as a Set - Having a rowkey twice in the GET_LIST - will return in only one GET + * + * ISSUES: + * + If Scan_Range is unordered i.e. 9 -> 1 (instead of 1 -> 9) unhandled exception is thrown: + * Caused by: java.lang.IllegalArgumentException: Invalid range: 9 > 11000000 + * at org.apache.hadoop.hbase.client.HTable.getRegionsInRange(HTable.java:551) + * + * @author Antwnis@gmail.com + */ + +// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala +class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeConversions { + + // Initiate logger + private val LOG: Logger = LogManager.getLogger(this.getClass) + + // Set to Level.DEBUG if --debug is passed in + val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean + if (isDebug) { + LOG.setLevel(Level.DEBUG) + LOG.info("Setting logging to Level.DEBUG") + } + + // Set HBase host + val hbaseHost = "cldmgr.prod.bigdata.bskyb.com:2181" + + // ----------------------------- + // ----- Tests for TABLE_01 ---- + // ----------------------------- + val TABLE_01_SCHEMA = List('key,'column1) + val tableName1 = "TABLE_01" + val tableName2 = "TABLE_02" + + // -------------------- Test 01 -------------------- + var testName01 = "Scan_Test_01_Huge_Key_Range" + println("---- Running : " + testName01) + // Get everything from HBase testing table into a Pipe + val hbase01 = new HBaseSource( tableName1, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00") + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + // Calculate expected result for Test 01 + var list01 = List(("2000-01-01 10:00:10", "1"), + ("2000-01-01 10:05:00", "2"), + ("2000-01-01 10:10:00", "3")) + + // -------------------- Test 02 -------------------- + val testName02 = "Scan_Test_02_Borders_Range" + println("---- Running : " + testName02) + // Get everything from HBase testing table into a Pipe + val hbase02 = new HBaseSource( tableName1, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00") + .read + .fromBytesWritable(TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + // Calculate expected result for Test 02 + var list02 = List(("2000-01-01 10:00:10", "1"), ("2000-01-01 10:05:00", "2"), ("2000-01-01 10:10:00", "3")) + + // -------------------- Test 03 -------------------- + val testName03 = "Scan_Test_03_Inner_Range" + // Get everything from HBase testing table into a Pipe + val hbase03 = new HBaseSource( tableName1, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00") + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + // Calculate expected result for Test 03 + var list03 = List(("2000-01-01 10:05:00", "2")) + + // -------------------- Test 04 -------------------- + val testName04 = "Scan_Test_04_Out_Of_Range_And_Unordered" + // Get everything from HBase testing table into a Pipe + val hbase04 = new HBaseSource( tableName1, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000") + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + // -------------------- Test 0 - TODO scan multiple versions .. -------------------- +// val testName04 = "Scan_Test_04_One_Version" +// // Get everything from HBase testing table into a Pipe +// val hbase04 = CommonFunctors.fromBytesWritable( +// new HBaseSource( tableName2, hbaseHost, 'key, +// Array("data"), +// Array('column1), +// sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00", +// versions = 1 ) // If versions is '0' - it is regarded as '1' +// .read +// , TABLE_01_SCHEMA) +// .groupAll { group => +// group.toList[String]('key -> 'key) +// group.toList[String]('column1 -> 'column1) +// } +// .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +// x._1 + " " + x._2 +// } +// +// // Calculate expected result for Test 04 +// var list04 = List(("","")) + + + // -------------------- Test 05 -------------------- + val testName05 = "Get_Test_01_One_Existing_Some_Nonexisting_Keys_1_Versions" + // Get everything from HBase testing table into a Pipe + val hbase05 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"), + versions = 1 ) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + // Calculate expected result for Test 04 + var list05 = List(("2000-01-01 11:00:00", "6")) + + // -------------------- Test 6 -------------------- + val testName06 = "Get_Test_02_One_Existing_Some_Nonexisting_Keys_2_Versions" + val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"), + versions = 2 ) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + // Calculate expected result for Test 05 + var list06 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2")) + + // -------------------- Test 7 -------------------- + val testName07 = "Get_Test_03_One_Existing_Some_Nonexisting_Keys_3_Versions" + // Get everything from HBase testing table into a Pipe + val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"), + versions = 3 ) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + // Calculate expected result for Test 07 + var list07 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) + + // -------------------- Test 08 -------------------- + val testName08 = "Get_Test_04_One_Existing_Some_Nonexisting_Keys_4_Versions" + // Get everything from HBase testing table into a Pipe + val hbase08 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"), + versions = 4 ) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + var list08 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),("2000-01-01 10:00:00","1"), + ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) + + // -------------------- Test 09 -------------------- + val testName09 = "Get_Test_05_Get_Same_Key_Multiple_Times_4_versions" + // Get everything from HBase testing table into a Pipe + val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"), + versions = 4 ) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + var list09 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) + + + // -------------------- Test 10 -------------------- + val testName10 = "Get_Test_06_TestWith10000and1rowkeys" + var bigList1:List[String] = (1 to 10000).toList.map(_.toString) + var bigList2:List[String] = (100001 to 200000).toList.map(_.toString) + var bigList = ((bigList1 ::: List("2000-01-01 11:00:00")) ::: bigList2) ::: List("2000-01-01 10:00:00") + + // Get everything from HBase testing table into a Pipe + val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = bigList, + versions = 2 ) // + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + + var list10 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"), + ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5") + ) + + // -------------------- Test 11 -------------------- + val testName11 = "Get_Test_07_EmptyList" + // Get everything from HBase testing table into a Pipe + val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = List(), + versions = 1 ) // + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + + // -------------------- Test 11 -------------------- + val testName12 = "Get_Test_08_Three_Nonexistingkeys_1_Versions" + // Get everything from HBase testing table into a Pipe + val hbase12 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"), + versions = 1 ) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + // --------------------- TEST 13 ----------------------------- + val testName13 = "Some " + val hbase13 = new HBaseSource( tableName2, hbaseHost, 'key, + Array("data"), + Array('column1), + sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey="", useSalt = true ) + .read + .fromBytesWritable( + TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => + x._1 + " " + x._2 + } + + var list13 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"), + ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5") + ) + + + // Store results of Scan Test 01 + ( + getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++ + getTestResultPipe(getExpectedPipe(list02), hbase02, testName02) ++ + getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++ + assertPipeIsEmpty(hbase04, testName04) ++ + getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++ + getTestResultPipe(getExpectedPipe(list06), hbase06, testName06) ++ + getTestResultPipe(getExpectedPipe(list07), hbase07, testName07) ++ + getTestResultPipe(getExpectedPipe(list08), hbase08, testName08) ++ + getTestResultPipe(getExpectedPipe(list09), hbase09, testName09) ++ + getTestResultPipe(getExpectedPipe(list10), hbase10, testName10) ++ + getTestResultPipe(getExpectedPipe(list13), hbase13, testName13) ++ + assertPipeIsEmpty(hbase11, testName11) ++ + assertPipeIsEmpty(hbase12, testName12) + ).groupAll { group => + group.sortBy('testName) + } + .write(Tsv("HBaseShouldRead")) + + + /** + * We assume the pipe is empty + * + * We concatenate with a header - if the resulting size is 1 + * then the original size was 0 - then the pipe was empty :) + * + * The result is then returned in a Pipe + */ + def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = { + val headerPipe = IterableSource(List(testName), 'hbasedata) + val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group => + group.size('size) + } + .project('size) + + val result = + concatenation + .mapTo('size -> ('testName, 'result, 'expecteddata, 'hbasedata)) { x:String => { + if (x == "1") { + (testName, "Success", "", "") + } else { + (testName, "Test Failed", "", "") + } + } + } + + result + } + + /** + * Methods receives 2 pipes - and projects the results of testing + * + * expectedPipe should have a column 'expecteddata + * realHBasePipe should have a column 'hbasedata + */ + def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = { + val results = expectedPipe.insert('testName , testName) + .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName)) + .map(('expecteddata, 'hbasedata)->'result) { x:(String,String) => + if (x._1.equals(x._2)) + "Success" + else + "Test Failed" + } + .project('testName, 'result, 'expecteddata, 'hbasedata) + results + } + + /** + * + */ + def getExpectedPipe ( expectedList: List[(String,String)]) : Pipe = { + + val expectedPipe = + IterableSource(expectedList, TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + } + .mapTo(('*) -> 'expecteddata) { x:(String,String) => + x._1 + " " + x._2 + } + expectedPipe + } + +} diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala new file mode 100644 index 0000000..aa77caa --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala @@ -0,0 +1,10 @@ +package parallelai.spyglass.hbase.testing + +import parallelai.spyglass.base.JobRunner + +object HBaseSourceShouldReadRunner extends App { + val appConfig = "/projects/applications.conf" + val libPath = "/media/sf__CHANDAN_RAJAH_/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation" + + JobRunner.main(Array(classOf[HBaseSourceShouldRead].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath)) +} \ No newline at end of file -- cgit v1.2.3