diff options
| author | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 | 
|---|---|---|
| committer | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 | 
| commit | 6e21e0c68248a33875898b86a2be7a9cec7df3d4 (patch) | |
| tree | 5254682e3c3440f7c6954b23519459107b8a445e /src/main/scala/parallelai/spyglass | |
| parent | ea9c80374da846edf2a1634a42ccb932838ebd5b (diff) | |
| download | SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.tar.gz SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.zip | |
Added extensions to Read and Write mode.
Added support for key prefixes
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
7 files changed, 644 insertions, 8 deletions
| diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
new file mode 100644
index 0000000..21d90e8
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -0,0 +1,54 @@
+package parallelai.spyglass.hbase
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import com.twitter.scalding.Dsl._
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding.RichPipe
+import com.twitter.scalding.RichFields
+import org.apache.hadoop.hbase.util.Bytes
+import cascading.tuple.TupleEntry
+
+/**
+ * Adds in-place conversions between String fields and
+ * ImmutableBytesWritable fields to a pipe.
+ *
+ * @param pipe the pipe whose fields are converted
+ */
+class HBasePipeWrapper (pipe: Pipe) {
+
+  /**
+   * Replaces each field in `f` with an ImmutableBytesWritable built from the
+   * bytes of the field's String value. A null value is passed through as null.
+   *
+   * @param f the fields to convert; every field is overwritten under its own name
+   * @return the pipe with one map step appended per field
+   */
+  def toBytesWritable(f: Fields): Pipe =
+    asList(f).foldLeft(pipe) { (p, fld) =>
+      val name = fld.toString
+      p.map(name -> name) { from: String =>
+        // A null String cannot be encoded, so it is handed on unchanged.
+        Option(from).map(s => new ImmutableBytesWritable(Bytes.toBytes(s))).orNull
+      }
+    }
+
+//   def toBytesWritable : Pipe = {
+//	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
+//	    p.map(f.toString -> f.toString){ from: String => {
+//	      new ImmutableBytesWritable(Bytes.toBytes(from))
+//	    }}
+//	  }}
+//	}
+
+  /**
+   * Replaces each field in `f` with the String decoded from the field's
+   * ImmutableBytesWritable value. A null value is passed through as null.
+   *
+   * @param f the fields to convert; every field is overwritten under its own name
+   * @return the pipe with one map step appended per field
+   */
+  def fromBytesWritable(f: Fields): Pipe =
+    asList(f).foldLeft(pipe) { (p, fld) =>
+      val name = fld.toString
+      p.map(name -> name) { from: ImmutableBytesWritable =>
+        // Decode only the writable's own offset/length window, not the whole backing array.
+        Option(from).map(b => Bytes.toString(b.get, b.getOffset, b.getLength)).orNull
+      }
+    }
+
+//	def fromBytesWritable : Pipe = {
+//	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
+//	    p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
+//	    	Bytes.toString(from.get)
+//	      }
+//	    }
+//	  }
+//	}
+}
+
+/** Mix this in to call `toBytesWritable` / `fromBytesWritable` directly on a Pipe. */
+trait HBasePipeConversions {
+  implicit def pipeWrapper(pipe: Pipe): HBasePipeWrapper = new HBasePipeWrapper(pipe)
+}
+
+
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index e46ef50..39a076e 100644
--- 
a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -9,6 +9,10 @@ import com.twitter.scalding.Mode  import com.twitter.scalding.Read  import com.twitter.scalding.Source  import com.twitter.scalding.Write + +import parallelai.spyglass.hbase.HBaseScheme; +import parallelai.spyglass.hbase.HBaseTap; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  import cascading.scheme.Scheme  import cascading.tap.SinkMode  import cascading.tap.Tap @@ -17,7 +21,6 @@ import org.apache.hadoop.mapred.RecordReader  import scala.compat.Platform  import org.apache.hadoop.mapred.OutputCollector  import org.apache.hadoop.mapred.JobConf -import parallelai.spyglass.hbase.HBaseConstants.SourceMode  object Conversions {    implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -36,7 +39,10 @@ class HBaseSource(      sourceMode: SourceMode = SourceMode.SCAN_ALL,      startKey: String = null,      stopKey: String = null, -    keyList: List[String] = null +    keyList: List[String] = null, +    versions: Int = 1, +    useSalt: Boolean = false, +    prefixList: String = null    ) extends Source {    override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) @@ -51,19 +57,20 @@ class HBaseSource(        case hdfsMode @ Hdfs(_, _) => readOrWrite match {          case Read => {             val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) -           +                       sourceMode match {              case SourceMode.SCAN_RANGE => { -              hbt.setHBaseRangeParms(startKey, stopKey) +               +              hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)              }              case SourceMode.SCAN_ALL => { -              hbt.setHBaseScanAllParms() +              hbt.setHBaseScanAllParms(useSalt, prefixList)              }              case SourceMode.GET_LIST => { -              if( keyList == 
null )  +              if( keyList == null )                    throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) -              hbt.setHBaseListParms(keyList.toArray[String]) +              hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)              }              case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))            } @@ -73,6 +80,8 @@ class HBaseSource(          case Write => {            val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) +          hbt.setUseSaltInSink(useSalt); +                      hbt.asInstanceOf[Tap[_,_,_]]          }        } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index 4c86b07..1ce9072 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -27,7 +27,7 @@ class HBaseExample(args: Args) extends JobBase(args) {    val jobConf = getJobConf -  val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" +  val quorumNames = args("quorum")    case class HBaseTableStore(        conf: Configuration, diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala new file mode 100644 index 0000000..d24f785 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -0,0 +1,101 @@ +package parallelai.spyglass.hbase.testing + +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +import com.twitter.scalding.Args +import parallelai.spyglass.hbase.HBaseSource +import com.twitter.scalding.Tsv +import cascading.tuple.Fields +import com.twitter.scalding.TextLine +import org.apache.log4j.Logger +import org.apache.log4j.Level +import 
parallelai.spyglass.hbase.HBasePipeConversions
+import cascading.pipe.Pipe
+
+/**
+ * Manual job exercising salted row keys: scans a key range from _TEST.SALT.03,
+ * dumps it as text, and writes the same rows into _TEST.SALT.04.
+ *
+ * Pass `--debug true` to switch the root logger to DEBUG.
+ */
+class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  val isDebug = args.getOrElse("debug", "false").toBoolean
+
+  if (isDebug) {
+    Logger.getRootLogger().setLevel(Level.DEBUG)
+  }
+
+  val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+  val prefix = "0123456789"
+
+  // Quorum string used by both the source and the sink table below.
+  private def quorum = "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181"
+
+  // One "data" family per non-key column; a fresh array on every call.
+  private def dataFamilies = TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray
+
+  // One Fields instance per non-key column; a fresh array on every call.
+  private def dataFields = TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray
+
+//  val hbase01 = CommonFunctors.fromBytesWritable(
+//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+//          sourceMode = SourceMode.SCAN_ALL ).read,
+//        TABLE_SCHEMA )
+//  .write(TextLine("saltTesting/ScanAllNoSalt01"))
+
+//  val hbase02 = CommonFunctors.fromBytesWritable(
+//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+//          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read,
+//        TABLE_SCHEMA )
+//  .write(TextLine("saltTesting/ScanAllPlusSalt01"))
+
+//  val hbase03 = CommonFunctors.fromBytesWritable(
+//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+//          sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read,
+//        TABLE_SCHEMA )
+//  .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+
+//  val hbase04 = CommonFunctors.fromBytesWritable(
+//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+//          sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read,
+//        TABLE_SCHEMA )
+//  .write(TextLine("saltTesting/ScanRangePlusSalt01"))
+
+//  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+//          sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read
+//
+//  val hbase05 = CommonFunctors.fromBytesWritable(
+//      hbase05bytes,
+//        TABLE_SCHEMA )
+//  .write(TextLine("saltTesting/GetListNoSalt01"))
+//
+//  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+//          sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read
+//
+//  val hbase06 = CommonFunctors.fromBytesWritable(
+//      hbase06bytes,
+//        TABLE_SCHEMA )
+//  .write(TextLine("saltTesting/GetListPlusSalt01"))
+
+  // Salted range scan -> text dump -> salted write into a second table.
+  val hbase07 =
+      new HBaseSource( "_TEST.SALT.03", quorum, 'key,
+          dataFamilies,
+          dataFields,
+          sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix )
+  .read
+  .fromBytesWritable( TABLE_SCHEMA )
+  .write(TextLine("saltTesting/ScanRangePlusSalt10"))
+  .toBytesWritable( TABLE_SCHEMA )
+  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
+          dataFamilies,
+          dataFields,
+          useSalt = true ))
+
+//  val hbase08 =
+//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+//          sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix )
+//  .read
+//  .fromBytesWritable('*)
+//  .write(TextLine("saltTesting/ScanRangePlusSalt03"))
+
+}
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
new file mode 100644
index 0000000..af7d7d2
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
@@ -0,0 +1,18 @@
+package parallelai.spyglass.hbase.testing
+
+import parallelai.spyglass.base.JobRunner
+
+/**
+ * Launches [[HBaseSaltTester]] through JobRunner in --hdfs mode with debug enabled.
+ *
+ * Optional positional arguments:
+ *   args(0) - path to the application conf file
+ *   args(1) - path to the job library directory
+ * When an argument is missing, the original developer-workstation path is used.
+ */
+object HBaseSaltTesterRunner extends App {
+
+  val appConfig = args.lift(0).getOrElse("/home/crajah/tmp/application.conf")
+  val libPath = args.lift(1).getOrElse("/home/crajah/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation")
+
+  JobRunner.main(Array(classOf[HBaseSaltTester].getName,
+      "--hdfs",
+      "--app.conf.path", appConfig,
+      "--job.lib.path", libPath,
+      "--debug", "true"
+  ))
+}
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala
new file mode 100644
index 0000000..69f8b60
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala
@@ -0,0 +1,444 @@
+package parallelai.spyglass.hbase.testing
+
+import org.apache.log4j.Level
+import org.apache.log4j.LogManager
+import org.apache.log4j.Logger
+import com.twitter.scalding.Args
+import com.twitter.scalding.IterableSource
+import com.twitter.scalding.Tsv
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.base.JobBase
+import cascading.pipe.Pipe
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+/**
+ * This integration-test expects some HBase table to exist
+ * with specific data - see GenerateTestingHTables.java
+ *
+ * Keep in mind that currently:
+ * + No version support exists in Scans
+ * + GET_LIST is working as a Set - Having a rowkey twice in the GET_LIST - will return in only one GET
+ *
+ * ISSUES:
+ *  + If Scan_Range is unordered i.e. 9 -> 1 (instead of 1 -> 9) unhandled exception is thrown:
+ *  Caused by: java.lang.IllegalArgumentException: Invalid range: 9 > 11000000
+ *	at org.apache.hadoop.hbase.client.HTable.getRegionsInRange(HTable.java:551)
+ *
+ * @author Antwnis@gmail.com
+ */
+
+// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala
+class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  // Initiate logger
+  private val LOG: Logger = LogManager.getLogger(this.getClass)
+
+  // Set to Level.DEBUG if --debug is passed in
+  val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean
+  if (isDebug) {
+    LOG.setLevel(Level.DEBUG)
+    LOG.info("Setting logging to Level.DEBUG")
+  }
+
+  // Set HBase host
+  val hbaseHost = "cldmgr.prod.bigdata.bskyb.com:2181"
+
+  // -----------------------------
+  // ----- Tests for TABLE_01 ----
+  // -----------------------------
+  val TABLE_01_SCHEMA = List('key,'column1)
+  val tableName1 = "TABLE_01"
+  val tableName2 = "TABLE_02"
+
+  /**
+   * Reads `source`, decodes the TABLE_01_SCHEMA fields to Strings, gathers all
+   * rows with groupAll/toList and joins the two resulting fields with a single
+   * space into 'hbasedata.
+   */
+  private def collectRows(source: HBaseSource): Pipe =
+    source
+      .read
+      .fromBytesWritable(TABLE_01_SCHEMA)
+      .groupAll { group =>
+        group.toList[String]('key -> 'key)
+        group.toList[String]('column1 -> 'column1)
+      }
+      .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+        x._1 + " " + x._2
+      }
+
+  // -------------------- Test 01 --------------------
+  var testName01 = "Scan_Test_01_Huge_Key_Range"
+  println("---- Running : " + testName01)
+  // Scan a key range that spans the whole test data set
+  val hbase01 = collectRows(
+    new HBaseSource( tableName1, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00"))
+
+  // Expected result for Test 01
+  var list01 = List(("2000-01-01 10:00:10", "1"),
+                    ("2000-01-01 10:05:00", "2"),
+                    ("2000-01-01 10:10:00", "3"))
+
+  // -------------------- Test 02 --------------------
+  val testName02 = "Scan_Test_02_Borders_Range"
+  println("---- Running : " + testName02)
+  // Scan a range whose borders are themselves row keys
+  val hbase02 = collectRows(
+    new HBaseSource( tableName1, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00"))
+
+  // Expected result for Test 02
+  var list02 = List(("2000-01-01 10:00:10", "1"), ("2000-01-01 10:05:00", "2"), ("2000-01-01 10:10:00", "3"))
+
+  // -------------------- Test 03 --------------------
+  val testName03 = "Scan_Test_03_Inner_Range"
+  // Scan a range that lies strictly inside the data
+  val hbase03 = collectRows(
+    new HBaseSource( tableName1, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00"))
+
+  // Expected result for Test 03
+  var list03 = List(("2000-01-01 10:05:00", "2"))
+
+  // -------------------- Test 04 --------------------
+  val testName04 = "Scan_Test_04_Out_Of_Range_And_Unordered"
+  // Scan a range outside the data; checked for emptiness below
+  val hbase04 = collectRows(
+    new HBaseSource( tableName1, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000"))
+
+  // -------------------- Test 0 - TODO scan multiple versions .. --------------------
+//  val testName04 = "Scan_Test_04_One_Version"
+//  // Get everything from HBase testing table into a Pipe
+//  val hbase04 = CommonFunctors.fromBytesWritable(
+//  		new HBaseSource( tableName2, hbaseHost, 'key,
+//  		    Array("data"),
+//  		    Array('column1),
+//            sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00",
+//            versions = 1 ) // If versions is '0' - it is regarded as '1'
+//            .read
+//  	    , TABLE_01_SCHEMA)
+//  .groupAll { group =>
+//  	   group.toList[String]('key -> 'key)
+//  	   group.toList[String]('column1 -> 'column1)
+//  }
+//  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+//	     x._1 + " " + x._2
+//  }
+//
+//  // Calculate expected result for Test 04
+//  var list04 = List(("",""))
+
+
+  // -------------------- Test 05 --------------------
+  val testName05 = "Get_Test_01_One_Existing_Some_Nonexisting_Keys_1_Versions"
+  val hbase05 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"),
+        versions = 1 ))
+
+  // Expected result for Test 05
+  var list05 = List(("2000-01-01 11:00:00", "6"))
+
+  // -------------------- Test 06 --------------------
+  val testName06 = "Get_Test_02_One_Existing_Some_Nonexisting_Keys_2_Versions"
+  val hbase06 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"),
+        versions = 2 ))
+
+  // Expected result for Test 06
+  var list06 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"))
+
+  // -------------------- Test 07 --------------------
+  val testName07 = "Get_Test_03_One_Existing_Some_Nonexisting_Keys_3_Versions"
+  val hbase07 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"),
+        versions = 3 ))
+
+  // Expected result for Test 07
+  var list07 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
+
+  // -------------------- Test 08 --------------------
+  val testName08 = "Get_Test_04_One_Existing_Some_Nonexisting_Keys_4_Versions"
+  val hbase08 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"),
+        versions = 4 ))
+
+  // Expected result for Test 08
+  var list08 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),("2000-01-01 10:00:00","1"),
+                    ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
+
+  // -------------------- Test 09 --------------------
+  val testName09 = "Get_Test_05_Get_Same_Key_Multiple_Times_4_versions"
+  val hbase09 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"),
+        versions = 4 ))
+
+  // Expected result for Test 09
+  var list09 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
+
+
+  // -------------------- Test 10 --------------------
+  val testName10 = "Get_Test_06_TestWith10000and1rowkeys"
+  var bigList1:List[String] = (1 to 10000).toList.map(_.toString)
+  var bigList2:List[String] = (100001 to 200000).toList.map(_.toString)
+  var bigList = ((bigList1 ::: List("2000-01-01 11:00:00")) ::: bigList2) ::: List("2000-01-01 10:00:00")
+
+  val hbase10 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = bigList,
+        versions = 2 ))
+
+  // Expected result for Test 10
+  var list10 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),
+                    ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5")
+                    )
+
+  // -------------------- Test 11 --------------------
+  val testName11 = "Get_Test_07_EmptyList"
+  // Empty key list; checked for emptiness below
+  val hbase11 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = List(),
+        versions = 1 ))
+
+  // -------------------- Test 12 --------------------
+  val testName12 = "Get_Test_08_Three_Nonexistingkeys_1_Versions"
+  // Only non-existing keys; checked for emptiness below
+  val hbase12 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"),
+        versions = 1 ))
+
+  // --------------------- TEST 13 -----------------------------
+  val testName13 = "Some "
+  val hbase13 = collectRows(
+    new HBaseSource( tableName2, hbaseHost, 'key,
+        Array("data"),
+        Array('column1),
+        sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey="", useSalt = true ))
+
+  // Expected result for Test 13
+  var list13 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),
+            ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5")
+                    )
+
+
+  // Merge the result rows of every test, sort them by test name and store them
+  List(
+    getTestResultPipe(getExpectedPipe(list01), hbase01, testName01),
+    getTestResultPipe(getExpectedPipe(list02), hbase02, testName02),
+    getTestResultPipe(getExpectedPipe(list03), hbase03, testName03),
+    assertPipeIsEmpty(hbase04, testName04),
+    getTestResultPipe(getExpectedPipe(list05), hbase05, testName05),
+    getTestResultPipe(getExpectedPipe(list06), hbase06, testName06),
+    getTestResultPipe(getExpectedPipe(list07), hbase07, testName07),
+    getTestResultPipe(getExpectedPipe(list08), hbase08, testName08),
+    getTestResultPipe(getExpectedPipe(list09), hbase09, testName09),
+    getTestResultPipe(getExpectedPipe(list10), hbase10, testName10),
+    getTestResultPipe(getExpectedPipe(list13), hbase13, testName13),
+    assertPipeIsEmpty(hbase11, testName11),
+    assertPipeIsEmpty(hbase12, testName12)
+  )
+  .reduceLeft { (merged: Pipe, next: Pipe) => merged ++ next }
+  .groupAll { group =>
+    group.sortBy('testName)
+  }
+  .write(Tsv("HBaseShouldRead"))
+
+
+  /**
+   * Reports whether `hbasePipe` is empty.
+   *
+   * A one-row header pipe is appended and the rows are counted: a count of 1
+   * means the original pipe contributed nothing.
+   *
+   * @return a pipe with fields ('testName, 'result, 'expecteddata, 'hbasedata)
+   */
+  def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = {
+    val headerPipe = IterableSource(List(testName), 'hbasedata)
+
+    ( hbasePipe ++ headerPipe )
+      .groupAll { group =>
+        group.size('size)
+      }
+      .project('size)
+      .mapTo('size -> ('testName, 'result, 'expecteddata, 'hbasedata)) { x:String =>
+        val verdict = if (x == "1") "Success" else "Test Failed"
+        (testName, verdict, "", "")
+      }
+  }
+
+  /**
+   * Joins the expected and the actual pipe on the test name and compares them.
+   *
+   * expectedPipe  should have a column 'expecteddata
+   * realHBasePipe should have a column 'hbasedata
+   *
+   * @return a pipe with fields ('testName, 'result, 'expecteddata, 'hbasedata)
+   */
+  def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = {
+    val taggedExpected = expectedPipe.insert('testName , testName)
+    val taggedActual = realHBasePipe.insert('testName , testName)
+
+    taggedExpected
+      .joinWithTiny('testName -> 'testName, taggedActual)
+      .map(('expecteddata, 'hbasedata)->'result) { x:(String,String) =>
+        if (x._1.equals(x._2)) "Success" else "Test Failed"
+      }
+      .project('testName, 'result, 'expecteddata, 'hbasedata)
+  }
+
+  /**
+   * Builds the expected-data pipe: the (key, column1) pairs are gathered with
+   * groupAll/toList and joined with a single space into 'expecteddata.
+   */
+  def getExpectedPipe ( expectedList: List[(String,String)]) : Pipe =
+    IterableSource(expectedList, TABLE_01_SCHEMA)
+      .groupAll { group =>
+        group.toList[String]('key -> 'key)
+        group.toList[String]('column1 -> 'column1)
+      }
+      .mapTo(('*) -> 'expecteddata) { x:(String,String) =>
+        x._1 + " " + x._2
+      }
+
+}
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala 
b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala
new file mode 100644
index 0000000..aa77caa
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala
@@ -0,0 +1,10 @@
+package parallelai.spyglass.hbase.testing
+
+import parallelai.spyglass.base.JobRunner
+
+/**
+ * Launches [[HBaseSourceShouldRead]] through JobRunner in --hdfs mode.
+ *
+ * Optional positional arguments:
+ *   args(0) - path to the application conf file
+ *   args(1) - path to the job library directory
+ * When an argument is missing, the original developer-workstation path is used.
+ */
+object HBaseSourceShouldReadRunner extends App {
+  val appConfig = args.lift(0).getOrElse("/projects/applications.conf")
+  val libPath = args.lift(1).getOrElse("/media/sf__CHANDAN_RAJAH_/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation")
+
+  JobRunner.main(Array(classOf[HBaseSourceShouldRead].getName,
+      "--hdfs",
+      "--app.conf.path", appConfig,
+      "--job.lib.path", libPath
+  ))
+}
\ No newline at end of file | 
