diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala | 109 | ||||
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala | 12 |
2 files changed, 71 insertions, 50 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index d75ff7b..fb91f65 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -3,7 +3,7 @@ package parallelai.spyglass.hbase.testing import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} -import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine} +import com.twitter.scalding._ import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource} import cascading.tuple.Fields import org.apache.log4j.{Logger, Level} @@ -11,6 +11,9 @@ import cascading.tap.SinkMode import cascading.pipe.Pipe import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.hbase.HBaseSource +import com.twitter.scalding.IterableSource +import com.twitter.scalding.TextLine class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions { @@ -24,26 +27,37 @@ class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConver val quorum = args("quorum") - val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val stt = args("start.value").toInt + val stp = args("stop.value").toInt - def toIBW(pipe: Pipe, f: Fields): Pipe = { - asList(f) - .foldLeft(pipe){ (p, f) => { - p.map(f.toString -> f.toString){ from: String => - Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) - }} + loadRanges(stt, stp) + + def loadRanges(stt: Int, stp: Int) { + def toIBW(pipe: Pipe, f: Fields): Pipe = { + asList(f) + .foldLeft(pipe){ (p, f) => { + p.map(f.toString -> f.toString){ from: String => + Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) + }} + } } - } + val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) - val input = IterableSource(inVals, TABLE_SCHEMA) - .read - .write(TextLine("saltTesting/Inputs")) +// val input = IterableSource(inVals, TABLE_SCHEMA) +// .read +// .write(TextLine("saltTesting/Inputs")) +// + val fromSource = new IterableSource(inVals, TABLE_SCHEMA).read.name("source_%s_%s".format(stt, stp)) - val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA) - .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data"), - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE )) + val toSource = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE ) + + toIBW(fromSource, TABLE_SCHEMA) + .write(toSource) + + } } class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { @@ -58,23 +72,23 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val quorum = args("quorum") - val sttKey = "01728" - val stpKey = "03725" - val sttKeyP = "8_01728" - val stpKeyP = "5_03725" - val listKey = List("01681", "01456") - val listKeyP = List("0_01681", "6_01456") - val noSttKey = "9999990" - val noStpKey = "9999999" - val noSttKeyP = "9_9999990" - val noStpKeyP = "9_9999999" - val noListKey = List("0123456", "6543210") - val noListKeyP = List("6_0123456", "0_6543210") + val sttKey = "0000001728" + val stpKey = "0000003725" + val sttKeyP = "8_0000001728" + val stpKeyP = "5_0000003725" + val listKey = List("0000001681", "0000001456") + val listKeyP = List("0_0000001681", "6_0000001456") + val noSttKey = "999999999990" + val noStpKey = "999999999999" + val noSttKeyP = "9_999999999990" + val noStpKeyP = "9_999999999999" + val noListKey = List("000000123456", "000006543210") + val noListKeyP = List("6_000000123456", "0_000006543210") val splitType = if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR val testName01 = "Scan All with NO useSalt" - val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), @@ -99,7 +113,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) val testName03 = "Scan Range with NO useSalt" - val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), @@ -125,7 +139,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val testName05 = "Get List with NO useSalt" - val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), @@ -288,21 +302,28 @@ class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeCon val quorum = args("quorum") - val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val stt = args("start.value").toInt + val stp = args("stop.value").toInt + + delRanges(stt, stp) - def toIBW(pipe: Pipe, f: Fields): Pipe = { - asList(f) - .foldLeft(pipe){ (p, f) => { - p.map(f.toString -> f.toString){ from: String => - Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) - }} + def delRanges(stt: Int, stp: Int) { + val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) + + def toIBW(pipe: Pipe, f: Fields): Pipe = { + asList(f) + .foldLeft(pipe){ (p, f) => { + p.map(f.toString -> f.toString){ from: String => + Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) + }} + } } - } - val input = IterableSource(inVals, TABLE_SCHEMA).read + val input = IterableSource(inVals, TABLE_SCHEMA).read - val eraser = toIBW(input, TABLE_SCHEMA) - .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data"), - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE )) + val eraser = toIBW(input, TABLE_SCHEMA) + .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE )) + } }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index 17bc873..8d5c4ec 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -32,33 +32,33 @@ object HBaseSaltTesterRunner extends App { if( make ) { - JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName, + JobRunner.main((List(classOf[HBaseSaltTestSetup].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, "--quorum", quorum, "--debug", isDebug.toString - )) + ) ::: mArgs.toList).toArray) } if( test ) { - JobRunner.main(Array(classOf[HBaseSaltTester].getName, + JobRunner.main((List(classOf[HBaseSaltTester].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, "--quorum", quorum, "--debug", isDebug.toString, "--regional", mArgs.getOrElse("regional", "false") - )) + )::: mArgs.toList).toArray) } if( delete ) { - JobRunner.main(Array(classOf[HBaseSaltTestShutdown].getName, + JobRunner.main((List(classOf[HBaseSaltTestShutdown].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, "--quorum", quorum, "--debug", isDebug.toString - )) + )::: mArgs.toList).toArray) } }
\ No newline at end of file |