aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSaad Rashid <saad373@gmail.com>2014-02-20 17:12:05 +0000
committerSaad Rashid <saad373@gmail.com>2014-02-20 17:12:05 +0000
commit3f2642dd59d17362aa9aa184efc866a5a8845a79 (patch)
tree4b22d8134204c9a6a50aa48394a5ab74798cab82
parent2e9f2981222b206249ce3f4c723ccae67e4bf062 (diff)
downloadSpyGlass-3f2642dd59d17362aa9aa184efc866a5a8845a79.tar.gz
SpyGlass-3f2642dd59d17362aa9aa184efc866a5a8845a79.zip
Fix test mode in HBaseSource.
change rc4 to rc1.
-rw-r--r--pom.xml2
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala35
2 files changed, 36 insertions, 1 deletions
diff --git a/pom.xml b/pom.xml
index 2e8884c..d702444 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
<name>Cascading and Scalding wrapper for HBase with advanced features</name>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.10.2_4.2rc4</version>
+ <version>2.10.2_4.2rc1</version>
<packaging>jar</packaging>
<!-- Repositories -->
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index bbae8f6..09ad19d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -18,6 +18,8 @@ import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import com.twitter.scalding.Source
+import com.twitter.scalding.TestMode
+import com.twitter.scalding.Test
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -92,6 +94,39 @@ case class HBaseSource(
hbt.asInstanceOf[Tap[_,_,_]]
}
}
+ case testMode @ Test(_) => readOrWrite match {
+ case Read => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
+
+ sourceMode match {
+ case SourceMode.SCAN_RANGE => {
+
+ hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
+ }
+ case SourceMode.SCAN_ALL => {
+ hbt.setHBaseScanAllParms(useSalt, prefixList)
+ }
+ case SourceMode.GET_LIST => {
+ if( keyList == null )
+ throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
+
+ hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
+ }
+ case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
+ }
+
+ hbt.setInputSplitType(inputSplitType)
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
+
+ hbt.setUseSaltInSink(useSalt)
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ }
case _ => createEmptyTap(readOrWrite)(mode)
}
}