-rw-r--r--  pom.xml                                                                         25
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java                   4
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java                      2
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseSalter.java                        11
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala         202
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala    37
6 files changed, 199 insertions(+), 82 deletions(-)
diff --git a/pom.xml b/pom.xml
index fe692c6..9167fd4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,13 +22,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <cdh.version>cdh4.2.0</cdh.version>
+ <cdh.version>cdh4.3.0</cdh.version>
<datafu.version>0.0.4-${cdh.version}</datafu.version>
<flume.version>1.3.0-${cdh.version}</flume.version>
<hadoop.version>2.0.0-${cdh.version}</hadoop.version>
<hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version>
- <hbase.version>0.94.2-${cdh.version}</hbase.version>
+ <hbase.version>0.94.6-${cdh.version}</hbase.version>
<hive.version>0.10.0-${cdh.version}</hive.version>
<mahout.version>0.7-${cdh.version}</mahout.version>
<mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version>
@@ -41,16 +41,16 @@
<zookeeper.version>3.4.5-${cdh.version}</zookeeper.version>
<!-- Scala/Scalding/Cascading properties -->
- <scala.version>2.9.3</scala.version>
- <scalding.scala.version>2.9.3</scalding.scala.version>
+ <scala.version>2.10.2</scala.version>
+ <scalding.scala.version>2.10</scalding.scala.version>
<scalding.version>0.8.6</scalding.version>
- <cascading.version>2.1.0</cascading.version>
+ <cascading.version>2.1.6</cascading.version>
<scalding-commons.version>0.2.0</scalding-commons.version>
<scalatest.version>1.9.1</scalatest.version>
<trove4j.version>3.0.3</trove4j.version>
<maple.version>0.2.8</maple.version>
- <specs2.version>1.12.4.1</specs2.version>
+ <specs2.version>2.1.1</specs2.version>
<typesafe.config.version>1.0.0</typesafe.config.version>
<!-- Other libraries properties -->
@@ -144,8 +144,19 @@
<dependencies>
+ <dependency>
+ <groupId>cascading</groupId>
+ <artifactId>cascading-core</artifactId>
+ <version>${cascading.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>cascading</groupId>
+ <artifactId>cascading-hadoop</artifactId>
+ <version>${cascading.version}</version>
+ </dependency>
+
- <!-- Hadoop -->
+ <!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index c145eb0..3c62f82 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -52,8 +52,8 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
LOG.error(e);
throw e;
}
- // TODO: Should Autoflush be set to true ????
- table.setAutoFlush(false);
+ // TODO: Should Autoflush be set to true ???? - DONE
+ table.setAutoFlush(true);
HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
recordWriter.setSinkMode(sinkMode);
return recordWriter;
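
Note on the hunk above: setAutoFlush is flipped from false to true, so each Put is shipped to the RegionServer as soon as it is issued instead of sitting in the client-side write buffer. A minimal standalone sketch of that trade-off, assuming an HBase 0.94 client on the classpath and a reachable cluster holding the test table; the object name AutoFlushSketch and the sample rows are illustrative only, not project code:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object AutoFlushSketch extends App {
  val conf  = HBaseConfiguration.create()
  val table = new HTable(conf, "_TEST.SALT.01")

  // autoFlush = true (the new behaviour): each put() goes to the RegionServer
  // immediately, so nothing can be lost in an unflushed client buffer.
  table.setAutoFlush(true)
  table.put(new Put(Bytes.toBytes("0_00001"))
    .add(Bytes.toBytes("data"), Bytes.toBytes("unsalted"), Bytes.toBytes("00001")))

  // autoFlush = false (the old behaviour): puts are buffered client-side and
  // only shipped when the buffer fills or flushCommits() is called explicitly.
  table.setAutoFlush(false)
  table.put(new Put(Bytes.toBytes("0_00002"))
    .add(Bytes.toBytes("data"), Bytes.toBytes("unsalted"), Bytes.toBytes("00002")))
  table.flushCommits()

  table.close()
}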
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 832ce95..7b62c88 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -278,7 +278,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
public void copyValue(Result oldValue, Result newValue) {
if (null != oldValue && null != newValue) {
-// oldValue.copyFrom(newValue);
+ oldValue.copyFrom(newValue);
}
}
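
The uncommented oldValue.copyFrom(newValue) depends on Result.copyFrom, which is presumably why it is enabled together with the 0.94.6 client bump in pom.xml. A hedged sketch of the copy semantics under that assumption; the object name CopyFromSketch and the sample cell are made up for illustration:

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

object CopyFromSketch extends App {
  // Build a Result holding one cell for row "0_00001" in family "data".
  val kv = new KeyValue(Bytes.toBytes("0_00001"), Bytes.toBytes("data"),
    Bytes.toBytes("unsalted"), Bytes.toBytes("00001"))
  val newValue = new Result(Array(kv))

  // copyFrom points the reused Result instance at the new Result's cells,
  // which is what the scheme relies on when recycling value objects.
  val oldValue = new Result()
  oldValue.copyFrom(newValue)
  println(Bytes.toString(oldValue.getRow))  // 0_00001
}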
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
index e3f5dc9..6766458 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
@@ -171,17 +171,28 @@ public class HBaseSalter {
}
SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true);
+
+ LOG.info("".format("Prefix subset (%s)", subSet));
return getAllKeys(originalKey, subSet.toArray(new Byte[]{}));
}
public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) {
+ LOG.info("".format("getAllKeys: OKEY (%s) PARRAY (%s)",
+ Bytes.toString(originalKey), prefixArray ));
+
byte[][] keys = new byte[prefixArray.length][];
for (byte i = 0; i < prefixArray.length; i++) {
keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey));
}
+ for(int i = 0; i < keys.length; i ++) {
+ for(int j = 0; j < keys[i].length; j++) {
+ LOG.info("" + i + " : " + j + " : " + keys[i][j]);
+ }
+ }
+
return keys;
}
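
The extra logging in getAllKeys prints every candidate salted key that will be probed. For reference, a small sketch (assumptions only, not project code) of the key layout being logged: a single prefix byte, an underscore, then the original key:

import org.apache.hadoop.hbase.util.Bytes

object SaltedKeySketch extends App {
  val prefixes    = "0123456789".getBytes("UTF-8")
  val originalKey = Bytes.toBytes("01728")

  // Mirrors the construction in HBaseSalter.getAllKeys: <prefix> + "_" + <original key>.
  val allKeys: Array[Array[Byte]] =
    prefixes.map(p => Bytes.add(Array(p), Bytes.add(Bytes.toBytes("_"), originalKey)))

  allKeys.foreach(k => println(Bytes.toString(k)))  // 0_01728, 1_01728, ..., 9_01728
}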
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
index f774648..a4e2d7a 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
@@ -3,91 +3,161 @@ package parallelai.spyglass.hbase.testing
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import com.twitter.scalding.Args
-import parallelai.spyglass.hbase.HBaseSource
+import com.twitter.scalding.{IterableSource, Args, TextLine}
+import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
import cascading.tuple.Fields
-import com.twitter.scalding.TextLine
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-import parallelai.spyglass.hbase.HBasePipeConversions
+import org.apache.log4j.{Logger, Level}
+import cascading.tap.SinkMode
+import cascading.pipe.Pipe
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
-class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
+class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions {
val isDebug = args.getOrElse("debug", "false").toBoolean
-
+
if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
-
+
val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
-
+
val prefix = "0123456789"
-
+
val quorum = args("quorum")
-
- val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.SCAN_ALL ).read
- .fromBytesWritable( TABLE_SCHEMA )
- .write(TextLine("saltTesting/ScanAllNoSalt01"))
- val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read
- .fromBytesWritable( TABLE_SCHEMA )
- .write(TextLine("saltTesting/ScanAllPlusSalt01"))
+ val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
- val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read
- .fromBytesWritable(TABLE_SCHEMA )
- .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+ def toIBW(pipe: Pipe, f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
+ }
- val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read
- .fromBytesWritable(TABLE_SCHEMA )
- .write(TextLine("saltTesting/ScanRangePlusSalt01"))
- val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read
-
- .fromBytesWritable(TABLE_SCHEMA )
- .write(TextLine("saltTesting/GetListNoSalt01"))
+ val input = IterableSource(inVals, TABLE_SCHEMA)
+ .read
+ .write(TextLine("saltTesting/Inputs"))
- val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read
-
- .fromBytesWritable(TABLE_SCHEMA )
- .write(TextLine("saltTesting/GetListPlusSalt01"))
+ val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA)
+ .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE ))
+}
- val hbase07 =
- new HBaseSource( "_TEST.SALT.03", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix )
- .read
- .fromBytesWritable( TABLE_SCHEMA )
- .write(TextLine("saltTesting/ScanRangePlusSalt10"))
- .toBytesWritable( TABLE_SCHEMA )
- .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
+class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val isDebug = args.getOrElse("debug", "false").toBoolean
+
+ if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
+
+ val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+ val prefix = "0123456789"
+
+ val quorum = args("quorum")
+
+ val sttKey = "01728"
+ val stpKey = "01831"
+ val sttKeyP = "8_01728"
+ val stpKeyP = "1_01831"
+ val listKey = List("01681", "01456")
+ val listKeyP = List("1_01681", "6_01456")
+
+// val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// sourceMode = SourceMode.SCAN_ALL ).read
+// .fromBytesWritable( TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanAllNoSalt01"))
+
+ val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- useSalt = true ))
+ sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read
+// .fromBytesWritable( TABLE_SCHEMA )
+ .write(TextLine("saltTesting/ScanAllPlusSalt01"))
-// val hbase08 =
-// new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
-// sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix )
+// val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read
+// .fromBytesWritable(TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+//
+// val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read
+// .fromBytesWritable(TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanRangePlusSalt01"))
+//
+// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read
+// .fromBytesWritable(TABLE_SCHEMA )
+// .write(TextLine("saltTesting/GetListNoSalt01"))
+//
+// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read
+// .fromBytesWritable(TABLE_SCHEMA )
+// .write(TextLine("saltTesting/GetListPlusSalt01"))
+//
+// val hbase07 =
+// new HBaseSource( "_TEST.SALT.03", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
+// .read
+// .fromBytesWritable( TABLE_SCHEMA )
+// .write(TextLine("saltTesting/ScanRangePlusSalt10"))
+// .toBytesWritable( TABLE_SCHEMA )
+// .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// useSalt = true ))
+//
+// val hbase08 =
+// new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
// .read
// .fromBytesWritable('*)
// .write(TextLine("saltTesting/ScanRangePlusSalt03"))
+}
+
+class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val isDebug = args.getOrElse("debug", "false").toBoolean
+
+ if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
+
+ val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+ val prefix = "0123456789"
+
+ val quorum = args("quorum")
+
+ val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+
+ def toIBW(pipe: Pipe, f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
+ }
+
+ val input = IterableSource(inVals, TABLE_SCHEMA).read
+ val eraser = toIBW(input, TABLE_SCHEMA)
+ .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
} \ No newline at end of file
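
The rewritten tester splits the job into three classes: HBaseSaltTestSetup writes 100,000 pre-salted rows into _TEST.SALT.01, HBaseSaltTester runs the scan variants (most of them currently commented out), and HBaseSaltTestShutdown pushes the same keys back through HBaseSource with SinkMode.REPLACE, apparently as cleanup. A hedged sketch of the row-key pattern the setup job generates, runnable with plain Scala; the object name InValsSketch is illustrative:

object InValsSketch extends App {
  // Mirrors the inVals expression above: the salted key prefixes the zero-padded
  // counter with its last digit, e.g. 42 -> ("2_00042", "2_00042", "00042").
  val inVals = (0 to 99999).toList.map { x =>
    val unsalted = "%05d".format(x)
    val salted   = "" + (x % 10) + "_" + unsalted
    (salted, salted, unsalted)  // ('key, 'salted, 'unsalted)
  }
  println(inVals.take(3))  // List((0_00000,0_00000,00000), (1_00001,1_00001,00001), ...)
}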
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
index e6744b7..a8de7d6 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
@@ -1,6 +1,7 @@
package parallelai.spyglass.hbase.testing
import parallelai.spyglass.base.JobRunner
+import com.twitter.scalding.Args
object HBaseSaltTesterRunner extends App {
@@ -18,12 +19,36 @@ object HBaseSaltTesterRunner extends App {
assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})
println( "Quorum is [%s]".format(quorum) )
+ val mArgs = Args(args) // get ("make-data")
- JobRunner.main(Array(classOf[HBaseSaltTester].getName,
- "--hdfs",
- "--app.conf.path", appPath,
+ val make = mArgs.getOrElse("make.data", "false").toBoolean
+ val test = mArgs.getOrElse("test.data", "false").toBoolean
+ val delete = mArgs.getOrElse("delete.data", "false").toBoolean
+
+ if( make ) {
+ JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName,
+ "--hdfs",
+ "--app.conf.path", appPath,
+ "--job.lib.path", jobLibPath,
+ "--quorum", quorum
+ ))
+ }
+
+ if( test ) {
+ JobRunner.main(Array(classOf[HBaseSaltTester].getName,
+ "--hdfs",
+ "--app.conf.path", appPath,
+ "--job.lib.path", jobLibPath,
+ "--quorum", quorum
+ ))
+ }
+
+ if( delete ) {
+ JobRunner.main(Array(classOf[HBaseSaltTestShutdown].getName,
+ "--hdfs",
+ "--app.conf.path", appPath,
"--job.lib.path", jobLibPath,
- "--quorum", quorum,
- "--debug", "true"
- ))
+ "--quorum", quorum
+ ))
+ }
} \ No newline at end of file
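
With the runner now gated on make.data, test.data and delete.data, each phase can be toggled from the command line. A minimal sketch of how those flags are parsed through Scalding's Args, matching the getOrElse calls added above; the sample argument values are made up:

import com.twitter.scalding.Args

object RunnerFlagsSketch extends App {
  // e.g. invoked with: --make.data true --test.data true --quorum host1,host2
  val mArgs  = Args(Array("--make.data", "true", "--test.data", "true"))

  val make   = mArgs.getOrElse("make.data", "false").toBoolean    // true
  val test   = mArgs.getOrElse("test.data", "false").toBoolean    // true
  val delete = mArgs.getOrElse("delete.data", "false").toBoolean  // false

  println((make, test, delete))
}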