diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
| -rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala | 202 | ||||
| -rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala | 37 | 
2 files changed, 167 insertions, 72 deletions
| diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index f774648..a4e2d7a 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -3,91 +3,161 @@ package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import com.twitter.scalding.Args -import parallelai.spyglass.hbase.HBaseSource +import com.twitter.scalding.{IterableSource, Args, TextLine} +import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}  import cascading.tuple.Fields -import com.twitter.scalding.TextLine -import org.apache.log4j.Logger -import org.apache.log4j.Level -import parallelai.spyglass.hbase.HBasePipeConversions +import org.apache.log4j.{Logger, Level} +import cascading.tap.SinkMode +import cascading.pipe.Pipe +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes -class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { +class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions {    val isDebug = args.getOrElse("debug", "false").toBoolean -   +    if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } -   +    val TABLE_SCHEMA = List('key, 'salted, 'unsalted) -   +    val prefix = "0123456789" -     +    val quorum = args("quorum") -   -  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_ALL ).read -          .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanAllNoSalt01")) -  val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read        -        .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanAllPlusSalt01")) +  val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) -  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read      -        .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanRangeNoSalt01")) +  def toIBW(pipe: Pipe, f: Fields): Pipe = { +    asList(f) +      .foldLeft(pipe){ (p, f) => { +      p.map(f.toString -> f.toString){ from: String => +        Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) +      }} +    } +  } -  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read        -        .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanRangePlusSalt01")) -  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read -           -          .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/GetListNoSalt01")) +  val input = IterableSource(inVals, TABLE_SCHEMA) +    .read +    .write(TextLine("saltTesting/Inputs")) -  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read -           -          .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/GetListPlusSalt01")) +  val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA) +    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, +    TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE )) +} -  val hbase07 =  -      new HBaseSource( "_TEST.SALT.03", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix ) -  .read -  .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanRangePlusSalt10")) -  .toBytesWritable( TABLE_SCHEMA ) -  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,   +class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { + +  val isDebug = args.getOrElse("debug", "false").toBoolean + +  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } + +  val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + +  val prefix = "0123456789" + +  val quorum = args("quorum") + +  val sttKey = "01728" +  val stpKey = "01831" +  val sttKeyP = "8_01728" +  val stpKeyP = "1_01831" +  val listKey = List("01681", "01456") +  val listKeyP = List("1_01681", "6_01456") +   +//  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_ALL ).read +//          .fromBytesWritable( TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanAllNoSalt01")) + +  val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,            TABLE_SCHEMA.tail.map((x: Symbol) => "data"),            TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          useSalt = true )) +          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read +//        .fromBytesWritable( TABLE_SCHEMA ) +  .write(TextLine("saltTesting/ScanAllPlusSalt01")) -//  val hbase08 =  -//      new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -//          sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix ) +//  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read +//        .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangeNoSalt01")) +// +//  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read +//        .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangePlusSalt01")) +// +//  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read +//          .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/GetListNoSalt01")) +// +//  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read +//          .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/GetListPlusSalt01")) +// +//  val hbase07 = +//      new HBaseSource( "_TEST.SALT.03", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) +//  .read +//  .fromBytesWritable( TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangePlusSalt10")) +//  .toBytesWritable( TABLE_SCHEMA ) +//  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          useSalt = true )) +// +//  val hbase08 = +//      new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )  //  .read  //  .fromBytesWritable('*)  //  .write(TextLine("saltTesting/ScanRangePlusSalt03")) +} + +class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions { + +  val isDebug = args.getOrElse("debug", "false").toBoolean + +  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } + +  val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + +  val prefix = "0123456789" + +  val quorum = args("quorum") + +  val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + +  def toIBW(pipe: Pipe, f: Fields): Pipe = { +    asList(f) +      .foldLeft(pipe){ (p, f) => { +      p.map(f.toString -> f.toString){ from: String => +        Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) +      }} +    } +  } + +  val input = IterableSource(inVals, TABLE_SCHEMA).read +  val eraser = toIBW(input, TABLE_SCHEMA) +    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, +      TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))  }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index e6744b7..a8de7d6 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -1,6 +1,7 @@  package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobRunner +import com.twitter.scalding.Args  object HBaseSaltTesterRunner extends App { @@ -18,12 +19,36 @@ object HBaseSaltTesterRunner extends App {    assert  (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})    println( "Quorum is [%s]".format(quorum) ) +  val mArgs = Args(args) // get ("make-data") -  JobRunner.main(Array(classOf[HBaseSaltTester].getName,  -      "--hdfs",  -      "--app.conf.path", appPath,  +  val make = mArgs.getOrElse("make.data", "false").toBoolean +  val test = mArgs.getOrElse("test.data", "false").toBoolean +  val delete = mArgs.getOrElse("delete.data", "false").toBoolean + +  if( make ) { +    JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName, +      "--hdfs", +      "--app.conf.path", appPath, +      "--job.lib.path", jobLibPath, +      "--quorum", quorum +    )) +  } + +  if( test ) { +    JobRunner.main(Array(classOf[HBaseSaltTester].getName, +        "--hdfs", +        "--app.conf.path", appPath, +        "--job.lib.path", jobLibPath, +        "--quorum", quorum +    )) +  } + +  if( delete ) { +    JobRunner.main(Array(classOf[HBaseSaltTestShutdown].getName, +      "--hdfs", +      "--app.conf.path", appPath,        "--job.lib.path", jobLibPath, -      "--quorum", quorum, -      "--debug", "true" -  )) +      "--quorum", quorum +    )) +  }  }
\ No newline at end of file | 
