aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass
diff options
context:
space:
mode:
authorChandan Rajah <crajah@parallelai.com>2013-09-04 10:32:07 +0100
committerChandan Rajah <crajah@parallelai.com>2013-09-04 10:32:07 +0100
commit3501e241a2313cf49c371630cb6ebe0c3a47e991 (patch)
tree99b4e48c7590f94a4cbe8acf9ffbc036241ab737 /src/main/scala/parallelai/spyglass
parent147a423b345ea365c22af48727c83ea4f31b948c (diff)
downloadSpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.tar.gz
SpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.zip
Extensive changes to the underlying code base.
Fully tested and working support for region-level splitting. Reduced number of mappers.
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala15
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala279
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala16
3 files changed, 235 insertions, 75 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 7ff7860..c214e99 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -10,7 +10,7 @@ import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
import cascading.tap.Tap
@@ -40,11 +40,14 @@ case class HBaseSource(
versions: Int = 1,
useSalt: Boolean = false,
prefixList: String = null,
- sinkMode: SinkMode = SinkMode.UPDATE
+ sinkMode: SinkMode = SinkMode.UPDATE,
+ inputSplitType: SplitType = SplitType.GRANULAR
) extends Source {
-
- override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
- .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
+ internalScheme.setInputSplitTye(inputSplitType)
+
+ override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
@@ -76,6 +79,8 @@ case class HBaseSource(
}
case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
}
+
+ hbt.setInputSplitType(inputSplitType)
hbt.asInstanceOf[Tap[_,_,_]]
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
index a4e2d7a..d75ff7b 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
@@ -1,9 +1,9 @@
package parallelai.spyglass.hbase.testing
import parallelai.spyglass.base.JobBase
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
-import com.twitter.scalding.{IterableSource, Args, TextLine}
+import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine}
import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
import cascading.tuple.Fields
import org.apache.log4j.{Logger, Level}
@@ -59,76 +59,221 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val quorum = args("quorum")
val sttKey = "01728"
- val stpKey = "01831"
+ val stpKey = "03725"
val sttKeyP = "8_01728"
- val stpKeyP = "1_01831"
+ val stpKeyP = "5_03725"
val listKey = List("01681", "01456")
- val listKeyP = List("1_01681", "6_01456")
-
-// val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// sourceMode = SourceMode.SCAN_ALL ).read
-// .fromBytesWritable( TABLE_SCHEMA )
-// .write(TextLine("saltTesting/ScanAllNoSalt01"))
-
+ val listKeyP = List("0_01681", "6_01456")
+ val noSttKey = "9999990"
+ val noStpKey = "9999999"
+ val noSttKeyP = "9_9999990"
+ val noStpKeyP = "9_9999999"
+ val noListKey = List("0123456", "6543210")
+ val noListKeyP = List("6_0123456", "0_6543210")
+
+ val splitType = if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR
+
+ val testName01 = "Scan All with NO useSalt"
+ val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.SCAN_ALL,
+ inputSplitType = splitType ).read
+ .fromBytesWritable( TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/ScanAllNoSalt01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+ val testName02 = "Scan All with useSalt=true"
val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
- sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read
-// .fromBytesWritable( TABLE_SCHEMA )
- .write(TextLine("saltTesting/ScanAllPlusSalt01"))
-
-// val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read
-// .fromBytesWritable(TABLE_SCHEMA )
-// .write(TextLine("saltTesting/ScanRangeNoSalt01"))
-//
-// val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read
-// .fromBytesWritable(TABLE_SCHEMA )
-// .write(TextLine("saltTesting/ScanRangePlusSalt01"))
-//
-// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read
-// .fromBytesWritable(TABLE_SCHEMA )
-// .write(TextLine("saltTesting/GetListNoSalt01"))
-//
-// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read
-// .fromBytesWritable(TABLE_SCHEMA )
-// .write(TextLine("saltTesting/GetListPlusSalt01"))
-//
-// val hbase07 =
-// new HBaseSource( "_TEST.SALT.03", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
-// .read
-// .fromBytesWritable( TABLE_SCHEMA )
-// .write(TextLine("saltTesting/ScanRangePlusSalt10"))
-// .toBytesWritable( TABLE_SCHEMA )
-// .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// useSalt = true ))
-//
-// val hbase08 =
-// new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-// TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
-// .read
-// .fromBytesWritable('*)
-// .write(TextLine("saltTesting/ScanRangePlusSalt03"))
+ sourceMode = SourceMode.SCAN_ALL, useSalt = true,
+ inputSplitType = splitType).read
+ .fromBytesWritable( TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/ScanAllPlusSalt01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+ // Test 03: range scan over the unsalted keys (sttKey..stpKey) with useSalt = true.
+ // The label was previously swapped with testName04; it now matches the
+ // configuration below and the "ScanRangePlusSalt01" output path.
+ val testName03 = "Scan Range with useSalt=true"
+ // Expected rows for the range: (salted key, salted key, unsalted key), salt digit = x % 10.
+ val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/ScanRangePlusSalt01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+ // Test 04: range scan over the already-prefixed keys (sttKeyP..stpKeyP) without useSalt;
+ // results are written to "ScanRangeNoSalt01".
+ val testName04 = "Scan Range with NO useSalt"
+ val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+
+ // Test 05: GET_LIST over the unsalted keys in listKey with useSalt = true.
+ // The label was previously swapped with testName06; it now matches the
+ // configuration below and the "GetListPlusSalt01" output path.
+ val testName05 = "Get List with useSalt=true"
+ // Expected rows for the listed keys: (salted key, salted key, unsalted key), salt digit = x % 10.
+ val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/GetListPlusSalt01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+ // Test 06: GET_LIST over the already-prefixed keys in listKeyP without useSalt;
+ // results are written to "GetListNoSalt01".
+ val testName06 = "Get List with NO useSalt"
+ val hbase06 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.GET_LIST, keyList = listKeyP,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/GetListNoSalt01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+ // Test 08: range scan over noSttKey..noStpKey with useSalt = true; the test
+ // harness expects this pipe to be empty. The label was previously swapped with
+ // testName09; it now matches the "ScanRangePlusSaltNoRes01" output path.
+ val testName08 = "Scan Range NO RESULT with useSalt=true"
+ val hbase08 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKey, stopKey = noStpKey, useSalt = true, prefixList = prefix,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/ScanRangePlusSaltNoRes01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+ // Test 09: range scan over the already-prefixed keys noSttKeyP..noStpKeyP without
+ // useSalt; results are written to "ScanRangeNoSaltNoRes01".
+ val testName09 = "Scan Range NO RESULTS"
+ val hbase09 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKeyP, stopKey = noStpKeyP,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/ScanRangeNoSaltNoRes01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+
+ // Test 10: GET_LIST over the unsalted keys in noListKey with useSalt = true; the
+ // test harness expects this pipe to be empty. The label was previously swapped
+ // with testName11; it now matches the "GetListPlusSaltNoRes01" output path.
+ val testName10 = "Get List NO RESULT with useSalt=true"
+ val hbase10 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.GET_LIST, keyList = noListKey, useSalt = true,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/GetListPlusSaltNoRes01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+ // Test 11: GET_LIST over the already-prefixed keys in noListKeyP without useSalt;
+ // results are written to "GetListNoSaltNoRes01".
+ val testName11 = "Get List NO RESULT"
+ val hbase11 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+ sourceMode = SourceMode.GET_LIST, keyList = noListKeyP,
+ inputSplitType = splitType).read
+ .fromBytesWritable(TABLE_SCHEMA )
+ .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('testData)
+ .write(TextLine("saltTesting/GetListNoSaltNoRes01"))
+ .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+
+
+// (
+//// getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++
+//// getTestResultPipe(getExpectedPipe(list01), hbase02, testName02) ++
+// getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++
+// getTestResultPipe(getExpectedPipe(list03), hbase04, testName04) ++
+// getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++
+// getTestResultPipe(getExpectedPipe(list05), hbase06, testName06) ++
+// assertPipeIsEmpty(hbase08, testName08) ++
+// assertPipeIsEmpty(hbase09, testName09) ++
+// assertPipeIsEmpty(hbase10, testName10) ++
+// assertPipeIsEmpty(hbase11, testName11)
+// ).groupAll { group =>
+// group.sortBy('testName)
+// }
+// .write(Tsv("saltTesting/FinalTestResults"))
+
+ /**
+ * Checks that `hbasePipe` produced no rows and reports the outcome as a result row.
+ *
+ * A one-row header pipe (the test name, in field 'testData) is concatenated
+ * with `hbasePipe` and the combined rows are counted into 'size. If the
+ * resulting size is 1, only the header row was present, so the original
+ * pipe was empty.
+ *
+ * The count is read back as a String and compared with "1": a match yields
+ * "Success", anything else yields "Test Failed". 'expectedData and 'testData
+ * are emitted as empty strings.
+ *
+ * NOTE(review): presumably `hbasePipe` must carry only the 'testData field for
+ * the `++` concatenation with the header pipe to line up - confirm against callers.
+ *
+ * @param hbasePipe pipe that is expected to contain no rows
+ * @param testName label used for the header row and written to 'testName in the result
+ * @return pipe with fields ('testName, 'result, 'expectedData, 'testData)
+ */
+ def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = {
+ val headerPipe = IterableSource(List(testName), 'testData)
+ val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group =>
+ group.size('size)
+ }
+ .project('size)
+
+ val result =
+ concatenation
+ .mapTo('size -> ('testName, 'result, 'expectedData, 'testData)) { x:String => {
+ if (x == "1") {
+ (testName, "Success", "", "")
+ } else {
+ (testName, "Test Failed", "", "")
+ }
+ }
+ }
+
+ result
+ }
+
+ /**
+ * Receives two pipes and projects the result of comparing them.
+ *
+ * expectedPipe should have a column 'expectedData
+ * realHBasePipe should have a column 'testData
+ *
+ * A constant 'testName column is inserted into both pipes and they are joined
+ * on it with joinWithTiny. 'expectedData and 'testData are then read as
+ * Strings and compared: equal values give "Success", otherwise "Test Failed".
+ *
+ * NOTE(review): the comparison is on the String form of each column, so it
+ * presumably depends on both sides listing their rows in the same order - confirm.
+ *
+ * @param expectedPipe pipe holding the expected rows in 'expectedData
+ * @param realHBasePipe pipe holding the rows read from HBase in 'testData
+ * @param testName label inserted as 'testName on both sides and used as the join key
+ * @return pipe with fields ('testName, 'result, 'expectedData, 'testData)
+ */
+ def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = {
+ val results = expectedPipe.insert('testName , testName)
+ .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName))
+ .map(('expectedData, 'testData)->'result) { x:(String,String) =>
+ if (x._1.equals(x._2))
+ "Success"
+ else
+ "Test Failed"
+ }
+ .project('testName, 'result, 'expectedData, 'testData)
+ results
+ }
+
+ /**
+ * Builds the "expected" side of a comparison from an in-memory list.
+ *
+ * The list is turned into a pipe with the TABLE_SCHEMA fields; the
+ * ('key, 'salted, 'unsalted) values of each row are packed into a List stored
+ * in 'expectedData, only that field is kept, and all rows are then gathered
+ * with groupAll/toList into 'expectedData.
+ *
+ * NOTE(review): TABLE_SCHEMA is defined outside this view; presumably it is
+ * ('key, 'salted, 'unsalted), matching the fields read by the map - confirm.
+ *
+ * @param expectedList expected rows as (key, salted, unsalted) string triples
+ * @return pipe carrying the collected rows in 'expectedData
+ */
+ def getExpectedPipe ( expectedList: List[(String,String, String)]) : Pipe = {
+ IterableSource(expectedList, TABLE_SCHEMA)
+ .map(('key, 'salted, 'unsalted) -> 'expectedData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+ .project('expectedData)
+ .groupAll(group => group.toList[List[List[String]]]('expectedData -> 'expectedData))
+ }
+
}
class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions {
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
index a8de7d6..17bc873 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
@@ -2,6 +2,7 @@ package parallelai.spyglass.hbase.testing
import parallelai.spyglass.base.JobRunner
import com.twitter.scalding.Args
+import org.apache.log4j.{Level, Logger}
object HBaseSaltTesterRunner extends App {
@@ -25,12 +26,18 @@ object HBaseSaltTesterRunner extends App {
val test = mArgs.getOrElse("test.data", "false").toBoolean
val delete = mArgs.getOrElse("delete.data", "false").toBoolean
+ val isDebug = mArgs.getOrElse("debug", "false").toBoolean
+
+ if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
+
+
if( make ) {
JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName,
"--hdfs",
"--app.conf.path", appPath,
"--job.lib.path", jobLibPath,
- "--quorum", quorum
+ "--quorum", quorum,
+ "--debug", isDebug.toString
))
}
@@ -39,7 +46,9 @@ object HBaseSaltTesterRunner extends App {
"--hdfs",
"--app.conf.path", appPath,
"--job.lib.path", jobLibPath,
- "--quorum", quorum
+ "--quorum", quorum,
+ "--debug", isDebug.toString,
+ "--regional", mArgs.getOrElse("regional", "false")
))
}
@@ -48,7 +57,8 @@ object HBaseSaltTesterRunner extends App {
"--hdfs",
"--app.conf.path", appPath,
"--job.lib.path", jobLibPath,
- "--quorum", quorum
+ "--quorum", quorum,
+ "--debug", isDebug.toString
))
}
} \ No newline at end of file