6 files changed, 199 insertions(+), 82 deletions(-)
diff --git a/pom.xml b/pom.xml
--- a/pom.xml
+++ b/pom.xml
@@ -22,13 +22,13 @@
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
-  <cdh.version>cdh4.2.0</cdh.version>
+  <cdh.version>cdh4.3.0</cdh.version>
   <datafu.version>0.0.4-${cdh.version}</datafu.version>
   <flume.version>1.3.0-${cdh.version}</flume.version>
   <hadoop.version>2.0.0-${cdh.version}</hadoop.version>
   <hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version>
-  <hbase.version>0.94.2-${cdh.version}</hbase.version>
+  <hbase.version>0.94.6-${cdh.version}</hbase.version>
   <hive.version>0.10.0-${cdh.version}</hive.version>
   <mahout.version>0.7-${cdh.version}</mahout.version>
   <mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version>
@@ -41,16 +41,16 @@
   <zookeeper.version>3.4.5-${cdh.version}</zookeeper.version>
 
   <!-- Scala/Scalding/Cascading properties -->
-  <scala.version>2.9.3</scala.version>
-  <scalding.scala.version>2.9.3</scalding.scala.version>
+  <scala.version>2.10.2</scala.version>
+  <scalding.scala.version>2.10</scalding.scala.version>
   <scalding.version>0.8.6</scalding.version>
-  <cascading.version>2.1.0</cascading.version>
+  <cascading.version>2.1.6</cascading.version>
   <scalding-commons.version>0.2.0</scalding-commons.version>
   <scalatest.version>1.9.1</scalatest.version>
   <trove4j.version>3.0.3</trove4j.version>
   <maple.version>0.2.8</maple.version>
-  <specs2.version>1.12.4.1</specs2.version>
+  <specs2.version>2.1.1</specs2.version>
   <typesafe.config.version>1.0.0</typesafe.config.version>
 
   <!-- Other libraries properties -->
@@ -144,8 +144,19 @@
   <dependencies>
+    <dependency>
+      <groupId>cascading</groupId>
+      <artifactId>cascading-core</artifactId>
+      <version>${cascading.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>cascading</groupId>
+      <artifactId>cascading-hadoop</artifactId>
+      <version>${cascading.version}</version>
+    </dependency>
+
-    <!-- Hadoop -->
+    <!-- Hadoop -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index c145eb0..3c62f82 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -52,8 +52,8 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
       LOG.error(e);
       throw e;
     }
-    // TODO: Should Autoflush be set to true ????
-    table.setAutoFlush(false);
+    // TODO: Should Autoflush be set to true ???? - DONE
+    table.setAutoFlush(true);
     HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
     recordWriter.setSinkMode(sinkMode);
     return recordWriter;
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 832ce95..7b62c88 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -278,7 +278,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector
   public void copyValue(Result oldValue, Result newValue) {
     if (null != oldValue && null != newValue) {
-//      oldValue.copyFrom(newValue);
+      oldValue.copyFrom(newValue);
     }
   }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
index e3f5dc9..6766458 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
@@ -171,17 +171,28 @@ public class HBaseSalter {
     }
 
     SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true);
+
+    LOG.info("".format("Prefix subset (%s)", subSet));
 
     return getAllKeys(originalKey, subSet.toArray(new Byte[]{}));
   }
 
   public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) {
+    LOG.info("".format("getAllKeys: OKEY (%s) PARRAY (%s)",
+      Bytes.toString(originalKey), prefixArray ));
+
     byte[][] keys = new byte[prefixArray.length][];
 
     for (byte i = 0; i < prefixArray.length; i++) {
      keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey));
     }
 
+    for(int i = 0; i < keys.length; i ++) {
+      for(int j = 0; j < keys[i].length; j++) {
+        LOG.info("" + i + " : " + j + " : " + keys[i][j]);
+      }
+    }
+
     return keys;
   }
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
index f774648..a4e2d7a 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
@@ -3,91 +3,161 @@ package parallelai.spyglass.hbase.testing
 import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import com.twitter.scalding.Args
-import parallelai.spyglass.hbase.HBaseSource
+import com.twitter.scalding.{IterableSource, Args, TextLine}
+import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
 import cascading.tuple.Fields
-import com.twitter.scalding.TextLine
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-import parallelai.spyglass.hbase.HBasePipeConversions
+import org.apache.log4j.{Logger, Level}
+import cascading.tap.SinkMode
+import cascading.pipe.Pipe
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
 
-class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
+class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions {
 
   val isDebug = args.getOrElse("debug", "false").toBoolean
-  
+
   if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
-  
+
   val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
-  
+
   val prefix = "0123456789"
-  
+
   val quorum = args("quorum")
-  
-  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      sourceMode = SourceMode.SCAN_ALL ).read
-      .fromBytesWritable( TABLE_SCHEMA )
-      .write(TextLine("saltTesting/ScanAllNoSalt01"))
-  val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read
-      .fromBytesWritable( TABLE_SCHEMA )
-      .write(TextLine("saltTesting/ScanAllPlusSalt01"))
+  val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
 
-  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read
-      .fromBytesWritable(TABLE_SCHEMA )
-      .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+  def toIBW(pipe: Pipe, f: Fields): Pipe = {
+    asList(f)
+      .foldLeft(pipe){ (p, f) => {
+        p.map(f.toString -> f.toString){ from: String =>
+          Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+        }}
+      }
+  }
 
-  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read
-      .fromBytesWritable(TABLE_SCHEMA )
-      .write(TextLine("saltTesting/ScanRangePlusSalt01"))
-  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read
-      
-      .fromBytesWritable(TABLE_SCHEMA )
-      .write(TextLine("saltTesting/GetListNoSalt01"))
+  val input = IterableSource(inVals, TABLE_SCHEMA)
+    .read
+    .write(TextLine("saltTesting/Inputs"))
 
-  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read
-      
-      .fromBytesWritable(TABLE_SCHEMA )
-      .write(TextLine("saltTesting/GetListPlusSalt01"))
+  val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA)
+    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE ))
+}
 
-  val hbase07 =
-    new HBaseSource( "_TEST.SALT.03", quorum, 'key,
-      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix )
-    .read
-    .fromBytesWritable( TABLE_SCHEMA )
-    .write(TextLine("saltTesting/ScanRangePlusSalt10"))
-    .toBytesWritable( TABLE_SCHEMA )
-    .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
+class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  val isDebug = args.getOrElse("debug", "false").toBoolean
+
+  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
+
+  val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+  val prefix = "0123456789"
+
+  val quorum = args("quorum")
+
+  val sttKey = "01728"
+  val stpKey = "01831"
+  val sttKeyP = "8_01728"
+  val stpKeyP = "1_01831"
+  val listKey = List("01681", "01456")
+  val listKeyP = List("1_01681", "6_01456")
+
+// val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+//     TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//     sourceMode = SourceMode.SCAN_ALL ).read
+//     .fromBytesWritable( TABLE_SCHEMA )
+//     .write(TextLine("saltTesting/ScanAllNoSalt01"))
+
+  val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-      useSalt = true ))
+      sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read
+//    .fromBytesWritable( TABLE_SCHEMA )
+    .write(TextLine("saltTesting/ScanAllPlusSalt01"))
 
-// val hbase08 =
-//   new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-//     TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
-//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
-//     sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix )
+// val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+//     TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//     sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read
+//     .fromBytesWritable(TABLE_SCHEMA )
+//     .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+//
+// val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+//     TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//     sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read
+//     .fromBytesWritable(TABLE_SCHEMA )
+//     .write(TextLine("saltTesting/ScanRangePlusSalt01"))
+//
+// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+//     TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//     sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read
+//     .fromBytesWritable(TABLE_SCHEMA )
+//     .write(TextLine("saltTesting/GetListNoSalt01"))
+//
+// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+//     TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//     sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read
+//     .fromBytesWritable(TABLE_SCHEMA )
+//     .write(TextLine("saltTesting/GetListPlusSalt01"))
+//
+// val hbase07 =
+//   new HBaseSource( "_TEST.SALT.03", quorum, 'key,
+//     TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//     sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
+//     .read
+//     .fromBytesWritable( TABLE_SCHEMA )
+//     .write(TextLine("saltTesting/ScanRangePlusSalt10"))
+//     .toBytesWritable( TABLE_SCHEMA )
+//     .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
+//       TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//       TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//       useSalt = true ))
+//
+// val hbase08 =
+//   new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+//     TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+//     TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+//     sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
//     .read
//     .fromBytesWritable('*)
//     .write(TextLine("saltTesting/ScanRangePlusSalt03"))
+}
+
+class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  val isDebug = args.getOrElse("debug", "false").toBoolean
+
+  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
+
+  val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+  val prefix = "0123456789"
+
+  val quorum = args("quorum")
+
+  val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+
+  def toIBW(pipe: Pipe, f: Fields): Pipe = {
+    asList(f)
+      .foldLeft(pipe){ (p, f) => {
+        p.map(f.toString -> f.toString){ from: String =>
+          Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+        }}
+      }
+  }
+
+  val input = IterableSource(inVals, TABLE_SCHEMA).read
+  val eraser = toIBW(input, TABLE_SCHEMA)
+    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
 }
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
index e6744b7..a8de7d6 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
@@ -1,6 +1,7 @@
 package parallelai.spyglass.hbase.testing
 
 import parallelai.spyglass.base.JobRunner
+import com.twitter.scalding.Args
 
 object HBaseSaltTesterRunner extends App {
 
@@ -18,12 +19,36 @@ object HBaseSaltTesterRunner extends App {
   assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})
   println( "Quorum is [%s]".format(quorum) )
 
+  val mArgs = Args(args) // get ("make-data")
 
-  JobRunner.main(Array(classOf[HBaseSaltTester].getName,
-      "--hdfs",
-      "--app.conf.path", appPath,
+  val make = mArgs.getOrElse("make.data", "false").toBoolean
+  val test = mArgs.getOrElse("test.data", "false").toBoolean
+  val delete = mArgs.getOrElse("delete.data", "false").toBoolean
+
+  if( make ) {
+    JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName,
+      "--hdfs",
+      "--app.conf.path", appPath,
+      "--job.lib.path", jobLibPath,
+      "--quorum", quorum
+    ))
+  }
+
+  if( test ) {
+    JobRunner.main(Array(classOf[HBaseSaltTester].getName,
+      "--hdfs",
+      "--app.conf.path", appPath,
+      "--job.lib.path", jobLibPath,
+      "--quorum", quorum
+    ))
+  }
+
+  if( delete ) {
+    JobRunner.main(Array(classOf[HBaseSaltTestShutdown].getName,
+      "--hdfs",
+      "--app.conf.path", appPath,
       "--job.lib.path", jobLibPath,
-      "--quorum", quorum,
-      "--debug", "true"
-    ))
+      "--quorum", quorum
+    ))
+  }
 }
\ No newline at end of file
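
Note on the salting convention this change exercises (an illustrative sketch, not code from the commit): the test fixtures pair each plain key with a salted twin of the form "<lastDigit>_<key>" (e.g. sttKey = "01728" vs sttKeyP = "8_01728"), and HBaseSalter.getAllKeys fans one logical key out across every prefix in a set by prepending "<prefix>_" to it. A minimal Scala sketch of that shape, with the object and method names being hypothetical:

    // SaltSketch.scala -- illustrative only; mirrors the "<prefix>_<key>" shape
    // visible in HBaseSalter.getAllKeys and the HBaseSaltTester fixtures above.
    object SaltSketch extends App {
      // Same prefix list as the tester's `prefix = "0123456789"`.
      val prefixes = "0123456789".map(_.toString)

      // Salt a key the way inVals does: prefix it with its own last digit.
      def salt(key: String): String = s"${key.last}_$key"

      // Fan one logical key out across all prefixes, as the salted GET_LIST /
      // SCAN_RANGE paths must when the salt byte of a stored row is unknown.
      def allKeys(key: String): Seq[String] = prefixes.map(p => s"${p}_$key")

      println(salt("01728"))    // 8_01728  (matches sttKeyP in the tester)
      println(allKeys("01681")) // Vector(0_01681, 1_01681, ..., 9_01681)
    }

With the runner split into make.data / test.data / delete.data phases, a full cycle would presumably be driven by passing those three boolean flags to HBaseSaltTesterRunner alongside the existing environment-derived settings: the setup job writes _TEST.SALT.01 with SinkMode.UPDATE so it can be re-run safely, while the shutdown job uses SinkMode.REPLACE to wipe the table.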