aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase
diff options
context:
space:
mode:
authorChandan Rajah <chandan.rajah@gmail.com>2013-06-06 12:27:15 +0100
committerChandan Rajah <chandan.rajah@gmail.com>2013-06-06 12:27:15 +0100
commit6e21e0c68248a33875898b86a2be7a9cec7df3d4 (patch)
tree5254682e3c3440f7c6954b23519459107b8a445e /src/main/java/parallelai/spyglass/hbase
parentea9c80374da846edf2a1634a42ccb932838ebd5b (diff)
downloadSpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.tar.gz
SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.zip
Added extensions to Read and Write mode.
Added support for key prefixes
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseConstants.java3
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java245
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java59
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java432
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java37
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseSalter.java206
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java17
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java45
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java90
9 files changed, 865 insertions, 269 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index b546107..5baf20e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -15,5 +15,8 @@ public class HBaseConstants {
public static final String STOP_KEY = "hbase.%s.stopkey";
public static final String SOURCE_MODE = "hbase.%s.source.mode";
public static final String KEY_LIST = "hbase.%s.key.list";
+ public static final String VERSIONS = "hbase.%s.versions";
+ public static final String USE_SALT = "hbase.%s.use.salt";
+ public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
index f1f4fb7..aabdc5e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
@@ -5,8 +5,8 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
+import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -17,8 +17,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
@@ -38,7 +40,6 @@ import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-
public class HBaseInputFormat
implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
@@ -48,9 +49,9 @@ public class HBaseInputFormat
private byte [][] inputColumns;
private HTable table;
- private HBaseRecordReader tableRecordReader;
+// private HBaseRecordReader tableRecordReader;
private Filter rowFilter;
- private String tableName = "";
+// private String tableName = "";
private HashMap<InetAddress, String> reverseDNSCacheMap =
new HashMap<InetAddress, String>();
@@ -83,7 +84,8 @@ public class HBaseInputFormat
List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);
HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
- .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY);
+ .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false);
+
splits.add(split);
return splits.toArray(new HBaseTableSplit[splits.size()]);
@@ -100,7 +102,7 @@ public class HBaseInputFormat
byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
byte[] maxKey = keys.getSecond()[0];
- LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
+ LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
byte [][] regStartKeys = keys.getFirst();
byte [][] regStopKeys = keys.getSecond();
@@ -144,29 +146,29 @@ public class HBaseInputFormat
regions[i] = regionLocation;
- LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));
+ LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));
}
byte[] startRow = HConstants.EMPTY_START_ROW;
byte[] stopRow = HConstants.EMPTY_END_ROW;
- LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
+ LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
- LOG.info("SOURCE MODE is : " + sourceMode);
+ LOG.debug("SOURCE MODE is : " + sourceMode);
switch( sourceMode ) {
case SCAN_ALL:
startRow = HConstants.EMPTY_START_ROW;
stopRow = HConstants.EMPTY_END_ROW;
- LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
+ LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
break;
case SCAN_RANGE:
startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ;
stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ;
- LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
+ LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
break;
}
@@ -180,98 +182,110 @@ public class HBaseInputFormat
List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
- List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow);
-
- int maxRegions = validRegions.size();
- int currentRegion = 1;
-
- for( HRegionLocation cRegion : validRegions ) {
- byte [] rStart = cRegion.getRegionInfo().getStartKey();
- byte [] rStop = cRegion.getRegionInfo().getEndKey();
+ if( ! useSalt ) {
- HServerAddress regionServerAddress = cRegion.getServerAddress();
- InetAddress regionAddress =
- regionServerAddress.getInetSocketAddress().getAddress();
- String regionLocation;
- try {
- regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
- LOG.error("Cannot resolve the host name for " + regionAddress +
- " because of " + e);
- regionLocation = regionServerAddress.getHostname();
- }
-
- byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow);
- byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);
-
- LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",
- (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )),
- (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )),
- rStop.length
- ));
+ List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow);
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(),
- sStart,
- sStop,
- regionLocation,
- SourceMode.SCAN_RANGE
- );
+ int maxRegions = validRegions.size();
+ int currentRegion = 1;
- split.setEndRowInclusive( currentRegion == maxRegions );
-
- currentRegion ++;
-
- LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
- Bytes.toString(startRow), Bytes.toString(stopRow),
- Bytes.toString(rStart), Bytes.toString(rStop),
- Bytes.toString(sStart),
- Bytes.toString(sStop),
- cRegion.getHostnamePort(), split) );
+ for( HRegionLocation cRegion : validRegions ) {
+ byte [] rStart = cRegion.getRegionInfo().getStartKey();
+ byte [] rStop = cRegion.getRegionInfo().getEndKey();
+
+ HServerAddress regionServerAddress = cRegion.getServerAddress();
+ InetAddress regionAddress =
+ regionServerAddress.getInetSocketAddress().getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress +
+ " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow);
+ byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);
+
+ LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)",
+ (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )),
+ (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )),
+ rStop.length
+ ));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(),
+ sStart,
+ sStop,
+ regionLocation,
+ SourceMode.SCAN_RANGE, useSalt
+ );
+
+ split.setEndRowInclusive( currentRegion == maxRegions );
+
+ currentRegion ++;
+
+ LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
+ Bytes.toString(startRow), Bytes.toString(stopRow),
+ Bytes.toString(rStart), Bytes.toString(rStop),
+ Bytes.toString(sStart),
+ Bytes.toString(sStop),
+ cRegion.getHostnamePort(), split) );
+
+ splits.add(split);
+ }
+ } else {
+ LOG.debug("Using SALT : " + useSalt );
- splits.add(split);
+ // Will return the start and the stop key with all possible prefixes.
+ for( int i = 0; i < regions.length; i++ ) {
+ Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList );
+
+ for( Pair<byte[], byte[]> pair : intervals ) {
+ LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond())));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(),
+ pair.getFirst(),
+ pair.getSecond(),
+ regions[i],
+ SourceMode.SCAN_RANGE, useSalt
+ );
+
+ split.setEndRowInclusive(true);
+ splits.add(split);
+ }
+ }
}
-//
-// for (int i = 0; i < keys.getFirst().length; i++) {
-//
-// if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
-// LOG.info("NOT including regions : " + regions[i]);
-// continue;
-// }
-//
-// // determine if the given start an stop key fall into the region
-// if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-// Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-// (stopRow.length == 0 ||
-// Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-//
-// byte[] splitStart = startRow.length == 0 ||
-// Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-// keys.getFirst()[i] : startRow;
-// byte[] splitStop = (stopRow.length == 0 ||
-// Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-// keys.getSecond()[i].length > 0 ?
-// keys.getSecond()[i] : stopRow;
-// HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
-// splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE);
-// splits.add(split);
-//
-// LOG.info("getSplits: split -> " + i + " -> " + split);
-// }
-// }
-
- LOG.info("RETURNED SPLITS: split -> " + splits);
+ LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size());
+ for( HBaseTableSplit s: splits) {
+ LOG.info("RETURNED SPLITS: split -> " + s);
+ }
return splits.toArray(new HBaseTableSplit[splits.size()]);
}
case GET_LIST:
{
- if( keyList == null || keyList.size() == 0 ) {
+// if( keyList == null || keyList.size() == 0 ) {
+ if( keyList == null ) {
throw new IOException("Source Mode is GET_LIST but key list is EMPTY");
}
+ if( useSalt ) {
+ TreeSet<String> tempKeyList = new TreeSet<String>();
+
+ for(String key: keyList) {
+ tempKeyList.add(HBaseSalter.addSaltPrefix(key));
+ }
+
+ keyList = tempKeyList;
+ }
+
+ LOG.debug("".format("Splitting Key List (%s)", keyList));
+
List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
for (int i = 0; i < keys.getFirst().length; i++) {
@@ -280,44 +294,46 @@ public class HBaseInputFormat
continue;
}
- LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));
+ LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));
Set<String> regionsSubSet = null;
if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) {
- LOG.info("REGION start is empty");
- LOG.info("REGION stop is empty");
+ LOG.debug("REGION start is empty");
+ LOG.debug("REGION stop is empty");
regionsSubSet = keyList;
} else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) {
- LOG.info("REGION start is empty");
+ LOG.debug("REGION start is empty");
regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true);
} else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) {
- LOG.info("REGION stop is empty");
+ LOG.debug("REGION stop is empty");
regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true);
} else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) {
regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true);
} else {
- throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",
+ throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)",
regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
}
if( regionsSubSet == null || regionsSubSet.size() == 0) {
- LOG.info( "EMPTY: Key is for region " + regions[i] + " is null");
+ LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null");
continue;
}
TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet);
- LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));
+ LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));
HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(), regionKeyList,
+ table.getTableName(), regionKeyList, versions,
regions[i],
- SourceMode.GET_LIST);
+ SourceMode.GET_LIST, useSalt);
splits.add(split);
}
+ LOG.debug("RETURNED SPLITS: split -> " + splits);
+
return splits.toArray(new HBaseTableSplit[splits.size()]);
}
@@ -351,20 +367,23 @@ public class HBaseInputFormat
case SCAN_ALL:
case SCAN_RANGE:
{
- LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));
+ LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));
trr.setStartRow(tSplit.getStartRow());
trr.setEndRow(tSplit.getEndRow());
trr.setEndRowInclusive(tSplit.getEndRowInclusive());
+ trr.setUseSalt(useSalt);
}
break;
case GET_LIST:
{
- LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));
+ LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));
trr.setKeyList(tSplit.getKeyList());
+ trr.setVersions(tSplit.getVersions());
+ trr.setUseSalt(useSalt);
}
break;
@@ -402,6 +421,9 @@ public class HBaseInputFormat
private SourceMode sourceMode = SourceMode.EMPTY;
private TreeSet<String> keyList = null;
+ private int versions = 1;
+ private boolean useSalt = false;
+ private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
public void configure(JobConf job) {
String tableName = getTableName(job);
@@ -422,9 +444,12 @@ public class HBaseInputFormat
LOG.debug("Entered : " + this.getClass() + " : configure()" );
+ useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false);
+ prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST);
+
sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ;
- LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",
+ LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally",
String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode ));
switch( sourceMode ) {
@@ -442,16 +467,18 @@ public class HBaseInputFormat
Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job)));
keyList = new TreeSet<String> (keys);
+
+ versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
- LOG.info( "GOT KEY LIST : " + keys );
- LOG.info(String.format("SETTING key list (%s)", keyList) );
+ LOG.debug( "GOT KEY LIST : " + keys );
+ LOG.debug(String.format("SETTING key list (%s)", keyList) );
break;
case EMPTY:
LOG.info("HIT EMPTY");
- sourceMode = sourceMode.SCAN_ALL;
+ sourceMode = SourceMode.SCAN_ALL;
break;
default:
@@ -468,7 +495,7 @@ public class HBaseInputFormat
if (tableName == null) {
throw new IOException("expecting one table name");
}
- LOG.debug("".format("Found Table name [%s]", tableName));
+ LOG.debug(String.format("Found Table name [%s]", tableName));
// connected to table?
@@ -476,16 +503,16 @@ public class HBaseInputFormat
throw new IOException("could not connect to table '" +
tableName + "'");
}
- LOG.debug("".format("Found Table [%s]", getHTable().getTableName()));
+ LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));
// expecting at least one column
String colArg = job.get(COLUMN_LIST);
if (colArg == null || colArg.length() == 0) {
throw new IOException("expecting at least one column");
}
- LOG.debug("".format("Found Columns [%s]", colArg));
+ LOG.debug(String.format("Found Columns [%s]", colArg));
- LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey));
+ LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey));
if( sourceMode == SourceMode.EMPTY ) {
throw new IOException("SourceMode should not be EMPTY");
@@ -504,7 +531,7 @@ public class HBaseInputFormat
private void setJobProp( JobConf job, String key, String value) {
- if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));
+ if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));
job.set(key, value);
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
new file mode 100644
index 0000000..40f1faf
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -0,0 +1,59 @@
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+public class HBaseOutputFormat extends
+FileOutputFormat<ImmutableBytesWritable, Put> {
+
+ /** JobConf parameter that specifies the output table */
+ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+ private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);
+
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordWriter getRecordWriter(FileSystem ignored,
+ JobConf job, String name, Progressable progress) throws IOException {
+
+ // expecting exactly one path
+
+ String tableName = job.get(OUTPUT_TABLE);
+ HTable table = null;
+ try {
+ table = new HTable(HBaseConfiguration.create(job), tableName);
+ } catch(IOException e) {
+ LOG.error(e);
+ throw e;
+ }
+ table.setAutoFlush(false);
+ return new HBaseRecordWriter(table);
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+ String tableName = job.get(OUTPUT_TABLE);
+ if(tableName == null) {
+ throw new IOException("Must specify table name");
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
index 97077c4..d22ed71 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
@@ -3,12 +3,17 @@ package parallelai.spyglass.hbase;
import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
import java.io.IOException;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
+import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -18,36 +23,38 @@ import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-
-public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
+public class HBaseRecordReader implements
+ RecordReader<ImmutableBytesWritable, Result> {
static final Log LOG = LogFactory.getLog(HBaseRecordReader.class);
- private byte [] startRow;
- private byte [] endRow;
- private byte [] lastSuccessfulRow;
+ private byte[] startRow;
+ private byte[] endRow;
+ private byte[] lastSuccessfulRow;
private TreeSet<String> keyList;
private SourceMode sourceMode;
private Filter trrRowFilter;
private ResultScanner scanner;
private HTable htable;
- private byte [][] trrInputColumns;
+ private byte[][] trrInputColumns;
private long timestamp;
private int rowcount;
private boolean logScannerActivity = false;
private int logPerRowCount = 100;
private boolean endRowInclusive = true;
+ private int versions = 1;
+ private boolean useSalt = false;
/**
* Restart from survivable exceptions by creating a new scanner.
- *
+ *
* @param firstRow
* @throws IOException
*/
@@ -55,26 +62,26 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
Scan currentScan;
if ((endRow != null) && (endRow.length > 0)) {
if (trrRowFilter != null) {
- Scan scan = new Scan(firstRow, (endRowInclusive ?
- Bytes.add(endRow, new byte[] {0}) : endRow ) );
-
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
+
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
scan.setCacheBlocks(false);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
} else {
- LOG.debug("TIFB.restart, firstRow: " +
- Bytes.toString(firstRow) + ", endRow: " +
- Bytes.toString(endRow));
- Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow ));
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow)
+ + ", endRow: " + Bytes.toString(endRow));
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
TableInputFormat.addColumns(scan, trrInputColumns);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
} else {
- LOG.debug("TIFB.restart, firstRow: " +
- Bytes.toStringBinary(firstRow) + ", no endRow");
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
+ + ", no endRow");
Scan scan = new Scan(firstRow);
TableInputFormat.addColumns(scan, trrInputColumns);
@@ -83,7 +90,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
currentScan = scan;
}
if (logScannerActivity) {
- LOG.info("Current scan=" + currentScan.toString());
+ LOG.debug("Current scan=" + currentScan.toString());
timestamp = System.currentTimeMillis();
rowcount = 0;
}
@@ -97,6 +104,14 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
this.keyList = keyList;
}
+ public void setVersions(int versions) {
+ this.versions = versions;
+ }
+
+ public void setUseSalt(boolean useSalt) {
+ this.useSalt = useSalt;
+ }
+
public SourceMode getSourceMode() {
return sourceMode;
}
@@ -108,76 +123,84 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
public byte[] getEndRow() {
return endRow;
}
-
+
public void setEndRowInclusive(boolean isInclusive) {
endRowInclusive = isInclusive;
}
-
+
public boolean getEndRowInclusive() {
return endRowInclusive;
}
-
- private byte [] nextKey = null;
+
+ private byte[] nextKey = null;
+ private Vector<List<KeyValue>> resultVector = null;
+ Map<Long, List<KeyValue>> keyValueMap = null;
/**
* Build the scanner. Not done in constructor to allow for extension.
- *
+ *
* @throws IOException
*/
public void init() throws IOException {
- switch( sourceMode ) {
- case SCAN_ALL:
- case SCAN_RANGE:
- restartRangeScan(startRow);
- break;
-
- case GET_LIST:
- nextKey = Bytes.toBytes(keyList.pollFirst());
- break;
-
- default:
- throw new IOException(" Unknown source mode : " + sourceMode );
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ restartRangeScan(startRow);
+ break;
+
+ case GET_LIST:
+ nextKey = Bytes.toBytes(keyList.pollFirst());
+ break;
+
+ default:
+ throw new IOException(" Unknown source mode : " + sourceMode);
}
}
byte[] getStartRow() {
return this.startRow;
}
+
/**
- * @param htable the {@link HTable} to scan.
+ * @param htable
+ * the {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
Configuration conf = htable.getConfiguration();
- logScannerActivity = conf.getBoolean(
- ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+ logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY,
+ false);
logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
this.htable = htable;
}
/**
- * @param inputColumns the columns to be placed in {@link Result}.
+ * @param inputColumns
+ * the columns to be placed in {@link Result}.
*/
- public void setInputColumns(final byte [][] inputColumns) {
+ public void setInputColumns(final byte[][] inputColumns) {
this.trrInputColumns = inputColumns;
}
/**
- * @param startRow the first row in the split
+ * @param startRow
+ * the first row in the split
*/
- public void setStartRow(final byte [] startRow) {
+ public void setStartRow(final byte[] startRow) {
this.startRow = startRow;
}
/**
- *
- * @param endRow the last row in the split
+ *
+ * @param endRow
+ * the last row in the split
*/
- public void setEndRow(final byte [] endRow) {
+ public void setEndRow(final byte[] endRow) {
this.endRow = endRow;
}
/**
- * @param rowFilter the {@link Filter} to be used.
+ * @param rowFilter
+ * the {@link Filter} to be used.
*/
public void setRowFilter(Filter rowFilter) {
this.trrRowFilter = rowFilter;
@@ -185,12 +208,13 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
@Override
public void close() {
- if (this.scanner != null) this.scanner.close();
+ if (this.scanner != null)
+ this.scanner.close();
}
/**
* @return ImmutableBytesWritable
- *
+ *
* @see org.apache.hadoop.mapred.RecordReader#createKey()
*/
@Override
@@ -200,7 +224,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
/**
* @return RowResult
- *
+ *
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
@Override
@@ -222,104 +246,260 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
}
/**
- * @param key HStoreKey as input key.
- * @param value MapWritable as input value
+ * @param key
+ * HStoreKey as input key.
+ * @param value
+ * MapWritable as input value
* @return true if there was more data
* @throws IOException
*/
@Override
public boolean next(ImmutableBytesWritable key, Result value)
- throws IOException {
-
- switch(sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE:
- {
-
- Result result;
+ throws IOException {
+
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+
+ Result result;
+ try {
try {
- try {
- result = this.scanner.next();
- if (logScannerActivity) {
- rowcount ++;
- if (rowcount >= logPerRowCount) {
- long now = System.currentTimeMillis();
- LOG.info("Mapper took " + (now-timestamp)
- + "ms to process " + rowcount + " rows");
- timestamp = now;
- rowcount = 0;
- }
- }
- } catch (IOException e) {
- // try to handle all IOExceptions by restarting
- // the scanner, if the second call fails, it will be rethrown
- LOG.debug("recovered from " + StringUtils.stringifyException(e));
- if (lastSuccessfulRow == null) {
- LOG.warn("We are restarting the first next() invocation," +
- " if your mapper has restarted a few other times like this" +
- " then you should consider killing this job and investigate" +
- " why it's taking so long.");
- }
- if (lastSuccessfulRow == null) {
- restartRangeScan(startRow);
- } else {
- restartRangeScan(lastSuccessfulRow);
- this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+ + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
}
- result = this.scanner.next();
}
+ } catch (IOException e) {
+ // try to handle all IOExceptions by restarting
+ // the scanner, if the second call fails, it will be rethrown
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation,"
+ + " if your mapper has restarted a few other times like this"
+ + " then you should consider killing this job and investigate"
+ + " why it's taking so long.");
+ }
+ if (lastSuccessfulRow == null) {
+ restartRangeScan(startRow);
+ } else {
+ restartRangeScan(lastSuccessfulRow);
+ this.scanner.next(); // skip presumed already mapped row
+ }
+ result = this.scanner.next();
+ }
- if (result != null && result.size() > 0) {
+ if (result != null && result.size() > 0) {
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
key.set(result.getRow());
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
- return true;
- }
- return false;
- } catch (IOException ioe) {
- if (logScannerActivity) {
- long now = System.currentTimeMillis();
- LOG.info("Mapper took " + (now-timestamp)
- + "ms to process " + rowcount + " rows");
- LOG.info(ioe);
- String lastRow = lastSuccessfulRow == null ?
- "null" : Bytes.toStringBinary(lastSuccessfulRow);
- LOG.info("lastSuccessfulRow=" + lastRow);
}
- throw ioe;
+
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ }
+ return false;
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+ + rowcount + " rows");
+ LOG.debug(ioe);
+ String lastRow = lastSuccessfulRow == null ? "null" : Bytes
+ .toStringBinary(lastSuccessfulRow);
+ LOG.debug("lastSuccessfulRow=" + lastRow);
}
+ throw ioe;
}
+ }
+
+ case GET_LIST: {
+ LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey)));
- case GET_LIST:
- {
- Result result;
- if( nextKey != null ) {
- result = this.htable.get(new Get(nextKey));
+ if (versions == 1) {
+ if (nextKey != null) {
+ LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey)));
- if (result != null && result.size() > 0) {
- System.out.println("KeyList => " + keyList);
- System.out.println("Result => " + result);
- if (keyList != null || !keyList.isEmpty()) {
-
- String newKey = keyList.pollFirst();
- System.out.println("New Key => " + newKey);
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey);
- } else {
- nextKey = null;
- }
- key.set(result.getRow());
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result result = this.htable.get(theGet);
+
+ if (result != null && (! result.isEmpty()) ) {
+ LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) );
+
+ if (keyList != null || !keyList.isEmpty()) {
+ String newKey = keyList.pollFirst();
+ LOG.debug("New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+ } else {
+ nextKey = null;
+ }
+
+ LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey)));
+
+ // Write the result
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
lastSuccessfulRow = key.get();
Writables.copyWritable(result, value);
+
return true;
+ } else {
+ LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0
+
+ String newKey;
+ while((newKey = keyList.pollFirst()) != null) {
+ LOG.debug("WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( nextKey == null ) {
+ LOG.error("BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if( ! this.htable.exists( new Get(nextKey) ) ) {
+ LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+ continue;
+ } else { break; }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ LOG.debug("Final New Key => " + Bytes.toString(nextKey));
+
+ return next(key, value);
}
- return false;
} else {
+ // Nothig left. return false
return false;
}
+ } else {
+ if (resultVector != null && resultVector.size() != 0) {
+ LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) );
+
+ List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) );
+
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+
+ return true;
+ } else {
+ if (nextKey != null) {
+ LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey)));
+
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result resultAll = this.htable.get(theGet);
+
+ if( resultAll != null && (! resultAll.isEmpty())) {
+ List<KeyValue> keyValeList = resultAll.list();
+
+ keyValueMap = new HashMap<Long, List<KeyValue>>();
+
+ LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap));
+
+ for (KeyValue keyValue : keyValeList) {
+ long version = keyValue.getTimestamp();
+
+ if (keyValueMap.containsKey(new Long(version))) {
+ List<KeyValue> keyValueTempList = keyValueMap.get(new Long(
+ version));
+ if (keyValueTempList == null) {
+ keyValueTempList = new ArrayList<KeyValue>();
+ }
+ keyValueTempList.add(keyValue);
+ } else {
+ List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
+ keyValueMap.put(new Long(version), keyValueTempList);
+ keyValueTempList.add(keyValue);
+ }
+ }
+
+ resultVector = new Vector<List<KeyValue>>();
+ resultVector.addAll(keyValueMap.values());
+
+ List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) );
+
+ String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
+
+ System.out.println("+ New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ } else {
+ LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0
+
+ String newKey;
+
+ while( (newKey = keyList.pollFirst()) != null ) {
+ LOG.debug("+ WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( nextKey == null ) {
+ LOG.error("+ BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if( ! this.htable.exists( new Get(nextKey) ) ) {
+ LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+ continue;
+ } else { break; }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ LOG.debug("+ Final New Key => " + Bytes.toString(nextKey));
+
+ return next(key, value);
+ }
+
+ } else {
+ return false;
+ }
+ }
}
-
- default:
- throw new IOException("Unknown source mode : " + sourceMode );
- }
+ }
+ default:
+ throw new IOException("Unknown source mode : " + sourceMode);
+ }
}
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
new file mode 100644
index 0000000..1cca133
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -0,0 +1,37 @@
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
+ * and write to an HBase table
+ */
+public class HBaseRecordWriter
+ implements RecordWriter<ImmutableBytesWritable, Put> {
+ private HTable m_table;
+
+ /**
+ * Instantiate a TableRecordWriter with the HBase HClient for writing.
+ *
+ * @param table
+ */
+ public HBaseRecordWriter(HTable table) {
+ m_table = table;
+ }
+
+ public void close(Reporter reporter)
+ throws IOException {
+ m_table.close();
+ }
+
+ public void write(ImmutableBytesWritable key,
+ Put value) throws IOException {
+ m_table.put(new Put(value));
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
new file mode 100644
index 0000000..fc656a4
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
@@ -0,0 +1,206 @@
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+public class HBaseSalter {
+ private static final Log LOG = LogFactory.getLog(HBaseSalter.class);
+
+
+ public static byte [] addSaltPrefix(byte [] key) throws IOException {
+ if( key == null || key.length < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long");
+
+ String keyStr = Bytes.toString(key);
+
+ return Bytes.toBytes(keyStr.substring(keyStr.length() - 1) + "_" + keyStr);
+ }
+
+ public static String addSaltPrefix(String key) throws IOException {
+ if( key == null || key.length() < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long");
+
+ return (key.substring(key.length() - 1) + "_" + key);
+ }
+
+ public static ImmutableBytesWritable addSaltPrefix( ImmutableBytesWritable key ) throws IOException {
+ return new ImmutableBytesWritable( addSaltPrefix(key.get()));
+ }
+
+ public static byte [] delSaltPrefix(byte [] key) throws IOException {
+ if( key == null || key.length < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long");
+
+ String keyStr = Bytes.toString(key);
+
+ return Bytes.toBytes(keyStr.substring(2));
+ }
+
+ public static String delSaltPrefix(String key) throws IOException {
+ if( key == null || key.length() < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long");
+
+ return key.substring(2);
+ }
+
+ public static ImmutableBytesWritable delSaltPrefix( ImmutableBytesWritable key ) throws IOException {
+ return new ImmutableBytesWritable( delSaltPrefix(key.get()));
+ }
+
+ public static final String DEFAULT_PREFIX_LIST = " !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~";
+
+ public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey) throws IOException {
+ return getDistributedIntervals(originalStartKey, originalStopKey, DEFAULT_PREFIX_LIST);
+ }
+
+ public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey, String prefixList) throws IOException {
+ return getDistributedIntervals(originalStartKey, originalStopKey, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, prefixList);
+ }
+
+
+ public static Pair<byte[], byte[]>[] getDistributedIntervals(
+ byte[] originalStartKey, byte[] originalStopKey,
+ byte[] regionStartKey, byte[] regionStopKey,
+ String prefixList) throws IOException {
+ LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)",
+ Bytes.toString(originalStartKey),
+ Bytes.toString(originalStopKey),
+ Bytes.toString(regionStartKey),
+ Bytes.toString(regionStopKey),
+ prefixList
+ ));
+
+ byte[][] startKeys;
+ byte[][] stopKeys;
+
+ if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW)
+ && Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW) ) {
+ startKeys = getAllKeys(originalStartKey, prefixList);
+ stopKeys = getAllKeys(originalStopKey, prefixList);
+ } else if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW)) {
+ startKeys = getAllKeysWithStop(originalStartKey, prefixList, regionStopKey[0]);
+ stopKeys = getAllKeysWithStop(originalStopKey, prefixList, regionStopKey[0]);
+ } else if(Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW)) {
+ startKeys = getAllKeysWithStart(originalStartKey, prefixList, regionStartKey[0]);
+ stopKeys = getAllKeysWithStart(originalStopKey, prefixList, regionStartKey[0]);
+ } else {
+ startKeys = getAllKeysInRange(originalStartKey, prefixList, regionStartKey[0], regionStopKey[0]);
+ stopKeys = getAllKeysInRange(originalStopKey, prefixList, regionStartKey[0], regionStopKey[0]);
+ }
+
+ if( startKeys.length != stopKeys.length) {
+ throw new IOException("LENGTH of START Keys and STOP Keys DO NOT match");
+ }
+
+ if( Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW)
+ && Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW) ) {
+ Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR);
+ // stop keys are the start key of the next interval
+ for (int i = startKeys.length - 1; i >= 1; i--) {
+ startKeys[i] = startKeys[i - 1];
+ }
+ startKeys[0] = HConstants.EMPTY_START_ROW;
+ stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW;
+ } else if (Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW)) {
+ Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR);
+ // stop keys are the start key of the next interval
+ for (int i = startKeys.length - 1; i >= 1; i--) {
+ startKeys[i] = startKeys[i - 1];
+ }
+ startKeys[0] = HConstants.EMPTY_START_ROW;
+ } else if (Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW)) {
+ Arrays.sort(startKeys, Bytes.BYTES_RAWCOMPARATOR);
+ // stop keys are the start key of the next interval
+ for (int i = 0; i < stopKeys.length - 1; i++) {
+ stopKeys[i] = stopKeys[i + 1];
+ }
+ stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW;
+ }
+
+ Pair<byte[], byte[]>[] intervals = new Pair[startKeys.length];
+ for (int i = 0; i < startKeys.length; i++) {
+ intervals[i] = new Pair<byte[], byte[]>(startKeys[i], stopKeys[i]);
+ }
+
+ return intervals;
+ }
+
+ public static byte[][] getAllKeys(byte[] originalKey) throws IOException {
+ return getAllKeys(originalKey, DEFAULT_PREFIX_LIST);
+ }
+
+ public static byte[][] getAllKeys(byte[] originalKey, String prefixList) throws IOException {
+ char[] prefixArray = prefixList.toCharArray();
+
+ return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)prefixArray[prefixArray.length - 1]);
+ }
+
+ public static byte[][] getAllKeysWithStart(byte[] originalKey, String prefixList, byte startKey) throws IOException {
+ char[] prefixArray = prefixList.toCharArray();
+
+ return getAllKeysWithStartStop(originalKey, prefixList, startKey, (byte)prefixArray[prefixArray.length - 1]);
+ }
+
+ public static byte[][] getAllKeysWithStop(byte[] originalKey, String prefixList, byte stopKey) throws IOException {
+ char[] prefixArray = prefixList.toCharArray();
+
+ return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1));
+ }
+
+ public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
+ return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1));
+ }
+
+ private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
+ char[] prefixArray = prefixList.toCharArray();
+ TreeSet<Byte> prefixSet = new TreeSet<Byte>();
+
+ for( char c : prefixArray ) {
+ prefixSet.add((byte)c);
+ }
+
+ SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true);
+
+ return getAllKeys(originalKey, subSet.toArray(new Byte[]{}));
+ }
+
+ public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) {
+ byte[][] keys = new byte[prefixArray.length][];
+
+ for (byte i = 0; i < prefixArray.length; i++) {
+ keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey));
+ }
+
+ return keys;
+ }
+
+ public static void main(String [] args) throws IOException {
+
+ String s = "";
+
+ for (byte i = 32; i < Byte.MAX_VALUE; i++) {
+ s += (char)i;
+ }
+
+ System.out.println("".format("[%s]", s));
+ System.out.println("".format("[%s]", DEFAULT_PREFIX_LIST));
+
+
+ String start = Bytes.toString(HConstants.EMPTY_START_ROW);
+ String stop = Bytes.toString(HConstants.EMPTY_END_ROW);
+
+ System.out.println("".format("START: (%s) STOP: (%s)", start, stop));
+
+ Pair<byte[], byte[]>[] intervals = getDistributedIntervals(Bytes.toBytes(start), Bytes.toBytes(stop), "1423");
+
+ for( Pair<byte[], byte[]> p : intervals ) {
+ System.out.println("".format("iSTART: (%s) iSTOP: (%s)", Bytes.toString(p.getFirst()), Bytes.toString(p.getSecond())));
+ }
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index e5acc30..a7d36fd 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -24,7 +24,6 @@ import cascading.util.Util;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
@@ -63,7 +62,9 @@ public class HBaseScheme
/** String columns */
private transient String[] columns;
/** Field fields */
- private transient byte[][] fields;
+// private transient byte[][] fields;
+
+ private boolean useSalt = false;
/**
@@ -237,8 +238,13 @@ public class HBaseScheme
OutputCollector outputCollector = sinkCall.getOutput();
Tuple key = tupleEntry.selectTuple(keyField);
ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0);
+
+ if( useSalt ) {
+ keyBytes = HBaseSalter.addSaltPrefix(keyBytes);
+ }
+
Put put = new Put(keyBytes.get(), this.timeStamp);
-
+
for (int i = 0; i < valueFields.length; i++) {
Fields fieldSelector = valueFields[i];
TupleEntry values = tupleEntry.selectEntry(fieldSelector);
@@ -258,10 +264,13 @@ public class HBaseScheme
@Override
public void sinkConfInit(FlowProcess<JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
- conf.setOutputFormat(TableOutputFormat.class);
+ conf.setOutputFormat(HBaseOutputFormat.class);
conf.setOutputKeyClass(ImmutableBytesWritable.class);
conf.setOutputValueClass(Put.class);
+
+ String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE);
+ useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false);
}
@Override
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
index 1d48e1d..a5c3bdd 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
@@ -1,11 +1,9 @@
package parallelai.spyglass.hbase;
-import java.awt.event.KeyListener;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
@@ -14,8 +12,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputSplit;
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import com.sun.tools.javac.resources.version;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable {
@@ -28,11 +27,13 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
private TreeSet<String> m_keyList = null;
private SourceMode m_sourceMode = SourceMode.EMPTY;
private boolean m_endRowInclusive = true;
+ private int m_versions = 1;
+ private boolean m_useSalt = false;
/** default constructor */
public HBaseTableSplit() {
this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY);
+ HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);
}
/**
@@ -43,19 +44,22 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
* @param location
*/
public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow,
- final String location, final SourceMode sourceMode) {
+ final String location, final SourceMode sourceMode, final boolean useSalt) {
this.m_tableName = tableName;
this.m_startRow = startRow;
this.m_endRow = endRow;
this.m_regionLocation = location;
this.m_sourceMode = sourceMode;
+ this.m_useSalt = useSalt;
}
- public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) {
+ public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) {
this.m_tableName = tableName;
this.m_keyList = keyList;
+ this.m_versions = versions;
this.m_sourceMode = sourceMode;
this.m_regionLocation = location;
+ this.m_useSalt = useSalt;
}
/** @return table name */
@@ -86,20 +90,28 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
return m_keyList;
}
+ public int getVersions() {
+ return m_versions;
+ }
+
/** @return get the source mode */
public SourceMode getSourceMode() {
return m_sourceMode;
}
+
+ public boolean getUseSalt() {
+ return m_useSalt;
+ }
/** @return the region's hostname */
public String getRegionLocation() {
- LOG.info("REGION GETTER : " + m_regionLocation);
+ LOG.debug("REGION GETTER : " + m_regionLocation);
return this.m_regionLocation;
}
public String[] getLocations() {
- LOG.info("REGION ARRAY : " + m_regionLocation);
+ LOG.debug("REGION ARRAY : " + m_regionLocation);
return new String[] {this.m_regionLocation};
}
@@ -112,11 +124,12 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
@Override
public void readFields(DataInput in) throws IOException {
- LOG.info("READ ME : " + in.toString());
+ LOG.debug("READ ME : " + in.toString());
this.m_tableName = Bytes.readByteArray(in);
this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in)));
+ this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
switch(this.m_sourceMode) {
case SCAN_RANGE:
@@ -126,6 +139,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
break;
case GET_LIST:
+ this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
this.m_keyList = new TreeSet<String>();
int m = Bytes.toInt(Bytes.readByteArray(in));
@@ -136,16 +150,17 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
break;
}
- LOG.info("READ and CREATED : " + this);
+ LOG.debug("READ and CREATED : " + this);
}
@Override
public void write(DataOutput out) throws IOException {
- LOG.info("WRITE : " + this);
+ LOG.debug("WRITE : " + this);
Bytes.writeByteArray(out, this.m_tableName);
Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
switch( this.m_sourceMode ) {
case SCAN_RANGE:
@@ -155,6 +170,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
break;
case GET_LIST:
+ Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
for( String k: this.m_keyList ) {
@@ -163,13 +179,14 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
break;
}
- LOG.info("WROTE : " + out.toString());
+ LOG.debug("WROTE : " + out.toString());
}
@Override
public String toString() {
- return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)",
- Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList);
+ return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
+ Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow),
+ (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt);
}
@Override
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 9a0ed0e..65873a0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,6 +12,7 @@
package parallelai.spyglass.hbase;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
@@ -35,13 +35,10 @@ import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Map.Entry;
+import java.util.Arrays;
import java.util.UUID;
/**
@@ -172,7 +169,12 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
}
- conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+ conf.set(HBaseOutputFormat.OUTPUT_TABLE, tableName);
+
+ for( SinkConfig sc : sinkConfigList) {
+ sc.configure(conf);
+ }
+
super.sinkConfInit(process, conf);
}
@@ -292,10 +294,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
public String startKey = null;
public String stopKey = null;
public String [] keyList = null;
+ public int versions = 1;
+ public boolean useSalt = false;
+ public String prefixList = null;
public void configure(Configuration jobConf) {
switch( sourceMode ) {
case SCAN_RANGE:
+ if (stopKey != null && startKey != null && startKey.compareTo(stopKey) > 0) {
+ String t = stopKey;
+ stopKey = startKey;
+ startKey = t;
+ }
+
jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
if( startKey != null && startKey.length() > 0 )
@@ -304,57 +315,104 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
if( stopKey != null && stopKey.length() > 0 )
jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey);
- LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
- LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey));
- LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));
+ // Added for Salting
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+ LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ LOG.info(String.format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey));
+ LOG.info(String.format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));
break;
case GET_LIST:
jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList);
+ jobConf.setInt(String.format(HBaseConstants.VERSIONS, tableName), versions);
- LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
- LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList));
+ // Added for Salting
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+ LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ LOG.info(String.format("Setting KEY LIST (%s) to key list length (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList.length));
break;
default:
jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
- LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ // Added for Salting
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+ LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
break;
}
}
}
+ private static class SinkConfig implements Serializable {
+ public String tableName = null;
+ public boolean useSalt = false;
+
+ public void configure(Configuration jobConf) {
+ jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ }
+ }
+
private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>();
-
- public void setHBaseRangeParms(String startKey, String stopKey ) {
+ private ArrayList<SinkConfig> sinkConfigList = new ArrayList<SinkConfig>();
+
+ public void setHBaseRangeParms(String startKey, String stopKey, boolean useSalt, String prefixList ) {
SourceConfig sc = new SourceConfig();
sc.sourceMode = SourceMode.SCAN_RANGE;
sc.tableName = tableName;
sc.startKey = startKey;
sc.stopKey = stopKey;
+ sc.useSalt = useSalt;
+ setPrefixList(sc, prefixList);
sourceConfigList.add(sc);
}
- public void setHBaseListParms(String [] keyList ) {
+ public void setHBaseListParms(String [] keyList, int versions, boolean useSalt, String prefixList ) {
SourceConfig sc = new SourceConfig();
sc.sourceMode = SourceMode.GET_LIST;
sc.tableName = tableName;
sc.keyList = keyList;
+ sc.versions = (versions < 1) ? 1 : versions;
+ sc.useSalt = useSalt;
+ setPrefixList(sc, prefixList);
sourceConfigList.add(sc);
}
- public void setHBaseScanAllParms() {
+ public void setHBaseScanAllParms(boolean useSalt, String prefixList) {
SourceConfig sc = new SourceConfig();
sc.sourceMode = SourceMode.SCAN_ALL;
sc.tableName = tableName;
+ sc.useSalt = useSalt;
+
+ setPrefixList(sc, prefixList);
sourceConfigList.add(sc);
}
+
+ public void setUseSaltInSink( boolean useSalt ) {
+ SinkConfig sc = new SinkConfig();
+
+ sc.tableName = tableName;
+ sc.useSalt = useSalt;
+
+ sinkConfigList.add(sc);
+ }
+
+ private void setPrefixList(SourceConfig sc, String prefixList ) {
+ prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList;
+ char[] prefixArray = prefixList.toCharArray();
+ Arrays.sort(prefixArray);
+ sc.prefixList = new String( prefixArray );
+ }
}