about summary refs log tree commit diff stats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTap.java')
-rw-r--r-- src/main/java/parallelai/spyglass/hbase/HBaseTap.java 90
1 files changed, 74 insertions, 16 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 9a0ed0e..65873a0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,6 +12,7 @@
package parallelai.spyglass.hbase;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
@@ -35,13 +35,10 @@ import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Map.Entry;
+import java.util.Arrays;
import java.util.UUID;
/**
@@ -172,7 +169,12 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
}
- conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+ conf.set(HBaseOutputFormat.OUTPUT_TABLE, tableName);
+
+ for( SinkConfig sc : sinkConfigList) {
+ sc.configure(conf);
+ }
+
super.sinkConfInit(process, conf);
}
@@ -292,10 +294,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
public String startKey = null;
public String stopKey = null;
public String [] keyList = null;
+ public int versions = 1;
+ public boolean useSalt = false;
+ public String prefixList = null;
public void configure(Configuration jobConf) {
switch( sourceMode ) {
case SCAN_RANGE:
+ if (stopKey != null && startKey != null && startKey.compareTo(stopKey) > 0) {
+ String t = stopKey;
+ stopKey = startKey;
+ startKey = t;
+ }
+
jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
if( startKey != null && startKey.length() > 0 )
@@ -304,57 +315,104 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
if( stopKey != null && stopKey.length() > 0 )
jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey);
- LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
- LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey));
- LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));
+ // Added for Salting
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+ LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ LOG.info(String.format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey));
+ LOG.info(String.format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));
break;
case GET_LIST:
jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList);
+ jobConf.setInt(String.format(HBaseConstants.VERSIONS, tableName), versions);
- LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
- LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList));
+ // Added for Salting
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+ LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ LOG.info(String.format("Setting KEY LIST (%s) to key list length (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList.length));
break;
default:
jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
- LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ // Added for Salting
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+ LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
break;
}
}
}
+ private static class SinkConfig implements Serializable {
+ public String tableName = null;
+ public boolean useSalt = false;
+
+ public void configure(Configuration jobConf) {
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ }
+ }
+
private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>();
-
- public void setHBaseRangeParms(String startKey, String stopKey ) {
+ private ArrayList<SinkConfig> sinkConfigList = new ArrayList<SinkConfig>();
+
+ public void setHBaseRangeParms(String startKey, String stopKey, boolean useSalt, String prefixList ) {
SourceConfig sc = new SourceConfig();
sc.sourceMode = SourceMode.SCAN_RANGE;
sc.tableName = tableName;
sc.startKey = startKey;
sc.stopKey = stopKey;
+ sc.useSalt = useSalt;
+ setPrefixList(sc, prefixList);
sourceConfigList.add(sc);
}
- public void setHBaseListParms(String [] keyList ) {
+ public void setHBaseListParms(String [] keyList, int versions, boolean useSalt, String prefixList ) {
SourceConfig sc = new SourceConfig();
sc.sourceMode = SourceMode.GET_LIST;
sc.tableName = tableName;
sc.keyList = keyList;
+ sc.versions = (versions < 1) ? 1 : versions;
+ sc.useSalt = useSalt;
+ setPrefixList(sc, prefixList);
sourceConfigList.add(sc);
}
- public void setHBaseScanAllParms() {
+ public void setHBaseScanAllParms(boolean useSalt, String prefixList) {
SourceConfig sc = new SourceConfig();
sc.sourceMode = SourceMode.SCAN_ALL;
sc.tableName = tableName;
+ sc.useSalt = useSalt;
+
+ setPrefixList(sc, prefixList);
sourceConfigList.add(sc);
}
+
+ public void setUseSaltInSink( boolean useSalt ) {
+ SinkConfig sc = new SinkConfig();
+
+ sc.tableName = tableName;
+ sc.useSalt = useSalt;
+
+ sinkConfigList.add(sc);
+ }
+
+ private void setPrefixList(SourceConfig sc, String prefixList ) {
+ prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList;
+ char[] prefixArray = prefixList.toCharArray();
+ Arrays.sort(prefixArray);
+ sc.prefixList = new String( prefixArray );
+ }
}