aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala109
1 files changed, 65 insertions, 44 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
index d75ff7b..fb91f65 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
@@ -3,7 +3,7 @@ package parallelai.spyglass.hbase.testing
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
-import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine}
+import com.twitter.scalding._
import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
import cascading.tuple.Fields
import org.apache.log4j.{Logger, Level}
@@ -11,6 +11,9 @@ import cascading.tap.SinkMode
import cascading.pipe.Pipe
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.hbase.HBaseSource
+import com.twitter.scalding.IterableSource
+import com.twitter.scalding.TextLine
class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions {
@@ -24,26 +27,37 @@ class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConver
val quorum = args("quorum")
- val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val stt = args("start.value").toInt
+ val stp = args("stop.value").toInt
- def toIBW(pipe: Pipe, f: Fields): Pipe = {
- asList(f)
- .foldLeft(pipe){ (p, f) => {
- p.map(f.toString -> f.toString){ from: String =>
- Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
- }}
+ loadRanges(stt, stp)
+
+ def loadRanges(stt: Int, stp: Int) {
+ def toIBW(pipe: Pipe, f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
}
- }
+ val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
- val input = IterableSource(inVals, TABLE_SCHEMA)
- .read
- .write(TextLine("saltTesting/Inputs"))
+// val input = IterableSource(inVals, TABLE_SCHEMA)
+// .read
+// .write(TextLine("saltTesting/Inputs"))
+//
+ val fromSource = new IterableSource(inVals, TABLE_SCHEMA).read.name("source_%s_%s".format(stt, stp))
- val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA)
- .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE ))
+ val toSource = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE )
+
+ toIBW(fromSource, TABLE_SCHEMA)
+ .write(toSource)
+
+ }
}
class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
@@ -58,23 +72,23 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val quorum = args("quorum")
- val sttKey = "01728"
- val stpKey = "03725"
- val sttKeyP = "8_01728"
- val stpKeyP = "5_03725"
- val listKey = List("01681", "01456")
- val listKeyP = List("0_01681", "6_01456")
- val noSttKey = "9999990"
- val noStpKey = "9999999"
- val noSttKeyP = "9_9999990"
- val noStpKeyP = "9_9999999"
- val noListKey = List("0123456", "6543210")
- val noListKeyP = List("6_0123456", "0_6543210")
+ val sttKey = "0000001728"
+ val stpKey = "0000003725"
+ val sttKeyP = "8_0000001728"
+ val stpKeyP = "5_0000003725"
+ val listKey = List("0000001681", "0000001456")
+ val listKeyP = List("0_0000001681", "6_0000001456")
+ val noSttKey = "999999999990"
+ val noStpKey = "999999999999"
+ val noSttKeyP = "9_999999999990"
+ val noStpKeyP = "9_999999999999"
+ val noListKey = List("000000123456", "000006543210")
+ val noListKeyP = List("6_000000123456", "0_000006543210")
val splitType = if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR
val testName01 = "Scan All with NO useSalt"
- val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
@@ -99,7 +113,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
.groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
val testName03 = "Scan Range with NO useSalt"
- val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
@@ -125,7 +139,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val testName05 = "Get List with NO useSalt"
- val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
@@ -288,21 +302,28 @@ class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeCon
val quorum = args("quorum")
- val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val stt = args("start.value").toInt
+ val stp = args("stop.value").toInt
+
+ delRanges(stt, stp)
- def toIBW(pipe: Pipe, f: Fields): Pipe = {
- asList(f)
- .foldLeft(pipe){ (p, f) => {
- p.map(f.toString -> f.toString){ from: String =>
- Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
- }}
+ def delRanges(stt: Int, stp: Int) {
+ val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
+
+ def toIBW(pipe: Pipe, f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
}
- }
- val input = IterableSource(inVals, TABLE_SCHEMA).read
+ val input = IterableSource(inVals, TABLE_SCHEMA).read
- val eraser = toIBW(input, TABLE_SCHEMA)
- .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
+ val eraser = toIBW(input, TABLE_SCHEMA)
+ .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
+ }
} \ No newline at end of file