diff options
Diffstat (limited to 'src/main/scala')
3 files changed, 235 insertions, 75 deletions
| diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 7ff7860..c214e99 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -10,7 +10,7 @@ import com.twitter.scalding.Read  import com.twitter.scalding.Source  import com.twitter.scalding.Write -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}  import cascading.scheme.{NullScheme, Scheme}  import cascading.tap.SinkMode  import cascading.tap.Tap @@ -40,11 +40,14 @@ case class HBaseSource(      versions: Int = 1,      useSalt: Boolean = false,      prefixList: String = null, -    sinkMode: SinkMode = SinkMode.UPDATE +    sinkMode: SinkMode = SinkMode.UPDATE, +    inputSplitType: SplitType = SplitType.GRANULAR    ) extends Source { -     -  override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) -    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + +  val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) +  internalScheme.setInputSplitTye(inputSplitType) + +  override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]    // To enable local mode testing    val allFields = keyFields.append(valueFields.toArray) @@ -76,6 +79,8 @@ case class HBaseSource(              }              case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))            } + +          hbt.setInputSplitType(inputSplitType)            hbt.asInstanceOf[Tap[_,_,_]]          } diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index a4e2d7a..d75ff7b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -1,9 +1,9 @@  package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} -import com.twitter.scalding.{IterableSource, Args, TextLine} +import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine}  import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}  import cascading.tuple.Fields  import org.apache.log4j.{Logger, Level} @@ -59,76 +59,221 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio    val quorum = args("quorum")    val sttKey = "01728" -  val stpKey = "01831" +  val stpKey = "03725"    val sttKeyP = "8_01728" -  val stpKeyP = "1_01831" +  val stpKeyP = "5_03725"    val listKey = List("01681", "01456") -  val listKeyP = List("1_01681", "6_01456") -   -//  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.SCAN_ALL ).read -//          .fromBytesWritable( TABLE_SCHEMA ) -//  .write(TextLine("saltTesting/ScanAllNoSalt01")) - +  val listKeyP = List("0_01681", "6_01456") +  val noSttKey = "9999990" +  val noStpKey = "9999999" +  val noSttKeyP = "9_9999990" +  val noStpKeyP = "9_9999999" +  val noListKey = List("0123456", "6543210") +  val noListKeyP = List("6_0123456", "0_6543210") + +  val splitType =  if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR + +  val testName01 = "Scan All with NO useSalt" +  val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) +  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +          sourceMode = SourceMode.SCAN_ALL, +          inputSplitType = splitType ).read +          .fromBytesWritable( TABLE_SCHEMA ) +          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +          .project('testData) +          .write(TextLine("saltTesting/ScanAllNoSalt01")) +          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName02 = "Scan All with useSalt=true"    val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,            TABLE_SCHEMA.tail.map((x: Symbol) => "data"),            TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read -//        .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanAllPlusSalt01")) - -//  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read -//        .fromBytesWritable(TABLE_SCHEMA ) -//  .write(TextLine("saltTesting/ScanRangeNoSalt01")) -// -//  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read -//        .fromBytesWritable(TABLE_SCHEMA ) -//  .write(TextLine("saltTesting/ScanRangePlusSalt01")) -// -//  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read -//          .fromBytesWritable(TABLE_SCHEMA ) -//  .write(TextLine("saltTesting/GetListNoSalt01")) -// -//  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read -//          .fromBytesWritable(TABLE_SCHEMA ) -//  .write(TextLine("saltTesting/GetListPlusSalt01")) -// -//  val hbase07 = -//      new HBaseSource( "_TEST.SALT.03", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) -//  .read -//  .fromBytesWritable( TABLE_SCHEMA ) -//  .write(TextLine("saltTesting/ScanRangePlusSalt10")) -//  .toBytesWritable( TABLE_SCHEMA ) -//  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          useSalt = true )) -// -//  val hbase08 = -//      new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) -//  .read -//  .fromBytesWritable('*) -//  .write(TextLine("saltTesting/ScanRangePlusSalt03")) +          sourceMode = SourceMode.SCAN_ALL, useSalt = true, +          inputSplitType = splitType).read +          .fromBytesWritable( TABLE_SCHEMA ) +          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +          .project('testData) +          .write(TextLine("saltTesting/ScanAllPlusSalt01")) +          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName03 = "Scan Range with NO useSalt" +  val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) +  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix, +          inputSplitType = splitType).read +          .fromBytesWritable(TABLE_SCHEMA ) +          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +          .project('testData) +          .write(TextLine("saltTesting/ScanRangePlusSalt01")) +          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName04 = "Scan Range with useSalt=true" +  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP, +          inputSplitType = splitType).read +          .fromBytesWritable(TABLE_SCHEMA ) +          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +          .project('testData) +          .write(TextLine("saltTesting/ScanRangeNoSalt01")) +          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + +  val testName05 = "Get List with NO useSalt" +  val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) +  val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +          sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true, +          inputSplitType = splitType).read +          .fromBytesWritable(TABLE_SCHEMA ) +          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +          .project('testData) +          .write(TextLine("saltTesting/GetListPlusSalt01")) +          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName06 = "Get List with useSalt=true" +  val hbase06 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +          sourceMode = SourceMode.GET_LIST, keyList = listKeyP, +          inputSplitType = splitType).read +          .fromBytesWritable(TABLE_SCHEMA ) +          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +          .project('testData) +          .write(TextLine("saltTesting/GetListNoSalt01")) +          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName08 = "Scan Range NO RESULTS" +  val hbase08 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +    TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +    sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKey, stopKey = noStpKey, useSalt = true, prefixList = prefix, +    inputSplitType = splitType).read +    .fromBytesWritable(TABLE_SCHEMA ) +    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +    .project('testData) +    .write(TextLine("saltTesting/ScanRangePlusSaltNoRes01")) +    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName09 = "Scan Range NO RESULT with useSalt=true" +  val hbase09 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +    TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +    sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKeyP, stopKey = noStpKeyP, +    inputSplitType = splitType).read +    .fromBytesWritable(TABLE_SCHEMA ) +    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +    .project('testData) +    .write(TextLine("saltTesting/ScanRangeNoSaltNoRes01")) +    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + +  val testName10 = "Get List NO RESULT" +  val hbase10 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +    TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +    sourceMode = SourceMode.GET_LIST, keyList = noListKey, useSalt = true, +    inputSplitType = splitType).read +    .fromBytesWritable(TABLE_SCHEMA ) +    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +    .project('testData) +    .write(TextLine("saltTesting/GetListPlusSaltNoRes01")) +    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName11 = "Get List NO RESULT with useSalt=true" +  val hbase11 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +    TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +    sourceMode = SourceMode.GET_LIST, keyList = noListKeyP, +    inputSplitType = splitType).read +    .fromBytesWritable(TABLE_SCHEMA ) +    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +    .project('testData) +    .write(TextLine("saltTesting/GetListNoSaltNoRes01")) +    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + + +//  ( +////      getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++ +////      getTestResultPipe(getExpectedPipe(list01), hbase02, testName02) ++ +//      getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++ +//      getTestResultPipe(getExpectedPipe(list03), hbase04, testName03) ++ +//      getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++ +//      getTestResultPipe(getExpectedPipe(list05), hbase06, testName06) ++ +//      assertPipeIsEmpty(hbase08, testName08) ++ +//      assertPipeIsEmpty(hbase09, testName09) ++ +//      assertPipeIsEmpty(hbase10, testName10) ++ +//      assertPipeIsEmpty(hbase11, testName11) +//    ).groupAll { group => +//    group.sortBy('testName) +//  } +//    .write(Tsv("saltTesting/FinalTestResults")) + +  /** +   * We assume the pipe is empty +   * +   * We concatenate with a header - if the resulting size is 1 +   * then the original size was 0 - then the pipe was empty :) +   * +   * The result is then returned in a Pipe +   */ +  def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = { +    val headerPipe = IterableSource(List(testName), 'testData) +    val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group => +      group.size('size) +    } +      .project('size) + +    val result = +      concatenation +        .mapTo('size -> ('testName, 'result, 'expectedData, 'testData)) { x:String => { +        if (x == "1") { +          (testName, "Success", "", "") +        } else { +          (testName, "Test Failed", "", "") +        } +      } +      } + +    result +  } + +  /** +   * Methods receives 2 pipes - and projects the results of testing +   * +   * expectedPipe  should have a column 'expecteddata +   * realHBasePipe should have a column 'hbasedata +   */ +  def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = { +    val results = expectedPipe.insert('testName , testName) +      .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName)) +      .map(('expectedData, 'testData)->'result) { x:(String,String) => +      if (x._1.equals(x._2)) +        "Success" +      else +        "Test Failed" +    } +      .project('testName, 'result, 'expectedData, 'testData) +    results +  } + +  /** +   * +   */ +  def getExpectedPipe ( expectedList: List[(String,String, String)]) : Pipe = { +      IterableSource(expectedList, TABLE_SCHEMA) +        .map(('key, 'salted, 'unsalted) -> 'expectedData) {x: (String, String, String) => List(x._1, x._2, x._3)} +        .project('expectedData) +        .groupAll(group => group.toList[List[List[String]]]('expectedData -> 'expectedData)) +  } +  }  class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions { diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index a8de7d6..17bc873 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -2,6 +2,7 @@ package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobRunner  import com.twitter.scalding.Args +import org.apache.log4j.{Level, Logger}  object HBaseSaltTesterRunner extends App { @@ -25,12 +26,18 @@ object HBaseSaltTesterRunner extends App {    val test = mArgs.getOrElse("test.data", "false").toBoolean    val delete = mArgs.getOrElse("delete.data", "false").toBoolean +  val isDebug = mArgs.getOrElse("debug", "false").toBoolean + +  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } + +    if( make ) {      JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName,        "--hdfs",        "--app.conf.path", appPath,        "--job.lib.path", jobLibPath, -      "--quorum", quorum +      "--quorum", quorum, +      "--debug", isDebug.toString      ))    } @@ -39,7 +46,9 @@ object HBaseSaltTesterRunner extends App {          "--hdfs",          "--app.conf.path", appPath,          "--job.lib.path", jobLibPath, -        "--quorum", quorum +        "--quorum", quorum, +        "--debug", isDebug.toString, +        "--regional", mArgs.getOrElse("regional", "false")      ))    } @@ -48,7 +57,8 @@ object HBaseSaltTesterRunner extends App {        "--hdfs",        "--app.conf.path", appPath,        "--job.lib.path", jobLibPath, -      "--quorum", quorum +      "--quorum", quorum, +      "--debug", isDebug.toString      ))    }  }
\ No newline at end of file | 
