aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java245
1 files changed, 136 insertions, 109 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
index f1f4fb7..aabdc5e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
@@ -5,8 +5,8 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
+import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -17,8 +17,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
@@ -38,7 +40,6 @@ import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-
public class HBaseInputFormat
implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
@@ -48,9 +49,9 @@ public class HBaseInputFormat
private byte [][] inputColumns;
private HTable table;
- private HBaseRecordReader tableRecordReader;
+// private HBaseRecordReader tableRecordReader;
private Filter rowFilter;
- private String tableName = "";
+// private String tableName = "";
private HashMap<InetAddress, String> reverseDNSCacheMap =
new HashMap<InetAddress, String>();
@@ -83,7 +84,8 @@ public class HBaseInputFormat
List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);
HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
- .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY);
+ .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false);
+
splits.add(split);
return splits.toArray(new HBaseTableSplit[splits.size()]);
@@ -100,7 +102,7 @@ public class HBaseInputFormat
byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
byte[] maxKey = keys.getSecond()[0];
- LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
+ LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
byte [][] regStartKeys = keys.getFirst();
byte [][] regStopKeys = keys.getSecond();
@@ -144,29 +146,29 @@ public class HBaseInputFormat
regions[i] = regionLocation;
- LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));
+ LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));
}
byte[] startRow = HConstants.EMPTY_START_ROW;
byte[] stopRow = HConstants.EMPTY_END_ROW;
- LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
+ LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
- LOG.info("SOURCE MODE is : " + sourceMode);
+ LOG.debug("SOURCE MODE is : " + sourceMode);
switch( sourceMode ) {
case SCAN_ALL:
startRow = HConstants.EMPTY_START_ROW;
stopRow = HConstants.EMPTY_END_ROW;
- LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
+ LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
break;
case SCAN_RANGE:
startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ;
stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ;
- LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
+ LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
break;
}
@@ -180,98 +182,110 @@ public class HBaseInputFormat
List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
- List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow);
-
- int maxRegions = validRegions.size();
- int currentRegion = 1;
-
- for( HRegionLocation cRegion : validRegions ) {
- byte [] rStart = cRegion.getRegionInfo().getStartKey();
- byte [] rStop = cRegion.getRegionInfo().getEndKey();
+ if( ! useSalt ) {
- HServerAddress regionServerAddress = cRegion.getServerAddress();
- InetAddress regionAddress =
- regionServerAddress.getInetSocketAddress().getAddress();
- String regionLocation;
- try {
- regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
- LOG.error("Cannot resolve the host name for " + regionAddress +
- " because of " + e);
- regionLocation = regionServerAddress.getHostname();
- }
-
- byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow);
- byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);
-
- LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",
- (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )),
- (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )),
- rStop.length
- ));
+ List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow);
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(),
- sStart,
- sStop,
- regionLocation,
- SourceMode.SCAN_RANGE
- );
+ int maxRegions = validRegions.size();
+ int currentRegion = 1;
- split.setEndRowInclusive( currentRegion == maxRegions );
-
- currentRegion ++;
-
- LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
- Bytes.toString(startRow), Bytes.toString(stopRow),
- Bytes.toString(rStart), Bytes.toString(rStop),
- Bytes.toString(sStart),
- Bytes.toString(sStop),
- cRegion.getHostnamePort(), split) );
+ for( HRegionLocation cRegion : validRegions ) {
+ byte [] rStart = cRegion.getRegionInfo().getStartKey();
+ byte [] rStop = cRegion.getRegionInfo().getEndKey();
+
+ HServerAddress regionServerAddress = cRegion.getServerAddress();
+ InetAddress regionAddress =
+ regionServerAddress.getInetSocketAddress().getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress +
+ " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow);
+ byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);
+
+ LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)",
+ (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )),
+ (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )),
+ rStop.length
+ ));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(),
+ sStart,
+ sStop,
+ regionLocation,
+ SourceMode.SCAN_RANGE, useSalt
+ );
+
+ split.setEndRowInclusive( currentRegion == maxRegions );
+
+ currentRegion ++;
+
+ LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
+ Bytes.toString(startRow), Bytes.toString(stopRow),
+ Bytes.toString(rStart), Bytes.toString(rStop),
+ Bytes.toString(sStart),
+ Bytes.toString(sStop),
+ cRegion.getHostnamePort(), split) );
+
+ splits.add(split);
+ }
+ } else {
+ LOG.debug("Using SALT : " + useSalt );
- splits.add(split);
+ // Will return the start and the stop key with all possible prefixes.
+ for( int i = 0; i < regions.length; i++ ) {
+ Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList );
+
+ for( Pair<byte[], byte[]> pair : intervals ) {
+ LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond())));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(),
+ pair.getFirst(),
+ pair.getSecond(),
+ regions[i],
+ SourceMode.SCAN_RANGE, useSalt
+ );
+
+ split.setEndRowInclusive(true);
+ splits.add(split);
+ }
+ }
}
-//
-// for (int i = 0; i < keys.getFirst().length; i++) {
-//
-// if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
-// LOG.info("NOT including regions : " + regions[i]);
-// continue;
-// }
-//
-// // determine if the given start an stop key fall into the region
-// if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-// Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-// (stopRow.length == 0 ||
-// Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-//
-// byte[] splitStart = startRow.length == 0 ||
-// Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-// keys.getFirst()[i] : startRow;
-// byte[] splitStop = (stopRow.length == 0 ||
-// Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-// keys.getSecond()[i].length > 0 ?
-// keys.getSecond()[i] : stopRow;
-// HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
-// splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE);
-// splits.add(split);
-//
-// LOG.info("getSplits: split -> " + i + " -> " + split);
-// }
-// }
-
- LOG.info("RETURNED SPLITS: split -> " + splits);
+ LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size());
+ for( HBaseTableSplit s: splits) {
+ LOG.info("RETURNED SPLITS: split -> " + s);
+ }
return splits.toArray(new HBaseTableSplit[splits.size()]);
}
case GET_LIST:
{
- if( keyList == null || keyList.size() == 0 ) {
+// if( keyList == null || keyList.size() == 0 ) {
+ if( keyList == null ) {
throw new IOException("Source Mode is GET_LIST but key list is EMPTY");
}
+ if( useSalt ) {
+ TreeSet<String> tempKeyList = new TreeSet<String>();
+
+ for(String key: keyList) {
+ tempKeyList.add(HBaseSalter.addSaltPrefix(key));
+ }
+
+ keyList = tempKeyList;
+ }
+
+ LOG.debug("".format("Splitting Key List (%s)", keyList));
+
List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
for (int i = 0; i < keys.getFirst().length; i++) {
@@ -280,44 +294,46 @@ public class HBaseInputFormat
continue;
}
- LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));
+ LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));
Set<String> regionsSubSet = null;
if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) {
- LOG.info("REGION start is empty");
- LOG.info("REGION stop is empty");
+ LOG.debug("REGION start is empty");
+ LOG.debug("REGION stop is empty");
regionsSubSet = keyList;
} else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) {
- LOG.info("REGION start is empty");
+ LOG.debug("REGION start is empty");
regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true);
} else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) {
- LOG.info("REGION stop is empty");
+ LOG.debug("REGION stop is empty");
regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true);
} else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) {
regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true);
} else {
- throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",
+ throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)",
regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
}
if( regionsSubSet == null || regionsSubSet.size() == 0) {
- LOG.info( "EMPTY: Key is for region " + regions[i] + " is null");
+ LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null");
continue;
}
TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet);
- LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));
+ LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));
HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(), regionKeyList,
+ table.getTableName(), regionKeyList, versions,
regions[i],
- SourceMode.GET_LIST);
+ SourceMode.GET_LIST, useSalt);
splits.add(split);
}
+ LOG.debug("RETURNED SPLITS: split -> " + splits);
+
return splits.toArray(new HBaseTableSplit[splits.size()]);
}
@@ -351,20 +367,23 @@ public class HBaseInputFormat
case SCAN_ALL:
case SCAN_RANGE:
{
- LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));
+ LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));
trr.setStartRow(tSplit.getStartRow());
trr.setEndRow(tSplit.getEndRow());
trr.setEndRowInclusive(tSplit.getEndRowInclusive());
+ trr.setUseSalt(useSalt);
}
break;
case GET_LIST:
{
- LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));
+ LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));
trr.setKeyList(tSplit.getKeyList());
+ trr.setVersions(tSplit.getVersions());
+ trr.setUseSalt(useSalt);
}
break;
@@ -402,6 +421,9 @@ public class HBaseInputFormat
private SourceMode sourceMode = SourceMode.EMPTY;
private TreeSet<String> keyList = null;
+ private int versions = 1;
+ private boolean useSalt = false;
+ private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
public void configure(JobConf job) {
String tableName = getTableName(job);
@@ -422,9 +444,12 @@ public class HBaseInputFormat
LOG.debug("Entered : " + this.getClass() + " : configure()" );
+ useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false);
+ prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST);
+
sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ;
- LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",
+ LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally",
String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode ));
switch( sourceMode ) {
@@ -442,16 +467,18 @@ public class HBaseInputFormat
Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job)));
keyList = new TreeSet<String> (keys);
+
+ versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
- LOG.info( "GOT KEY LIST : " + keys );
- LOG.info(String.format("SETTING key list (%s)", keyList) );
+ LOG.debug( "GOT KEY LIST : " + keys );
+ LOG.debug(String.format("SETTING key list (%s)", keyList) );
break;
case EMPTY:
LOG.info("HIT EMPTY");
- sourceMode = sourceMode.SCAN_ALL;
+ sourceMode = SourceMode.SCAN_ALL;
break;
default:
@@ -468,7 +495,7 @@ public class HBaseInputFormat
if (tableName == null) {
throw new IOException("expecting one table name");
}
- LOG.debug("".format("Found Table name [%s]", tableName));
+ LOG.debug(String.format("Found Table name [%s]", tableName));
// connected to table?
@@ -476,16 +503,16 @@ public class HBaseInputFormat
throw new IOException("could not connect to table '" +
tableName + "'");
}
- LOG.debug("".format("Found Table [%s]", getHTable().getTableName()));
+ LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));
// expecting at least one column
String colArg = job.get(COLUMN_LIST);
if (colArg == null || colArg.length() == 0) {
throw new IOException("expecting at least one column");
}
- LOG.debug("".format("Found Columns [%s]", colArg));
+ LOG.debug(String.format("Found Columns [%s]", colArg));
- LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey));
+ LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey));
if( sourceMode == SourceMode.EMPTY ) {
throw new IOException("SourceMode should not be EMPTY");
@@ -504,7 +531,7 @@ public class HBaseInputFormat
private void setJobProp( JobConf job, String key, String value) {
- if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));
+ if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));
job.set(key, value);
}