aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala
diff options
context:
space:
mode:
authorChandan Rajah <chandan.rajah@gmail.com>2013-06-06 12:27:15 +0100
committerChandan Rajah <chandan.rajah@gmail.com>2013-06-06 12:27:15 +0100
commit6e21e0c68248a33875898b86a2be7a9cec7df3d4 (patch)
tree5254682e3c3440f7c6954b23519459107b8a445e /src/main/scala
parentea9c80374da846edf2a1634a42ccb932838ebd5b (diff)
downloadSpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.tar.gz
SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.zip
Added extensions to Read and Write mode.
Added support for key prefixes
Diffstat (limited to 'src/main/scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala54
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala23
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala2
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala101
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala18
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala444
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala10
7 files changed, 644 insertions, 8 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
new file mode 100644
index 0000000..21d90e8
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -0,0 +1,54 @@
+package parallelai.spyglass.hbase
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import com.twitter.scalding.Dsl._
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding.RichPipe
+import com.twitter.scalding.RichFields
+import org.apache.hadoop.hbase.util.Bytes
+import cascading.tuple.TupleEntry
+
+class HBasePipeWrapper (pipe: Pipe) {
+ def toBytesWritable(f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String => {
+ new ImmutableBytesWritable(Bytes.toBytes(from))
+ }}
+ }}
+ }
+
+// def toBytesWritable : Pipe = {
+// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
+// p.map(f.toString -> f.toString){ from: String => {
+// new ImmutableBytesWritable(Bytes.toBytes(from))
+// }}
+// }}
+// }
+
+ def fromBytesWritable(f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe) { (p, fld) =>
+ p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
+ Bytes.toString(from.get)
+ }
+ }
+ }
+ }
+
+// def fromBytesWritable : Pipe = {
+// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
+// p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
+// Bytes.toString(from.get)
+// }
+// }
+// }
+// }
+}
+
+trait HBasePipeConversions {
+ implicit def pipeWrapper(pipe: Pipe) = new HBasePipeWrapper(pipe)
+}
+
+
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index e46ef50..39a076e 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -9,6 +9,10 @@ import com.twitter.scalding.Mode
import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
+
+import parallelai.spyglass.hbase.HBaseScheme;
+import parallelai.spyglass.hbase.HBaseTap;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.scheme.Scheme
import cascading.tap.SinkMode
import cascading.tap.Tap
@@ -17,7 +21,6 @@ import org.apache.hadoop.mapred.RecordReader
import scala.compat.Platform
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -36,7 +39,10 @@ class HBaseSource(
sourceMode: SourceMode = SourceMode.SCAN_ALL,
startKey: String = null,
stopKey: String = null,
- keyList: List[String] = null
+ keyList: List[String] = null,
+ versions: Int = 1,
+ useSalt: Boolean = false,
+ prefixList: String = null
) extends Source {
override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields)
@@ -51,19 +57,20 @@ class HBaseSource(
case hdfsMode @ Hdfs(_, _) => readOrWrite match {
case Read => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
-
+
sourceMode match {
case SourceMode.SCAN_RANGE => {
- hbt.setHBaseRangeParms(startKey, stopKey)
+
+ hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
}
case SourceMode.SCAN_ALL => {
- hbt.setHBaseScanAllParms()
+ hbt.setHBaseScanAllParms(useSalt, prefixList)
}
case SourceMode.GET_LIST => {
- if( keyList == null )
+ if( keyList == null )
throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
- hbt.setHBaseListParms(keyList.toArray[String])
+ hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
}
case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
}
@@ -73,6 +80,8 @@ class HBaseSource(
case Write => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
+ hbt.setUseSaltInSink(useSalt);
+
hbt.asInstanceOf[Tap[_,_,_]]
}
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
index 4c86b07..1ce9072 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -27,7 +27,7 @@ class HBaseExample(args: Args) extends JobBase(args) {
val jobConf = getJobConf
- val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181"
+ val quorumNames = args("quorum")
case class HBaseTableStore(
conf: Configuration,
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
new file mode 100644
index 0000000..d24f785
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
@@ -0,0 +1,101 @@
+package parallelai.spyglass.hbase.testing
+
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+import com.twitter.scalding.Args
+import parallelai.spyglass.hbase.HBaseSource
+import com.twitter.scalding.Tsv
+import cascading.tuple.Fields
+import com.twitter.scalding.TextLine
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+import parallelai.spyglass.hbase.HBasePipeConversions
+import cascading.pipe.Pipe
+
+class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val isDebug = args.getOrElse("debug", "false").toBoolean
+
+ if( isDebug ) { Logger.getRootLogger().setLevel(Level.DEBUG) }
+
+ val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+ val prefix = "0123456789"
+
+// val hbase01 = CommonFunctors.fromBytesWritable(
+// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+// sourceMode = SourceMode.SCAN_ALL ).read,
+// TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanAllNoSalt01"))
+
+// val hbase02 = CommonFunctors.fromBytesWritable(
+// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+// sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read,
+// TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanAllPlusSalt01"))
+
+// val hbase03 = CommonFunctors.fromBytesWritable(
+// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+// sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read,
+// TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+
+// val hbase04 = CommonFunctors.fromBytesWritable(
+// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+// sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read,
+// TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanRangePlusSalt01"))
+
+// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+// sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read
+//
+// val hbase05 = CommonFunctors.fromBytesWritable(
+// hbase05bytes,
+// TABLE_SCHEMA )
+// .write(TextLine("saltTesting/GetListNoSalt01"))
+//
+// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+// sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read
+//
+// val hbase06 = CommonFunctors.fromBytesWritable(
+// hbase06bytes,
+// TABLE_SCHEMA )
+// .write(TextLine("saltTesting/GetListPlusSalt01"))
+
+ val hbase07 =
+ new HBaseSource( "_TEST.SALT.03", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix )
+ .read
+ .fromBytesWritable( TABLE_SCHEMA )
+ .write(TextLine("saltTesting/ScanRangePlusSalt10"))
+ .toBytesWritable( TABLE_SCHEMA )
+ .write(new HBaseSource( "_TEST.SALT.04", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ useSalt = true ))
+
+// val hbase08 =
+// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+// sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix )
+// .read
+// .fromBytesWritable('*)
+// .write(TextLine("saltTesting/ScanRangePlusSalt03"))
+
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
new file mode 100644
index 0000000..af7d7d2
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
@@ -0,0 +1,18 @@
+package parallelai.spyglass.hbase.testing
+
+import parallelai.spyglass.base.JobRunner
+
+object HBaseSaltTesterRunner extends App {
+
+// if( args.length < 2 ) { throw new Exception("Not enough Args")}
+
+ val appConfig = "/home/crajah/tmp/application.conf"
+ val libPath = "/home/crajah/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation"
+
+ JobRunner.main(Array(classOf[HBaseSaltTester].getName,
+ "--hdfs",
+ "--app.conf.path", appConfig,
+ "--job.lib.path", libPath,
+ "--debug", "true"
+ ))
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala
new file mode 100644
index 0000000..69f8b60
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala
@@ -0,0 +1,444 @@
+package parallelai.spyglass.hbase.testing
+
+import org.apache.log4j.Level
+import org.apache.log4j.LogManager
+import org.apache.log4j.Logger
+import com.twitter.scalding.Args
+import com.twitter.scalding.IterableSource
+import com.twitter.scalding.Tsv
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.base.JobBase
+import cascading.pipe.Pipe
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+/**
+ * This integration-test expects some HBase table to exist
+ * with specific data - see GenerateTestingHTables.java
+ *
+ * Keep in mind that currently:
+ * + No version support exists in Scans
+ * + GET_LIST is working as a Set - Having a rowkey twice in the GET_LIST - will return in only one GET
+ *
+ * ISSUES:
+ * + If Scan_Range is unordered i.e. 9 -> 1 (instead of 1 -> 9) unhandled exception is thrown:
+ * Caused by: java.lang.IllegalArgumentException: Invalid range: 9 > 11000000
+ * at org.apache.hadoop.hbase.client.HTable.getRegionsInRange(HTable.java:551)
+ *
+ * @author Antwnis@gmail.com
+ */
+
+// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala
+class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ // Initiate logger
+ private val LOG: Logger = LogManager.getLogger(this.getClass)
+
+ // Set to Level.DEBUG if --debug is passed in
+ val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean
+ if (isDebug) {
+ LOG.setLevel(Level.DEBUG)
+ LOG.info("Setting logging to Level.DEBUG")
+ }
+
+ // Set HBase host
+ val hbaseHost = "cldmgr.prod.bigdata.bskyb.com:2181"
+
+ // -----------------------------
+ // ----- Tests for TABLE_01 ----
+ // -----------------------------
+ val TABLE_01_SCHEMA = List('key,'column1)
+ val tableName1 = "TABLE_01"
+ val tableName2 = "TABLE_02"
+
+ // -------------------- Test 01 --------------------
+ var testName01 = "Scan_Test_01_Huge_Key_Range"
+ println("---- Running : " + testName01)
+ // Get everything from HBase testing table into a Pipe
+ val hbase01 = new HBaseSource( tableName1, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00")
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ // Calculate expected result for Test 01
+ var list01 = List(("2000-01-01 10:00:10", "1"),
+ ("2000-01-01 10:05:00", "2"),
+ ("2000-01-01 10:10:00", "3"))
+
+ // -------------------- Test 02 --------------------
+ val testName02 = "Scan_Test_02_Borders_Range"
+ println("---- Running : " + testName02)
+ // Get everything from HBase testing table into a Pipe
+ val hbase02 = new HBaseSource( tableName1, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00")
+ .read
+ .fromBytesWritable(TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+ // Calculate expected result for Test 02
+ var list02 = List(("2000-01-01 10:00:10", "1"), ("2000-01-01 10:05:00", "2"), ("2000-01-01 10:10:00", "3"))
+
+ // -------------------- Test 03 --------------------
+ val testName03 = "Scan_Test_03_Inner_Range"
+ // Get everything from HBase testing table into a Pipe
+ val hbase03 = new HBaseSource( tableName1, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00")
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ // Calculate expected result for Test 03
+ var list03 = List(("2000-01-01 10:05:00", "2"))
+
+ // -------------------- Test 04 --------------------
+ val testName04 = "Scan_Test_04_Out_Of_Range_And_Unordered"
+ // Get everything from HBase testing table into a Pipe
+ val hbase04 = new HBaseSource( tableName1, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000")
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ // -------------------- Test 0 - TODO scan multiple versions .. --------------------
+// val testName04 = "Scan_Test_04_One_Version"
+// // Get everything from HBase testing table into a Pipe
+// val hbase04 = CommonFunctors.fromBytesWritable(
+// new HBaseSource( tableName2, hbaseHost, 'key,
+// Array("data"),
+// Array('column1),
+// sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00",
+// versions = 1 ) // If versions is '0' - it is regarded as '1'
+// .read
+// , TABLE_01_SCHEMA)
+// .groupAll { group =>
+// group.toList[String]('key -> 'key)
+// group.toList[String]('column1 -> 'column1)
+// }
+// .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+// x._1 + " " + x._2
+// }
+//
+// // Calculate expected result for Test 04
+// var list04 = List(("",""))
+
+
+ // -------------------- Test 05 --------------------
+ val testName05 = "Get_Test_01_One_Existing_Some_Nonexisting_Keys_1_Versions"
+ // Get everything from HBase testing table into a Pipe
+ val hbase05 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"),
+ versions = 1 )
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ // Calculate expected result for Test 04
+ var list05 = List(("2000-01-01 11:00:00", "6"))
+
+ // -------------------- Test 6 --------------------
+ val testName06 = "Get_Test_02_One_Existing_Some_Nonexisting_Keys_2_Versions"
+ val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"),
+ versions = 2 )
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ // Calculate expected result for Test 05
+ var list06 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"))
+
+ // -------------------- Test 7 --------------------
+ val testName07 = "Get_Test_03_One_Existing_Some_Nonexisting_Keys_3_Versions"
+ // Get everything from HBase testing table into a Pipe
+ val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"),
+ versions = 3 )
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ // Calculate expected result for Test 07
+ var list07 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
+
+ // -------------------- Test 08 --------------------
+ val testName08 = "Get_Test_04_One_Existing_Some_Nonexisting_Keys_4_Versions"
+ // Get everything from HBase testing table into a Pipe
+ val hbase08 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"),
+ versions = 4 )
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ var list08 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),("2000-01-01 10:00:00","1"),
+ ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
+
+ // -------------------- Test 09 --------------------
+ val testName09 = "Get_Test_05_Get_Same_Key_Multiple_Times_4_versions"
+ // Get everything from HBase testing table into a Pipe
+ val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"),
+ versions = 4 )
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ var list09 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
+
+
+ // -------------------- Test 10 --------------------
+ val testName10 = "Get_Test_06_TestWith10000and1rowkeys"
+ var bigList1:List[String] = (1 to 10000).toList.map(_.toString)
+ var bigList2:List[String] = (100001 to 200000).toList.map(_.toString)
+ var bigList = ((bigList1 ::: List("2000-01-01 11:00:00")) ::: bigList2) ::: List("2000-01-01 10:00:00")
+
+ // Get everything from HBase testing table into a Pipe
+ val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = bigList,
+ versions = 2 ) //
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+
+ var list10 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),
+ ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5")
+ )
+
+ // -------------------- Test 11 --------------------
+ val testName11 = "Get_Test_07_EmptyList"
+ // Get everything from HBase testing table into a Pipe
+ val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List(),
+ versions = 1 ) //
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+
+ // -------------------- Test 11 --------------------
+ val testName12 = "Get_Test_08_Three_Nonexistingkeys_1_Versions"
+ // Get everything from HBase testing table into a Pipe
+ val hbase12 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"),
+ versions = 1 )
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ // --------------------- TEST 13 -----------------------------
+ val testName13 = "Some "
+ val hbase13 = new HBaseSource( tableName2, hbaseHost, 'key,
+ Array("data"),
+ Array('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey="", useSalt = true )
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+
+ var list13 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),
+ ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5")
+ )
+
+
+ // Store results of Scan Test 01
+ (
+ getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++
+ getTestResultPipe(getExpectedPipe(list02), hbase02, testName02) ++
+ getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++
+ assertPipeIsEmpty(hbase04, testName04) ++
+ getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++
+ getTestResultPipe(getExpectedPipe(list06), hbase06, testName06) ++
+ getTestResultPipe(getExpectedPipe(list07), hbase07, testName07) ++
+ getTestResultPipe(getExpectedPipe(list08), hbase08, testName08) ++
+ getTestResultPipe(getExpectedPipe(list09), hbase09, testName09) ++
+ getTestResultPipe(getExpectedPipe(list10), hbase10, testName10) ++
+ getTestResultPipe(getExpectedPipe(list13), hbase13, testName13) ++
+ assertPipeIsEmpty(hbase11, testName11) ++
+ assertPipeIsEmpty(hbase12, testName12)
+ ).groupAll { group =>
+ group.sortBy('testName)
+ }
+ .write(Tsv("HBaseShouldRead"))
+
+
+ /**
+ * We assume the pipe is empty
+ *
+ * We concatenate with a header - if the resulting size is 1
+ * then the original size was 0 - then the pipe was empty :)
+ *
+ * The result is then returned in a Pipe
+ */
+ def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = {
+ val headerPipe = IterableSource(List(testName), 'hbasedata)
+ val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group =>
+ group.size('size)
+ }
+ .project('size)
+
+ val result =
+ concatenation
+ .mapTo('size -> ('testName, 'result, 'expecteddata, 'hbasedata)) { x:String => {
+ if (x == "1") {
+ (testName, "Success", "", "")
+ } else {
+ (testName, "Test Failed", "", "")
+ }
+ }
+ }
+
+ result
+ }
+
+ /**
+ * Methods receives 2 pipes - and projects the results of testing
+ *
+ * expectedPipe should have a column 'expecteddata
+ * realHBasePipe should have a column 'hbasedata
+ */
+ def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = {
+ val results = expectedPipe.insert('testName , testName)
+ .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName))
+ .map(('expecteddata, 'hbasedata)->'result) { x:(String,String) =>
+ if (x._1.equals(x._2))
+ "Success"
+ else
+ "Test Failed"
+ }
+ .project('testName, 'result, 'expecteddata, 'hbasedata)
+ results
+ }
+
+ /**
+ *
+ */
+ def getExpectedPipe ( expectedList: List[(String,String)]) : Pipe = {
+
+ val expectedPipe =
+ IterableSource(expectedList, TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('*) -> 'expecteddata) { x:(String,String) =>
+ x._1 + " " + x._2
+ }
+ expectedPipe
+ }
+
+}
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala
new file mode 100644
index 0000000..aa77caa
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala
@@ -0,0 +1,10 @@
+package parallelai.spyglass.hbase.testing
+
+import parallelai.spyglass.base.JobRunner
+
+object HBaseSourceShouldReadRunner extends App {
+ val appConfig = "/projects/applications.conf"
+ val libPath = "/media/sf__CHANDAN_RAJAH_/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation"
+
+ JobRunner.main(Array(classOf[HBaseSourceShouldRead].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath))
+} \ No newline at end of file