diff options
6 files changed, 199 insertions, 82 deletions
| @@ -22,13 +22,13 @@  		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> -        <cdh.version>cdh4.2.0</cdh.version> +        <cdh.version>cdh4.3.0</cdh.version>          <datafu.version>0.0.4-${cdh.version}</datafu.version>          <flume.version>1.3.0-${cdh.version}</flume.version>          <hadoop.version>2.0.0-${cdh.version}</hadoop.version>          <hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version> -        <hbase.version>0.94.2-${cdh.version}</hbase.version> +        <hbase.version>0.94.6-${cdh.version}</hbase.version>          <hive.version>0.10.0-${cdh.version}</hive.version>          <mahout.version>0.7-${cdh.version}</mahout.version>          <mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version> @@ -41,16 +41,16 @@          <zookeeper.version>3.4.5-${cdh.version}</zookeeper.version>  		<!-- Scala/Scalding/Cascading properties --> -        <scala.version>2.9.3</scala.version> -        <scalding.scala.version>2.9.3</scalding.scala.version> +        <scala.version>2.10.2</scala.version> +        <scalding.scala.version>2.10</scalding.scala.version>          <scalding.version>0.8.6</scalding.version> -		<cascading.version>2.1.0</cascading.version> +		<cascading.version>2.1.6</cascading.version>  		<scalding-commons.version>0.2.0</scalding-commons.version>  		<scalatest.version>1.9.1</scalatest.version>  		<trove4j.version>3.0.3</trove4j.version>  		<maple.version>0.2.8</maple.version> -		<specs2.version>1.12.4.1</specs2.version> +		<specs2.version>2.1.1</specs2.version>  		<typesafe.config.version>1.0.0</typesafe.config.version>          <!-- Other libraries properties --> @@ -144,8 +144,19 @@  	<dependencies> +        <dependency> +            <groupId>cascading</groupId> +            <artifactId>cascading-core</artifactId> +            <version>${cascading.version}</version> +        </dependency> +        <dependency> +            <groupId>cascading</groupId> +            <artifactId>cascading-hadoop</artifactId> +            <version>${cascading.version}</version> +        </dependency> + -		<!-- Hadoop --> +        <!-- Hadoop -->  		<dependency>  			<groupId>org.apache.hadoop</groupId>  			<artifactId>hadoop-core</artifactId> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index c145eb0..3c62f82 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -52,8 +52,8 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {        LOG.error(e);        throw e;      } -    // TODO: Should Autoflush be set to true ???? -    table.setAutoFlush(false); +    // TODO: Should Autoflush be set to true ???? - DONE +    table.setAutoFlush(true);      HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);      recordWriter.setSinkMode(sinkMode);      return recordWriter; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 832ce95..7b62c88 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -278,7 +278,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto  		public void copyValue(Result oldValue, Result newValue) {  			if (null != oldValue && null != newValue) { -//				oldValue.copyFrom(newValue); +				oldValue.copyFrom(newValue);  			}  		} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java index e3f5dc9..6766458 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -171,17 +171,28 @@ public class HBaseSalter {  	  }  	  SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true); + +      LOG.info("".format("Prefix subset (%s)", subSet));  	  return getAllKeys(originalKey, subSet.toArray(new Byte[]{}));    }    public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) { +    LOG.info("".format("getAllKeys: OKEY (%s) PARRAY (%s)", +              Bytes.toString(originalKey), prefixArray )); +  	byte[][] keys = new byte[prefixArray.length][];      for (byte i = 0; i < prefixArray.length; i++) {        keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey));      } +    for(int i = 0; i < keys.length; i ++) { +        for(int j = 0; j < keys[i].length; j++) { +            LOG.info("" + i + " : " + j + " : " + keys[i][j]); +        } +    } +      return keys;    } diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index f774648..a4e2d7a 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -3,91 +3,161 @@ package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import com.twitter.scalding.Args -import parallelai.spyglass.hbase.HBaseSource +import com.twitter.scalding.{IterableSource, Args, TextLine} +import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}  import cascading.tuple.Fields -import com.twitter.scalding.TextLine -import org.apache.log4j.Logger -import org.apache.log4j.Level -import parallelai.spyglass.hbase.HBasePipeConversions +import org.apache.log4j.{Logger, Level} +import cascading.tap.SinkMode +import cascading.pipe.Pipe +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes -class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { +class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions {    val isDebug = args.getOrElse("debug", "false").toBoolean -   +    if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } -   +    val TABLE_SCHEMA = List('key, 'salted, 'unsalted) -   +    val prefix = "0123456789" -     +    val quorum = args("quorum") -   -  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_ALL ).read -          .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanAllNoSalt01")) -  val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read        -        .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanAllPlusSalt01")) +  val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) -  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read      -        .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanRangeNoSalt01")) +  def toIBW(pipe: Pipe, f: Fields): Pipe = { +    asList(f) +      .foldLeft(pipe){ (p, f) => { +      p.map(f.toString -> f.toString){ from: String => +        Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) +      }} +    } +  } -  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read        -        .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanRangePlusSalt01")) -  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read -           -          .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/GetListNoSalt01")) +  val input = IterableSource(inVals, TABLE_SCHEMA) +    .read +    .write(TextLine("saltTesting/Inputs")) -  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read -           -          .fromBytesWritable(TABLE_SCHEMA ) -  .write(TextLine("saltTesting/GetListPlusSalt01")) +  val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA) +    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, +    TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE )) +} -  val hbase07 =  -      new HBaseSource( "_TEST.SALT.03", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix ) -  .read -  .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanRangePlusSalt10")) -  .toBytesWritable( TABLE_SCHEMA ) -  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,   +class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { + +  val isDebug = args.getOrElse("debug", "false").toBoolean + +  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } + +  val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + +  val prefix = "0123456789" + +  val quorum = args("quorum") + +  val sttKey = "01728" +  val stpKey = "01831" +  val sttKeyP = "8_01728" +  val stpKeyP = "1_01831" +  val listKey = List("01681", "01456") +  val listKeyP = List("1_01681", "6_01456") +   +//  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_ALL ).read +//          .fromBytesWritable( TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanAllNoSalt01")) + +  val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,            TABLE_SCHEMA.tail.map((x: Symbol) => "data"),            TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          useSalt = true )) +          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read +//        .fromBytesWritable( TABLE_SCHEMA ) +  .write(TextLine("saltTesting/ScanAllPlusSalt01")) -//  val hbase08 =  -//      new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -//          sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix ) +//  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read +//        .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangeNoSalt01")) +// +//  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read +//        .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangePlusSalt01")) +// +//  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read +//          .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/GetListNoSalt01")) +// +//  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read +//          .fromBytesWritable(TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/GetListPlusSalt01")) +// +//  val hbase07 = +//      new HBaseSource( "_TEST.SALT.03", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) +//  .read +//  .fromBytesWritable( TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangePlusSalt10")) +//  .toBytesWritable( TABLE_SCHEMA ) +//  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          useSalt = true )) +// +//  val hbase08 = +//      new HBaseSource( "_TEST.SALT.01", quorum, 'key, +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )  //  .read  //  .fromBytesWritable('*)  //  .write(TextLine("saltTesting/ScanRangePlusSalt03")) +} + +class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions { + +  val isDebug = args.getOrElse("debug", "false").toBoolean + +  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } + +  val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + +  val prefix = "0123456789" + +  val quorum = args("quorum") + +  val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + +  def toIBW(pipe: Pipe, f: Fields): Pipe = { +    asList(f) +      .foldLeft(pipe){ (p, f) => { +      p.map(f.toString -> f.toString){ from: String => +        Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) +      }} +    } +  } + +  val input = IterableSource(inVals, TABLE_SCHEMA).read +  val eraser = toIBW(input, TABLE_SCHEMA) +    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, +      TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))  }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index e6744b7..a8de7d6 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -1,6 +1,7 @@  package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobRunner +import com.twitter.scalding.Args  object HBaseSaltTesterRunner extends App { @@ -18,12 +19,36 @@ object HBaseSaltTesterRunner extends App {    assert  (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})    println( "Quorum is [%s]".format(quorum) ) +  val mArgs = Args(args) // get ("make-data") -  JobRunner.main(Array(classOf[HBaseSaltTester].getName,  -      "--hdfs",  -      "--app.conf.path", appPath,  +  val make = mArgs.getOrElse("make.data", "false").toBoolean +  val test = mArgs.getOrElse("test.data", "false").toBoolean +  val delete = mArgs.getOrElse("delete.data", "false").toBoolean + +  if( make ) { +    JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName, +      "--hdfs", +      "--app.conf.path", appPath, +      "--job.lib.path", jobLibPath, +      "--quorum", quorum +    )) +  } + +  if( test ) { +    JobRunner.main(Array(classOf[HBaseSaltTester].getName, +        "--hdfs", +        "--app.conf.path", appPath, +        "--job.lib.path", jobLibPath, +        "--quorum", quorum +    )) +  } + +  if( delete ) { +    JobRunner.main(Array(classOf[HBaseSaltTestShutdown].getName, +      "--hdfs", +      "--app.conf.path", appPath,        "--job.lib.path", jobLibPath, -      "--quorum", quorum, -      "--debug", "true" -  )) +      "--quorum", quorum +    )) +  }  }
\ No newline at end of file | 
