aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass
diff options
context:
space:
mode:
authorChandan Rajah <crajah@parallelai.com>2013-09-04 10:32:07 +0100
committerChandan Rajah <crajah@parallelai.com>2013-09-04 10:32:07 +0100
commit3501e241a2313cf49c371630cb6ebe0c3a47e991 (patch)
tree99b4e48c7590f94a4cbe8acf9ffbc036241ab737 /src/main/java/parallelai/spyglass
parent147a423b345ea365c22af48727c83ea4f31b948c (diff)
downloadSpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.tar.gz
SpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.zip
Extensive changes to the underlying code base.
Fully tested and working support for region level spliting Reduced number of mappers.
Diffstat (limited to 'src/main/java/parallelai/spyglass')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java53
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseConstants.java37
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java645
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java172
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java (renamed from src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java)215
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java99
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java111
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java609
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java140
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java (renamed from src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java)117
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java124
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseSalter.java14
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java35
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java219
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java165
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java97
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java127
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java25
18 files changed, 1097 insertions, 1907 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
new file mode 100644
index 0000000..598a988
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
@@ -0,0 +1,53 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chand_000
+ * Date: 29/08/13
+ * Time: 17:25
+ * To change this template use File | Settings | File Templates.
+ */
+public class HBaseConfigUtils {
+ static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class);
+
+ public static void setRecordReaderParms(HBaseRecordReaderBase trr, HBaseTableSplitBase tSplit) throws IOException {
+ switch (tSplit.getSourceMode()) {
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+ LOG.debug(String.format(
+ "For split [%s] we have start key (%s) and stop key (%s)",
+ tSplit, tSplit.getStartRow(), tSplit.getEndRow()));
+
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setEndRowInclusive(tSplit.getEndRowInclusive());
+ trr.setUseSalt(tSplit.getUseSalt());
+ }
+
+ break;
+
+ case GET_LIST: {
+ LOG.debug(String.format("For split [%s] we have key list (%s)",
+ tSplit, tSplit.getKeyList()));
+
+ trr.setKeyList(tSplit.getKeyList());
+ trr.setVersions(tSplit.getVersions());
+ trr.setUseSalt(tSplit.getUseSalt());
+ }
+
+ break;
+
+ default:
+ throw new IOException("Unknown source mode : "
+ + tSplit.getSourceMode());
+ }
+
+ trr.setSourceMode(tSplit.getSourceMode());
+ }
+
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 25b89cb..5b5e9c3 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -3,21 +3,26 @@ package parallelai.spyglass.hbase;
import org.apache.hadoop.conf.Configuration;
public class HBaseConstants {
-
- public enum SourceMode {
- EMPTY,
- SCAN_ALL,
- SCAN_RANGE,
- GET_LIST;
- }
- public static final String START_KEY = "hbase.%s.startkey";
- public static final String STOP_KEY = "hbase.%s.stopkey";
- public static final String SOURCE_MODE = "hbase.%s.source.mode";
- public static final String KEY_LIST = "hbase.%s.key.list";
- public static final String VERSIONS = "hbase.%s.versions";
- public static final String USE_SALT = "hbase.%s.use.salt";
- public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+ public enum SourceMode {
+ EMPTY,
+ SCAN_ALL,
+ SCAN_RANGE,
+ GET_LIST;
+ }
- public static final String SINK_MODE = "hbase.%s.sink.mode";
-}
+ public enum SplitType {
+ GRANULAR,
+ REGIONAL;
+ }
+
+ public static final String START_KEY = "hbase.%s.startkey";
+ public static final String STOP_KEY = "hbase.%s.stopkey";
+ public static final String SOURCE_MODE = "hbase.%s.source.mode";
+ public static final String KEY_LIST = "hbase.%s.key.list";
+ public static final String VERSIONS = "hbase.%s.versions";
+ public static final String USE_SALT = "hbase.%s.use.salt";
+ public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+
+ public static final String SINK_MODE = "hbase.%s.sink.mode";
+} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
deleted file mode 100644
index 5a1184f..0000000
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
+++ /dev/null
@@ -1,645 +0,0 @@
-package parallelai.spyglass.hbase;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import javax.naming.NamingException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-
-public class HBaseInputFormat implements
- InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
-
- private final Log LOG = LogFactory.getLog(HBaseInputFormat.class);
-
- private final String id = UUID.randomUUID().toString();
-
- private byte[][] inputColumns;
- private HTable table;
- // private HBaseRecordReader tableRecordReader;
- private Filter rowFilter;
- // private String tableName = "";
-
- private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();
-
- private String nameServer = null;
-
- // private Scan scan = null;
-
- private HBaseMultiInputSplit[] convertToMultiSplitArray(
- List<HBaseTableSplit> splits) throws IOException {
-
- if (splits == null)
- throw new IOException("The list of splits is null => " + splits);
-
- HashMap<String, HBaseMultiInputSplit> regionSplits = new HashMap<String, HBaseMultiInputSplit>();
-
- for (HBaseTableSplit hbt : splits) {
- HBaseMultiInputSplit mis = null;
- if (regionSplits.containsKey(hbt.getRegionLocation())) {
- mis = regionSplits.get(hbt.getRegionLocation());
- } else {
- regionSplits.put(hbt.getRegionLocation(), new HBaseMultiInputSplit(
- hbt.getRegionLocation()));
- mis = regionSplits.get(hbt.getRegionLocation());
- }
-
- mis.addSplit(hbt);
- regionSplits.put(hbt.getRegionLocation(), mis);
- }
-
- Collection<HBaseMultiInputSplit> outVals = regionSplits.values();
-
- LOG.debug("".format("Returning array of splits : %s", outVals));
-
- if (outVals == null)
- throw new IOException("The list of multi input splits were null");
-
- return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- if (this.table == null) {
- throw new IOException("No table was provided");
- }
-
- if (this.inputColumns == null || this.inputColumns.length == 0) {
- throw new IOException("Expecting at least one column");
- }
-
- final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
-
- if (keys == null || keys.getFirst() == null
- || keys.getFirst().length == 0) {
- HRegionLocation regLoc = table.getRegionLocation(
- HConstants.EMPTY_BYTE_ARRAY, false);
-
- if (null == regLoc) {
- throw new IOException("Expecting at least one region.");
- }
-
- final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
- HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
- HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
- .getHostnamePort().split(
- Addressing.HOSTNAME_PORT_SEPARATOR)[0],
- SourceMode.EMPTY, false);
-
- splits.add(split);
-
- // TODO: Change to HBaseMultiSplit
- return convertToMultiSplitArray(splits);
- }
-
- if (keys.getSecond() == null || keys.getSecond().length == 0) {
- throw new IOException("Expecting at least one region.");
- }
-
- if (keys.getFirst().length != keys.getSecond().length) {
- throw new IOException("Regions for start and end key do not match");
- }
-
- byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
- byte[] maxKey = keys.getSecond()[0];
-
- LOG.debug(String.format("SETTING min key (%s) and max key (%s)",
- Bytes.toString(minKey), Bytes.toString(maxKey)));
-
- byte[][] regStartKeys = keys.getFirst();
- byte[][] regStopKeys = keys.getSecond();
- String[] regions = new String[regStartKeys.length];
-
- for (int i = 0; i < regStartKeys.length; i++) {
- minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0)
- && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i]
- : minKey;
- maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0)
- && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i]
- : maxKey;
-
- HServerAddress regionServerAddress = table.getRegionLocation(
- keys.getFirst()[i]).getServerAddress();
- InetAddress regionAddress = regionServerAddress.getInetSocketAddress()
- .getAddress();
- String regionLocation;
- try {
- regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
- LOG.error("Cannot resolve the host name for " + regionAddress
- + " because of " + e);
- regionLocation = regionServerAddress.getHostname();
- }
-
- // HServerAddress regionServerAddress =
- // table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
- // InetAddress regionAddress =
- // regionServerAddress.getInetSocketAddress().getAddress();
- //
- // String regionLocation;
- //
- // try {
- // regionLocation = reverseDNS(regionAddress);
- // } catch (NamingException e) {
- // LOG.error("Cannot resolve the host name for " + regionAddress +
- // " because of " + e);
- // regionLocation = regionServerAddress.getHostname();
- // }
-
- // String regionLocation =
- // table.getRegionLocation(keys.getFirst()[i]).getHostname();
-
- LOG.debug("***** " + regionLocation);
-
- if (regionLocation == null || regionLocation.length() == 0)
- throw new IOException("The region info for regiosn " + i
- + " is null or empty");
-
- regions[i] = regionLocation;
-
- LOG.debug(String.format(
- "Region (%s) has start key (%s) and stop key (%s)", regions[i],
- Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
- }
-
- byte[] startRow = HConstants.EMPTY_START_ROW;
- byte[] stopRow = HConstants.EMPTY_END_ROW;
-
- LOG.debug(String.format("Found min key (%s) and max key (%s)",
- Bytes.toString(minKey), Bytes.toString(maxKey)));
-
- LOG.debug("SOURCE MODE is : " + sourceMode);
-
- switch (sourceMode) {
- case SCAN_ALL:
- startRow = HConstants.EMPTY_START_ROW;
- stopRow = HConstants.EMPTY_END_ROW;
-
- LOG.info(String.format(
- "SCAN ALL: Found start key (%s) and stop key (%s)",
- Bytes.toString(startRow), Bytes.toString(stopRow)));
- break;
-
- case SCAN_RANGE:
- startRow = (startKey != null && startKey.length() != 0) ? Bytes
- .toBytes(startKey) : HConstants.EMPTY_START_ROW;
- stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes
- .toBytes(stopKey) : HConstants.EMPTY_END_ROW;
-
- LOG.info(String.format(
- "SCAN RANGE: Found start key (%s) and stop key (%s)",
- Bytes.toString(startRow), Bytes.toString(stopRow)));
- break;
- }
-
- switch (sourceMode) {
- case EMPTY:
- case SCAN_ALL:
- case SCAN_RANGE: {
- // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey :
- // startRow;
- // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey :
- // stopRow;
-
- final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
-
- if (!useSalt) {
-
- List<HRegionLocation> validRegions = table.getRegionsInRange(
- startRow, stopRow);
-
- int maxRegions = validRegions.size();
- int currentRegion = 1;
-
- for (HRegionLocation cRegion : validRegions) {
- byte[] rStart = cRegion.getRegionInfo().getStartKey();
- byte[] rStop = cRegion.getRegionInfo().getEndKey();
-
- HServerAddress regionServerAddress = cRegion
- .getServerAddress();
- InetAddress regionAddress = regionServerAddress
- .getInetSocketAddress().getAddress();
- String regionLocation;
- try {
- regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
- LOG.error("Cannot resolve the host name for "
- + regionAddress + " because of " + e);
- regionLocation = regionServerAddress.getHostname();
- }
-
- byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
- || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart
- : startRow);
- byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW
- || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop
- : stopRow);
-
- LOG.debug(String.format(
- "BOOL start (%s) stop (%s) length (%d)",
- (startRow == HConstants.EMPTY_START_ROW || (Bytes
- .compareTo(startRow, rStart) <= 0)),
- (stopRow == HConstants.EMPTY_END_ROW || (Bytes
- .compareTo(stopRow, rStop) >= 0)), rStop.length));
-
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(), sStart, sStop, regionLocation,
- SourceMode.SCAN_RANGE, useSalt);
-
- split.setEndRowInclusive(currentRegion == maxRegions);
-
- currentRegion++;
-
- LOG.debug(String
- .format(
- "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
- Bytes.toString(startRow),
- Bytes.toString(stopRow), Bytes.toString(rStart),
- Bytes.toString(rStop), Bytes.toString(sStart),
- Bytes.toString(sStop), cRegion.getHostnamePort(),
- split));
-
- splits.add(split);
- }
- } else {
- LOG.debug("Using SALT : " + useSalt);
-
- // Will return the start and the stop key with all possible
- // prefixes.
- for (int i = 0; i < regions.length; i++) {
- Pair<byte[], byte[]>[] intervals = HBaseSalter
- .getDistributedIntervals(startRow, stopRow,
- regStartKeys[i], regStopKeys[i], prefixList);
-
- for (Pair<byte[], byte[]> pair : intervals) {
- LOG.debug("".format(
- "Using SALT, Region (%s) Start (%s) Stop (%s)",
- regions[i], Bytes.toString(pair.getFirst()),
- Bytes.toString(pair.getSecond())));
-
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(), pair.getFirst(),
- pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
- useSalt);
-
- split.setEndRowInclusive(true);
- splits.add(split);
- }
- }
- }
-
- LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size());
-
- // TODO: Change to HBaseMultiSplit
- return convertToMultiSplitArray(splits);
- }
-
- case GET_LIST: {
- // if( keyList == null || keyList.size() == 0 ) {
- if (keyList == null) {
- throw new IOException(
- "Source Mode is GET_LIST but key list is EMPTY");
- }
-
- if (useSalt) {
- TreeSet<String> tempKeyList = new TreeSet<String>();
-
- for (String key : keyList) {
- tempKeyList.add(HBaseSalter.addSaltPrefix(key));
- }
-
- keyList = tempKeyList;
- }
-
- LOG.info("".format("Splitting Key List (%s)", keyList));
-
- final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
-
- for (int i = 0; i < keys.getFirst().length; i++) {
-
- if (!includeRegionInSplit(keys.getFirst()[i],
- keys.getSecond()[i])) {
- continue;
- }
-
- LOG.debug(String.format(
- "Getting region (%s) subset (%s) to (%s)", regions[i],
- Bytes.toString(regStartKeys[i]),
- Bytes.toString(regStopKeys[i])));
-
- Set<String> regionsSubSet = null;
-
- if ((regStartKeys[i] == null || regStartKeys[i].length == 0)
- && (regStopKeys[i] == null || regStopKeys[i].length == 0)) {
- LOG.debug("REGION start is empty");
- LOG.debug("REGION stop is empty");
- regionsSubSet = keyList;
- } else if (regStartKeys[i] == null
- || regStartKeys[i].length == 0) {
- LOG.debug("REGION start is empty");
- regionsSubSet = keyList.headSet(
- Bytes.toString(regStopKeys[i]), true);
- } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) {
- LOG.debug("REGION stop is empty");
- regionsSubSet = keyList.tailSet(
- Bytes.toString(regStartKeys[i]), true);
- } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) {
- regionsSubSet = keyList.subSet(
- Bytes.toString(regStartKeys[i]), true,
- Bytes.toString(regStopKeys[i]), true);
- } else {
- throw new IOException(String.format(
- "For REGION (%s) Start Key (%s) > Stop Key(%s)",
- regions[i], Bytes.toString(regStartKeys[i]),
- Bytes.toString(regStopKeys[i])));
- }
-
- if (regionsSubSet == null || regionsSubSet.size() == 0) {
- LOG.debug("EMPTY: Key is for region " + regions[i]
- + " is null");
-
- continue;
- }
-
- TreeSet<String> regionKeyList = new TreeSet<String>(
- regionsSubSet);
-
- LOG.debug(String.format("Regions [%s] has key list <%s>",
- regions[i], regionKeyList));
-
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(), regionKeyList, versions, regions[i],
- SourceMode.GET_LIST, useSalt);
- splits.add(split);
- }
-
- // if (splits.isEmpty()) {
- // LOG.info("GOT EMPTY SPLITS");
-
- // throw new IOException(
- // "".format("Key List NOT found in any region"));
-
- // HRegionLocation regLoc = table.getRegionLocation(
- // HConstants.EMPTY_BYTE_ARRAY, false);
- //
- // if (null == regLoc) {
- // throw new IOException("Expecting at least one region.");
- // }
- //
- // HBaseTableSplit split = new HBaseTableSplit(
- // table.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
- // HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostnamePort()
- // .split(Addressing.HOSTNAME_PORT_SEPARATOR)[0],
- // SourceMode.EMPTY, false);
- //
- // splits.add(split);
- // }
-
- LOG.info("RETURNED SPLITS: split -> " + splits);
-
- // TODO: Change to HBaseMultiSplit
- return convertToMultiSplitArray(splits);
- }
-
- default:
- throw new IOException("Unknown source Mode : " + sourceMode);
- }
- }
-
- private String reverseDNS(InetAddress ipAddress) throws NamingException {
- String hostName = this.reverseDNSCacheMap.get(ipAddress);
- if (hostName == null) {
- hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(
- ipAddress, this.nameServer));
- this.reverseDNSCacheMap.put(ipAddress, hostName);
- }
- return hostName;
- }
-
- @Override
- public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter) throws IOException {
-
- if (!(split instanceof HBaseMultiInputSplit))
- throw new IOException("Table Split is not type HBaseMultiInputSplit");
-
- HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split;
-
- HBaseRecordReader trr = new HBaseRecordReader(tSplit);
-
- trr.setHTable(this.table);
- trr.setInputColumns(this.inputColumns);
- trr.setRowFilter(this.rowFilter);
- trr.setUseSalt(useSalt);
-
- trr.setNextSplit();
-
- return trr;
- }
-
- /* Configuration Section */
-
- /**
- * space delimited list of columns
- */
- public static final String COLUMN_LIST = "hbase.tablecolumns";
-
- /**
- * Use this jobconf param to specify the input table
- */
- private static final String INPUT_TABLE = "hbase.inputtable";
-
- private String startKey = null;
- private String stopKey = null;
-
- private SourceMode sourceMode = SourceMode.EMPTY;
- private TreeSet<String> keyList = null;
- private int versions = 1;
- private boolean useSalt = false;
- private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
-
- public void configure(JobConf job) {
- String tableName = getTableName(job);
- String colArg = job.get(COLUMN_LIST);
- String[] colNames = colArg.split(" ");
- byte[][] m_cols = new byte[colNames.length][];
- for (int i = 0; i < m_cols.length; i++) {
- m_cols[i] = Bytes.toBytes(colNames[i]);
- }
- setInputColumns(m_cols);
-
- try {
- setHTable(new HTable(HBaseConfiguration.create(job), tableName));
- } catch (Exception e) {
- LOG.error("************* Table could not be created");
- LOG.error(StringUtils.stringifyException(e));
- }
-
- LOG.debug("Entered : " + this.getClass() + " : configure()");
-
- useSalt = job.getBoolean(
- String.format(HBaseConstants.USE_SALT, getTableName(job)), false);
- prefixList = job.get(
- String.format(HBaseConstants.SALT_PREFIX, getTableName(job)),
- HBaseSalter.DEFAULT_PREFIX_LIST);
-
- sourceMode = SourceMode.valueOf(job.get(String.format(
- HBaseConstants.SOURCE_MODE, getTableName(job))));
-
- LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
- .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
- .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
- sourceMode));
-
- switch (sourceMode) {
- case SCAN_RANGE:
- LOG.info("HIT SCAN_RANGE");
-
- startKey = getJobProp(job,
- String.format(HBaseConstants.START_KEY, getTableName(job)));
- stopKey = getJobProp(job,
- String.format(HBaseConstants.STOP_KEY, getTableName(job)));
-
- LOG.info(String.format("Setting start key (%s) and stop key (%s)",
- startKey, stopKey));
- break;
-
- case GET_LIST:
- LOG.info("HIT GET_LIST");
-
- Collection<String> keys = job.getStringCollection(String.format(
- HBaseConstants.KEY_LIST, getTableName(job)));
- keyList = new TreeSet<String>(keys);
-
- versions = job.getInt(
- String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
-
- LOG.debug("GOT KEY LIST : " + keys);
- LOG.debug(String.format("SETTING key list (%s)", keyList));
-
- break;
-
- case EMPTY:
- LOG.info("HIT EMPTY");
-
- sourceMode = SourceMode.SCAN_ALL;
- break;
-
- default:
- LOG.info("HIT DEFAULT");
-
- break;
- }
- }
-
- public void validateInput(JobConf job) throws IOException {
- // expecting exactly one path
- String tableName = getTableName(job);
-
- if (tableName == null) {
- throw new IOException("expecting one table name");
- }
- LOG.debug(String.format("Found Table name [%s]", tableName));
-
- // connected to table?
- if (getHTable() == null) {
- throw new IOException("could not connect to table '" + tableName + "'");
- }
- LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));
-
- // expecting at least one column
- String colArg = job.get(COLUMN_LIST);
- if (colArg == null || colArg.length() == 0) {
- throw new IOException("expecting at least one column");
- }
- LOG.debug(String.format("Found Columns [%s]", colArg));
-
- LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey,
- stopKey));
-
- if (sourceMode == SourceMode.EMPTY) {
- throw new IOException("SourceMode should not be EMPTY");
- }
-
- if (sourceMode == SourceMode.GET_LIST
- && (keyList == null || keyList.size() == 0)) {
- throw new IOException("Source mode is GET_LIST bu key list is empty");
- }
- }
-
- /* Getters & Setters */
- private HTable getHTable() {
- return this.table;
- }
-
- private void setHTable(HTable ht) {
- this.table = ht;
- }
-
- private void setInputColumns(byte[][] ic) {
- this.inputColumns = ic;
- }
-
- private void setJobProp(JobConf job, String key, String value) {
- if (job.get(key) != null)
- throw new RuntimeException(String.format(
- "Job Conf already has key [%s] with value [%s]", key,
- job.get(key)));
- job.set(key, value);
- }
-
- private String getJobProp(JobConf job, String key) {
- return job.get(key);
- }
-
- public static void setTableName(JobConf job, String tableName) {
- // Make sure that table has not been set before
- String oldTableName = getTableName(job);
- if (oldTableName != null) {
- throw new RuntimeException("table name already set to: '"
- + oldTableName + "'");
- }
-
- job.set(INPUT_TABLE, tableName);
- }
-
- public static String getTableName(JobConf job) {
- return job.get(INPUT_TABLE);
- }
-
- protected boolean includeRegionInSplit(final byte[] startKey,
- final byte[] endKey) {
- return true;
- }
-}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
new file mode 100644
index 0000000..2f3047b
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
@@ -0,0 +1,172 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.Collection;
+import java.util.TreeSet;
+import java.util.UUID;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chand_000
+ * Date: 29/08/13
+ * Time: 12:43
+ * To change this template use File | Settings | File Templates.
+ */
+public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
+
+ private final Log LOG = LogFactory.getLog(HBaseInputFormatBase.class);
+
+ protected final String id = UUID.randomUUID().toString();
+ protected byte[][] inputColumns;
+ protected HTable table;
+ protected Filter rowFilter;
+
+ public static final String COLUMN_LIST = "hbase.tablecolumns";
+
+ /**
+ * Use this jobconf param to specify the input table
+ */
+ protected static final String INPUT_TABLE = "hbase.inputtable";
+
+ protected String startKey = null;
+ protected String stopKey = null;
+
+ protected HBaseConstants.SourceMode sourceMode = HBaseConstants.SourceMode.EMPTY;
+ protected TreeSet<String> keyList = null;
+ protected int versions = 1;
+ protected boolean useSalt = false;
+ protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
+
+
+
+ @Override
+ public void configure(JobConf job) {
+ String tableName = getTableName(job);
+ String colArg = job.get(COLUMN_LIST);
+ String[] colNames = colArg.split(" ");
+ byte[][] m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ }
+ setInputColumns(m_cols);
+
+ try {
+ setHTable(new HTable(HBaseConfiguration.create(job), tableName));
+ } catch (Exception e) {
+ LOG.error("************* Table could not be created");
+ LOG.error(StringUtils.stringifyException(e));
+ }
+
+ LOG.debug("Entered : " + this.getClass() + " : configure()");
+
+ useSalt = job.getBoolean(
+ String.format(HBaseConstants.USE_SALT, getTableName(job)), false);
+ prefixList = job.get(
+ String.format(HBaseConstants.SALT_PREFIX, getTableName(job)),
+ HBaseSalter.DEFAULT_PREFIX_LIST);
+
+ sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format(
+ HBaseConstants.SOURCE_MODE, getTableName(job))));
+
+ LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
+ .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
+ .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
+ sourceMode));
+
+ switch (sourceMode) {
+ case SCAN_RANGE:
+ LOG.debug("HIT SCAN_RANGE");
+
+ startKey = getJobProp(job,
+ String.format(HBaseConstants.START_KEY, getTableName(job)));
+ stopKey = getJobProp(job,
+ String.format(HBaseConstants.STOP_KEY, getTableName(job)));
+
+ LOG.debug(String.format("Setting start key (%s) and stop key (%s)",
+ startKey, stopKey));
+ break;
+
+ case GET_LIST:
+ LOG.debug("HIT GET_LIST");
+
+ Collection<String> keys = job.getStringCollection(String.format(
+ HBaseConstants.KEY_LIST, getTableName(job)));
+ keyList = new TreeSet<String>(keys);
+
+ versions = job.getInt(
+ String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
+
+ LOG.debug("GOT KEY LIST : " + keys);
+ LOG.debug(String.format("SETTING key list (%s)", keyList));
+
+ break;
+
+ case EMPTY:
+ LOG.info("HIT EMPTY");
+
+ sourceMode = HBaseConstants.SourceMode.SCAN_ALL;
+ break;
+
+ default:
+ LOG.info("HIT DEFAULT");
+
+ break;
+ }
+ }
+
+ /* Getters & Setters */
+ protected HTable getHTable() {
+ return this.table;
+ }
+
+ protected void setHTable(HTable ht) {
+ this.table = ht;
+ }
+
+ protected void setInputColumns(byte[][] ic) {
+ this.inputColumns = ic;
+ }
+
+ protected void setJobProp(JobConf job, String key, String value) {
+ if (job.get(key) != null)
+ throw new RuntimeException(String.format(
+ "Job Conf already has key [%s] with value [%s]", key,
+ job.get(key)));
+ job.set(key, value);
+ }
+
+ protected String getJobProp(JobConf job, String key) {
+ return job.get(key);
+ }
+
+ public static void setTableName(JobConf job, String tableName) {
+ // Make sure that table has not been set before
+ String oldTableName = getTableName(job);
+ if (oldTableName != null) {
+ throw new RuntimeException("table name already set to: '"
+ + oldTableName + "'");
+ }
+
+ job.set(INPUT_TABLE, tableName);
+ }
+
+ public static String getTableName(JobConf job) {
+ return job.get(INPUT_TABLE);
+ }
+
+ protected void setParms(HBaseRecordReaderBase trr) {
+
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 96bfea1..929e9d8 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -37,17 +37,10 @@ import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-public class HBaseInputFormat_SINGLE implements
- InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
+public class HBaseInputFormatGranular extends HBaseInputFormatBase {
- private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class);
+ private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class);
- private final String id = UUID.randomUUID().toString();
-
- private byte[][] inputColumns;
- private HTable table;
- // private HBaseRecordReader tableRecordReader;
- private Filter rowFilter;
// private String tableName = "";
private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();
@@ -58,7 +51,7 @@ public class HBaseInputFormat_SINGLE implements
@SuppressWarnings("deprecation")
@Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException {
if (this.table == null) {
throw new IOException("No table was provided");
}
@@ -78,8 +71,8 @@ public class HBaseInputFormat_SINGLE implements
throw new IOException("Expecting at least one region.");
}
- List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);
- HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
+ List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>(1);
+ HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
.getHostnamePort().split(
Addressing.HOSTNAME_PORT_SEPARATOR)[0],
@@ -87,7 +80,7 @@ public class HBaseInputFormat_SINGLE implements
splits.add(split);
- return splits.toArray(new HBaseTableSplit[splits.size()]);
+ return splits.toArray(new HBaseTableSplitGranular[splits.size()]);
}
if (keys.getSecond() == null || keys.getSecond().length == 0) {
@@ -173,7 +166,7 @@ public class HBaseInputFormat_SINGLE implements
startRow = HConstants.EMPTY_START_ROW;
stopRow = HConstants.EMPTY_END_ROW;
- LOG.info(String.format(
+ LOG.debug(String.format(
"SCAN ALL: Found start key (%s) and stop key (%s)",
Bytes.toString(startRow), Bytes.toString(stopRow)));
break;
@@ -184,7 +177,7 @@ public class HBaseInputFormat_SINGLE implements
stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes
.toBytes(stopKey) : HConstants.EMPTY_END_ROW;
- LOG.info(String.format(
+ LOG.debug(String.format(
"SCAN RANGE: Found start key (%s) and stop key (%s)",
Bytes.toString(startRow), Bytes.toString(stopRow)));
break;
@@ -199,7 +192,7 @@ public class HBaseInputFormat_SINGLE implements
// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey :
// stopRow;
- List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+ List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>();
if (!useSalt) {
@@ -240,7 +233,7 @@ public class HBaseInputFormat_SINGLE implements
(stopRow == HConstants.EMPTY_END_ROW || (Bytes
.compareTo(stopRow, rStop) >= 0)), rStop.length));
- HBaseTableSplit split = new HBaseTableSplit(
+ HBaseTableSplitGranular split = new HBaseTableSplitGranular(
table.getTableName(), sStart, sStop, regionLocation,
SourceMode.SCAN_RANGE, useSalt);
@@ -270,12 +263,12 @@ public class HBaseInputFormat_SINGLE implements
regStartKeys[i], regStopKeys[i], prefixList);
for (Pair<byte[], byte[]> pair : intervals) {
- LOG.info("".format(
+ LOG.debug("".format(
"Using SALT, Region (%s) Start (%s) Stop (%s)",
regions[i], Bytes.toString(pair.getFirst()),
Bytes.toString(pair.getSecond())));
- HBaseTableSplit split = new HBaseTableSplit(
+ HBaseTableSplitGranular split = new HBaseTableSplitGranular(
table.getTableName(), pair.getFirst(),
pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
useSalt);
@@ -286,12 +279,12 @@ public class HBaseInputFormat_SINGLE implements
}
}
- LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size());
- for (HBaseTableSplit s : splits) {
- LOG.info("RETURNED SPLITS: split -> " + s);
+ LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size());
+ for (HBaseTableSplitGranular s : splits) {
+ LOG.debug("RETURNED SPLITS: split -> " + s);
}
- return splits.toArray(new HBaseTableSplit[splits.size()]);
+ return splits.toArray(new HBaseTableSplitGranular[splits.size()]);
}
case GET_LIST: {
@@ -313,7 +306,7 @@ public class HBaseInputFormat_SINGLE implements
LOG.debug("".format("Splitting Key List (%s)", keyList));
- List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+ List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>();
for (int i = 0; i < keys.getFirst().length; i++) {
@@ -325,7 +318,7 @@ public class HBaseInputFormat_SINGLE implements
LOG.debug(String.format(
"Getting region (%s) subset (%s) to (%s)", regions[i],
Bytes.toString(regStartKeys[i]),
- Bytes.toString(regStartKeys[i])));
+ Bytes.toString(regStopKeys[i])));
Set<String> regionsSubSet = null;
@@ -367,7 +360,7 @@ public class HBaseInputFormat_SINGLE implements
LOG.debug(String.format("Regions [%s] has key list <%s>",
regions[i], regionKeyList));
- HBaseTableSplit split = new HBaseTableSplit(
+ HBaseTableSplitGranular split = new HBaseTableSplitGranular(
table.getTableName(), regionKeyList, versions, regions[i],
SourceMode.GET_LIST, useSalt);
splits.add(split);
@@ -375,7 +368,7 @@ public class HBaseInputFormat_SINGLE implements
LOG.debug("RETURNED SPLITS: split -> " + splits);
- return splits.toArray(new HBaseTableSplit[splits.size()]);
+ return splits.toArray(new HBaseTableSplitGranular[splits.size()]);
}
default:
@@ -397,45 +390,17 @@ public class HBaseInputFormat_SINGLE implements
public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
- if (!(split instanceof HBaseTableSplit))
- throw new IOException("Table Split is not type HBaseTableSplit");
-
- HBaseTableSplit tSplit = (HBaseTableSplit) split;
-
- HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE();
-
- switch (tSplit.getSourceMode()) {
- case SCAN_ALL:
- case SCAN_RANGE: {
- LOG.debug(String.format(
- "For split [%s] we have start key (%s) and stop key (%s)",
- tSplit, tSplit.getStartRow(), tSplit.getEndRow()));
-
- trr.setStartRow(tSplit.getStartRow());
- trr.setEndRow(tSplit.getEndRow());
- trr.setEndRowInclusive(tSplit.getEndRowInclusive());
- trr.setUseSalt(useSalt);
- }
+ LOG.info("GRANULAR SPLIT -> " + split);
- break;
+ if (!(split instanceof HBaseTableSplitGranular))
+ throw new IOException("Table Split is not type HBaseTableSplitGranular");
- case GET_LIST: {
- LOG.debug(String.format("For split [%s] we have key list (%s)",
- tSplit, tSplit.getKeyList()));
+ HBaseTableSplitGranular tSplit = (HBaseTableSplitGranular) split;
- trr.setKeyList(tSplit.getKeyList());
- trr.setVersions(tSplit.getVersions());
- trr.setUseSalt(useSalt);
- }
+ HBaseRecordReaderGranular trr = new HBaseRecordReaderGranular();
- break;
-
- default:
- throw new IOException("Unknown source mode : "
- + tSplit.getSourceMode());
- }
+ HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
- trr.setSourceMode(tSplit.getSourceMode());
trr.setHTable(this.table);
trr.setInputColumns(this.inputColumns);
trr.setRowFilter(this.rowFilter);
@@ -450,95 +415,6 @@ public class HBaseInputFormat_SINGLE implements
/**
* space delimited list of columns
*/
- public static final String COLUMN_LIST = "hbase.tablecolumns";
-
- /**
- * Use this jobconf param to specify the input table
- */
- private static final String INPUT_TABLE = "hbase.inputtable";
-
- private String startKey = null;
- private String stopKey = null;
-
- private SourceMode sourceMode = SourceMode.EMPTY;
- private TreeSet<String> keyList = null;
- private int versions = 1;
- private boolean useSalt = false;
- private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
-
- public void configure(JobConf job) {
- String tableName = getTableName(job);
- String colArg = job.get(COLUMN_LIST);
- String[] colNames = colArg.split(" ");
- byte[][] m_cols = new byte[colNames.length][];
- for (int i = 0; i < m_cols.length; i++) {
- m_cols[i] = Bytes.toBytes(colNames[i]);
- }
- setInputColumns(m_cols);
-
- try {
- setHTable(new HTable(HBaseConfiguration.create(job), tableName));
- } catch (Exception e) {
- LOG.error("************* Table could not be created");
- LOG.error(StringUtils.stringifyException(e));
- }
-
- LOG.debug("Entered : " + this.getClass() + " : configure()");
-
- useSalt = job.getBoolean(
- String.format(HBaseConstants.USE_SALT, getTableName(job)), false);
- prefixList = job.get(
- String.format(HBaseConstants.SALT_PREFIX, getTableName(job)),
- HBaseSalter.DEFAULT_PREFIX_LIST);
-
- sourceMode = SourceMode.valueOf(job.get(String.format(
- HBaseConstants.SOURCE_MODE, getTableName(job))));
-
- LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
- .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
- .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
- sourceMode));
-
- switch (sourceMode) {
- case SCAN_RANGE:
- LOG.info("HIT SCAN_RANGE");
-
- startKey = getJobProp(job,
- String.format(HBaseConstants.START_KEY, getTableName(job)));
- stopKey = getJobProp(job,
- String.format(HBaseConstants.STOP_KEY, getTableName(job)));
-
- LOG.info(String.format("Setting start key (%s) and stop key (%s)",
- startKey, stopKey));
- break;
-
- case GET_LIST:
- LOG.info("HIT GET_LIST");
-
- Collection<String> keys = job.getStringCollection(String.format(
- HBaseConstants.KEY_LIST, getTableName(job)));
- keyList = new TreeSet<String>(keys);
-
- versions = job.getInt(
- String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
-
- LOG.debug("GOT KEY LIST : " + keys);
- LOG.debug(String.format("SETTING key list (%s)", keyList));
-
- break;
-
- case EMPTY:
- LOG.info("HIT EMPTY");
-
- sourceMode = SourceMode.SCAN_ALL;
- break;
-
- default:
- LOG.info("HIT DEFAULT");
-
- break;
- }
- }
public void validateInput(JobConf job) throws IOException {
// expecting exactly one path
@@ -575,45 +451,6 @@ public class HBaseInputFormat_SINGLE implements
}
}
- /* Getters & Setters */
- private HTable getHTable() {
- return this.table;
- }
-
- private void setHTable(HTable ht) {
- this.table = ht;
- }
-
- private void setInputColumns(byte[][] ic) {
- this.inputColumns = ic;
- }
-
- private void setJobProp(JobConf job, String key, String value) {
- if (job.get(key) != null)
- throw new RuntimeException(String.format(
- "Job Conf already has key [%s] with value [%s]", key,
- job.get(key)));
- job.set(key, value);
- }
-
- private String getJobProp(JobConf job, String key) {
- return job.get(key);
- }
-
- public static void setTableName(JobConf job, String tableName) {
- // Make sure that table has not been set before
- String oldTableName = getTableName(job);
- if (oldTableName != null) {
- throw new RuntimeException("table name already set to: '"
- + oldTableName + "'");
- }
-
- job.set(INPUT_TABLE, tableName);
- }
-
- public static String getTableName(JobConf job) {
- return job.get(INPUT_TABLE);
- }
protected boolean includeRegionInSplit(final byte[] startKey,
final byte[] endKey) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
new file mode 100644
index 0000000..eadb57e
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
@@ -0,0 +1,99 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chand_000
+ * Date: 29/08/13
+ * Time: 12:24
+ * To change this template use File | Settings | File Templates.
+ */
+public class HBaseInputFormatRegional extends HBaseInputFormatBase {
+ private HBaseInputFormatGranular granular = new HBaseInputFormatGranular();
+ private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class);
+
+
+ @Override
+ public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException {
+ granular.configure(job);
+ HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits);
+
+ HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits );
+
+ if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL");
+
+ LOG.info("GRANULAR => " + gSplits);
+ LOG.info("REGIONAL => " + splits);
+
+ return splits;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
+ if (!(inputSplit instanceof HBaseTableSplitRegional))
+ throw new IOException("Table Split is not type HBaseTableSplitRegional");
+
+ LOG.info("REGIONAL SPLIT -> " + inputSplit);
+
+ HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit;
+
+ HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional();
+
+ HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
+
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+
+ trr.init(tSplit);
+
+ return trr;
+ }
+
+ private HBaseTableSplitRegional[] convertToMultiSplitArray(
+ HBaseTableSplitGranular[] splits) throws IOException {
+
+ if (splits == null)
+ throw new IOException("The list of splits is null => " + splits);
+
+ HashMap<String, HBaseTableSplitRegional> regionSplits = new HashMap<String, HBaseTableSplitRegional>();
+
+ for (HBaseTableSplitGranular hbt : splits) {
+ HBaseTableSplitRegional mis = null;
+ if (regionSplits.containsKey(hbt.getRegionLocation())) {
+ mis = regionSplits.get(hbt.getRegionLocation());
+ } else {
+ regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional(
+ hbt.getRegionLocation()));
+ mis = regionSplits.get(hbt.getRegionLocation());
+ }
+
+ mis.addSplit(hbt);
+ regionSplits.put(hbt.getRegionLocation(), mis);
+ }
+
+// for(String region : regionSplits.keySet() ) {
+// regionSplits.get(region)
+// }
+
+ Collection<HBaseTableSplitRegional> outVals = regionSplits.values();
+
+ LOG.debug("".format("Returning array of splits : %s", outVals));
+
+ if (outVals == null)
+ throw new IOException("The list of multi input splits were null");
+
+ return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]);
+ }
+
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java
deleted file mode 100644
index 02e7f7b..0000000
--- a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package parallelai.spyglass.hbase;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
-
-public class HBaseMultiInputSplit implements InputSplit,
- Comparable<HBaseMultiInputSplit>, Serializable {
-
- private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class);
-
- private List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
-
- private String regionLocation = null;
-
- /** default constructor */
- private HBaseMultiInputSplit() {
-
- }
-
- public HBaseMultiInputSplit(String regionLocation) {
- this.regionLocation = regionLocation;
- }
-
- /** @return the region's hostname */
- public String getRegionLocation() {
- LOG.debug("REGION GETTER : " + regionLocation);
-
- return this.regionLocation;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- LOG.debug("READ ME : " + in.toString());
-
- int s = Bytes.toInt(Bytes.readByteArray(in));
-
- for (int i = 0; i < s; i++) {
- HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils
- .deserialize(Bytes.readByteArray(in));
- splits.add(hbts);
- }
-
- LOG.debug("READ and CREATED : " + this);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- LOG.debug("WRITE : " + this);
-
- Bytes.writeByteArray(out, Bytes.toBytes(splits.size()));
-
- for (HBaseTableSplit hbts : splits) {
- Bytes.writeByteArray(out, SerializationUtils.serialize(hbts));
- }
-
- LOG.debug("WROTE : " + out.toString());
- }
-
- @Override
- public String toString() {
- StringBuffer str = new StringBuffer();
- str.append("HBaseMultiSplit : ");
-
- for (HBaseTableSplit hbt : splits) {
- str.append(" [" + hbt.toString() + "]");
- }
-
- return str.toString();
- }
-
- @Override
- public int compareTo(HBaseMultiInputSplit o) {
- // TODO: Make this comparison better
- return (splits.size() - o.splits.size());
- }
-
- @Override
- public long getLength() throws IOException {
- return splits.size();
- }
-
- @Override
- public String[] getLocations() throws IOException {
- LOG.debug("REGION ARRAY : " + regionLocation);
-
- return new String[] { this.regionLocation };
- }
-
- public void addSplit(HBaseTableSplit hbt) throws IOException {
- if (hbt.getRegionLocation().equals(regionLocation))
- splits.add(hbt);
- else
- throw new IOException("HBaseTableSplit Region Location "
- + hbt.getRegionLocation()
- + " does NOT match MultiSplit Region Location " + regionLocation);
- }
-
- public List<HBaseTableSplit> getSplits() {
- return splits;
- }
-} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
deleted file mode 100644
index 5d7dbdd..0000000
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
+++ /dev/null
@@ -1,609 +0,0 @@
-package parallelai.spyglass.hbase;
-
-import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.Vector;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.StringUtils;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-
-public class HBaseRecordReader implements
- RecordReader<ImmutableBytesWritable, Result> {
-
- static final Log LOG = LogFactory.getLog(HBaseRecordReader.class);
-
- private byte[] startRow;
- private byte[] endRow;
- private byte[] lastSuccessfulRow;
- private TreeSet<String> keyList;
- private SourceMode sourceMode;
- private Filter trrRowFilter;
- private ResultScanner scanner;
- private HTable htable;
- private byte[][] trrInputColumns;
- private long timestamp;
- private int rowcount;
- private boolean logScannerActivity = false;
- private int logPerRowCount = 100;
- private boolean endRowInclusive = true;
- private int versions = 1;
- private boolean useSalt = false;
-
- private HBaseMultiInputSplit multiSplit = null;
- private List<HBaseTableSplit> allSplits = null;
-
- private HBaseRecordReader() {
- }
-
- public HBaseRecordReader(HBaseMultiInputSplit mSplit) throws IOException {
- multiSplit = mSplit;
-
- LOG.info("Creatin Multi Split for region location : "
- + multiSplit.getRegionLocation());
-
- allSplits = multiSplit.getSplits();
- }
-
- public boolean setNextSplit() throws IOException {
- if (allSplits.size() > 0) {
- setSplitValue(allSplits.remove(0));
- return true;
- } else {
- return false;
- }
- }
-
- private void setSplitValue(HBaseTableSplit tSplit) throws IOException {
- switch (tSplit.getSourceMode()) {
- case SCAN_ALL:
- case SCAN_RANGE: {
- LOG.debug(String.format(
- "For split [%s] we have start key (%s) and stop key (%s)",
- tSplit, tSplit.getStartRow(), tSplit.getEndRow()));
-
- setStartRow(tSplit.getStartRow());
- setEndRow(tSplit.getEndRow());
- setEndRowInclusive(tSplit.getEndRowInclusive());
- }
-
- break;
-
- case GET_LIST: {
- LOG.debug(String.format("For split [%s] we have key list (%s)",
- tSplit, tSplit.getKeyList()));
-
- setKeyList(tSplit.getKeyList());
- setVersions(tSplit.getVersions());
- }
-
- break;
-
- case EMPTY:
- LOG.info("EMPTY split. Doing nothing.");
- break;
-
- default:
- throw new IOException("Unknown source mode : "
- + tSplit.getSourceMode());
- }
-
- setSourceMode(tSplit.getSourceMode());
-
- init();
- }
-
- /**
- * Restart from survivable exceptions by creating a new scanner.
- *
- * @param firstRow
- * @throws IOException
- */
- private void restartRangeScan(byte[] firstRow) throws IOException {
- Scan currentScan;
- if ((endRow != null) && (endRow.length > 0)) {
- if (trrRowFilter != null) {
- Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
- new byte[] { 0 }) : endRow));
-
- TableInputFormat.addColumns(scan, trrInputColumns);
- scan.setFilter(trrRowFilter);
- scan.setCacheBlocks(false);
- this.scanner = this.htable.getScanner(scan);
- currentScan = scan;
- } else {
- LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow)
- + ", endRow: " + Bytes.toString(endRow));
- Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
- new byte[] { 0 }) : endRow));
- TableInputFormat.addColumns(scan, trrInputColumns);
- this.scanner = this.htable.getScanner(scan);
- currentScan = scan;
- }
- } else {
- LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
- + ", no endRow");
-
- Scan scan = new Scan(firstRow);
- TableInputFormat.addColumns(scan, trrInputColumns);
- scan.setFilter(trrRowFilter);
- this.scanner = this.htable.getScanner(scan);
- currentScan = scan;
- }
- if (logScannerActivity) {
- LOG.debug("Current scan=" + currentScan.toString());
- timestamp = System.currentTimeMillis();
- rowcount = 0;
- }
- }
-
- public TreeSet<String> getKeyList() {
- return keyList;
- }
-
- private void setKeyList(TreeSet<String> keyList) {
- this.keyList = keyList;
- }
-
- private void setVersions(int versions) {
- this.versions = versions;
- }
-
- public void setUseSalt(boolean useSalt) {
- this.useSalt = useSalt;
- }
-
- public SourceMode getSourceMode() {
- return sourceMode;
- }
-
- private void setSourceMode(SourceMode sourceMode) {
- this.sourceMode = sourceMode;
- }
-
- public byte[] getEndRow() {
- return endRow;
- }
-
- private void setEndRowInclusive(boolean isInclusive) {
- endRowInclusive = isInclusive;
- }
-
- public boolean getEndRowInclusive() {
- return endRowInclusive;
- }
-
- private byte[] nextKey = null;
- private Vector<List<KeyValue>> resultVector = null;
- Map<Long, List<KeyValue>> keyValueMap = null;
-
- /**
- * Build the scanner. Not done in constructor to allow for extension.
- *
- * @throws IOException
- */
- private void init() throws IOException {
- switch (sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE:
- restartRangeScan(startRow);
- break;
-
- case GET_LIST:
- nextKey = Bytes.toBytes(keyList.pollFirst());
- break;
-
- case EMPTY:
- LOG.info("EMPTY mode. Do nothing");
- break;
-
- default:
- throw new IOException(" Unknown source mode : " + sourceMode);
- }
- }
-
- byte[] getStartRow() {
- return this.startRow;
- }
-
- /**
- * @param htable
- * the {@link HTable} to scan.
- */
- public void setHTable(HTable htable) {
- Configuration conf = htable.getConfiguration();
- logScannerActivity = conf.getBoolean(
- ScannerCallable.LOG_SCANNER_ACTIVITY, false);
- logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
- this.htable = htable;
- }
-
- /**
- * @param inputColumns
- * the columns to be placed in {@link Result}.
- */
- public void setInputColumns(final byte[][] inputColumns) {
- this.trrInputColumns = inputColumns;
- }
-
- /**
- * @param startRow
- * the first row in the split
- */
- private void setStartRow(final byte[] startRow) {
- this.startRow = startRow;
- }
-
- /**
- *
- * @param endRow
- * the last row in the split
- */
- private void setEndRow(final byte[] endRow) {
- this.endRow = endRow;
- }
-
- /**
- * @param rowFilter
- * the {@link Filter} to be used.
- */
- public void setRowFilter(Filter rowFilter) {
- this.trrRowFilter = rowFilter;
- }
-
- @Override
- public void close() {
- if (this.scanner != null)
- this.scanner.close();
- }
-
- /**
- * @return ImmutableBytesWritable
- *
- * @see org.apache.hadoop.mapred.RecordReader#createKey()
- */
- @Override
- public ImmutableBytesWritable createKey() {
- return new ImmutableBytesWritable();
- }
-
- /**
- * @return RowResult
- *
- * @see org.apache.hadoop.mapred.RecordReader#createValue()
- */
- @Override
- public Result createValue() {
- return new Result();
- }
-
- @Override
- public long getPos() {
- // This should be the ordinal tuple in the range;
- // not clear how to calculate...
- return 0;
- }
-
- @Override
- public float getProgress() {
- // Depends on the total number of tuples and getPos
- return 0;
- }
-
- /**
- * @param key
- * HStoreKey as input key.
- * @param value
- * MapWritable as input value
- * @return true if there was more data
- * @throws IOException
- */
- @Override
- public boolean next(ImmutableBytesWritable key, Result value)
- throws IOException {
-
- switch (sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE: {
-
- Result result;
- try {
- try {
- result = this.scanner.next();
- if (logScannerActivity) {
- rowcount++;
- if (rowcount >= logPerRowCount) {
- long now = System.currentTimeMillis();
- LOG.debug("Mapper took " + (now - timestamp)
- + "ms to process " + rowcount + " rows");
- timestamp = now;
- rowcount = 0;
- }
- }
- } catch (IOException e) {
- // try to handle all IOExceptions by restarting
- // the scanner, if the second call fails, it will be rethrown
- LOG.debug("recovered from "
- + StringUtils.stringifyException(e));
- if (lastSuccessfulRow == null) {
- LOG.warn("We are restarting the first next() invocation,"
- + " if your mapper has restarted a few other times like this"
- + " then you should consider killing this job and investigate"
- + " why it's taking so long.");
- }
- if (lastSuccessfulRow == null) {
- restartRangeScan(startRow);
- } else {
- restartRangeScan(lastSuccessfulRow);
- this.scanner.next(); // skip presumed already mapped row
- }
- result = this.scanner.next();
- }
-
- if (result != null && result.size() > 0) {
- if (useSalt) {
- key.set(HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
-
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
- return true;
- }
- return setNextSplit();
- } catch (IOException ioe) {
- if (logScannerActivity) {
- long now = System.currentTimeMillis();
- LOG.debug("Mapper took " + (now - timestamp)
- + "ms to process " + rowcount + " rows");
- LOG.debug(ioe);
- String lastRow = lastSuccessfulRow == null ? "null" : Bytes
- .toStringBinary(lastSuccessfulRow);
- LOG.debug("lastSuccessfulRow=" + lastRow);
- }
- throw ioe;
- }
- }
-
- case GET_LIST: {
- LOG.debug(String.format("INTO next with GET LIST and Key (%s)",
- Bytes.toString(nextKey)));
-
- if (versions == 1) {
- if (nextKey != null) {
- LOG.debug(String.format("Processing Key (%s)",
- Bytes.toString(nextKey)));
-
- Get theGet = new Get(nextKey);
- theGet.setMaxVersions(versions);
-
- Result result = this.htable.get(theGet);
-
- if (result != null && (!result.isEmpty())) {
- LOG.debug(String.format(
- "Key (%s), Version (%s), Got Result (%s)",
- Bytes.toString(nextKey), versions, result));
-
- if (keyList != null || !keyList.isEmpty()) {
- String newKey = keyList.pollFirst();
- LOG.debug("New Key => " + newKey);
- nextKey = (newKey == null || newKey.length() == 0) ? null
- : Bytes.toBytes(newKey);
- } else {
- nextKey = null;
- }
-
- LOG.debug(String.format("=> Picked a new Key (%s)",
- Bytes.toString(nextKey)));
-
- // Write the result
- if (useSalt) {
- key.set(HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
-
- return true;
- } else {
- LOG.debug(" Key (" + Bytes.toString(nextKey)
- + ") return an EMPTY result. Get (" + theGet.getId()
- + ")"); // alg0
-
- String newKey;
- while ((newKey = keyList.pollFirst()) != null) {
- LOG.debug("WHILE NEXT Key => " + newKey);
-
- nextKey = (newKey == null || newKey.length() == 0) ? null
- : Bytes.toBytes(newKey);
-
- if (nextKey == null) {
- LOG.error("BOMB! BOMB! BOMB!");
- continue;
- }
-
- if (!this.htable.exists(new Get(nextKey))) {
- LOG.debug(String.format(
- "Key (%s) Does not exist in Table (%s)",
- Bytes.toString(nextKey),
- Bytes.toString(this.htable.getTableName())));
- continue;
- } else {
- break;
- }
- }
-
- nextKey = (newKey == null || newKey.length() == 0) ? null
- : Bytes.toBytes(newKey);
-
- LOG.debug("Final New Key => " + Bytes.toString(nextKey));
-
- return next(key, value);
- }
- } else {
- // Nothig left. return false
- return setNextSplit();
- }
- } else {
- if (resultVector != null && resultVector.size() != 0) {
- LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>",
- versions, resultVector));
-
- List<KeyValue> resultKeyValue = resultVector
- .remove(resultVector.size() - 1);
- Result result = new Result(resultKeyValue);
-
- LOG.debug(String.format("+ Version (%s), Got Result <%s>",
- versions, result));
-
- if (useSalt) {
- key.set(HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
-
- return true;
- } else {
- if (nextKey != null) {
- LOG.debug(String.format("+ Processing Key (%s)",
- Bytes.toString(nextKey)));
-
- Get theGet = new Get(nextKey);
- theGet.setMaxVersions(versions);
-
- Result resultAll = this.htable.get(theGet);
-
- if (resultAll != null && (!resultAll.isEmpty())) {
- List<KeyValue> keyValeList = resultAll.list();
-
- keyValueMap = new HashMap<Long, List<KeyValue>>();
-
- LOG.debug(String.format(
- "+ Key (%s) Versions (%s) Val;ute map <%s>",
- Bytes.toString(nextKey), versions, keyValueMap));
-
- for (KeyValue keyValue : keyValeList) {
- long version = keyValue.getTimestamp();
-
- if (keyValueMap.containsKey(new Long(version))) {
- List<KeyValue> keyValueTempList = keyValueMap
- .get(new Long(version));
- if (keyValueTempList == null) {
- keyValueTempList = new ArrayList<KeyValue>();
- }
- keyValueTempList.add(keyValue);
- } else {
- List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
- keyValueMap.put(new Long(version),
- keyValueTempList);
- keyValueTempList.add(keyValue);
- }
- }
-
- resultVector = new Vector<List<KeyValue>>();
- resultVector.addAll(keyValueMap.values());
-
- List<KeyValue> resultKeyValue = resultVector
- .remove(resultVector.size() - 1);
-
- Result result = new Result(resultKeyValue);
-
- LOG.debug(String.format(
- "+ Version (%s), Got Result (%s)", versions,
- result));
-
- String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
-
- System.out.println("+ New Key => " + newKey);
- nextKey = (newKey == null || newKey.length() == 0) ? null
- : Bytes.toBytes(newKey);
-
- if (useSalt) {
- key.set(HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
- return true;
- } else {
- LOG.debug(String.format(
- "+ Key (%s) return an EMPTY result. Get (%s)",
- Bytes.toString(nextKey), theGet.getId())); // alg0
-
- String newKey;
-
- while ((newKey = keyList.pollFirst()) != null) {
- LOG.debug("+ WHILE NEXT Key => " + newKey);
-
- nextKey = (newKey == null || newKey.length() == 0) ? null
- : Bytes.toBytes(newKey);
-
- if (nextKey == null) {
- LOG.error("+ BOMB! BOMB! BOMB!");
- continue;
- }
-
- if (!this.htable.exists(new Get(nextKey))) {
- LOG.debug(String.format(
- "+ Key (%s) Does not exist in Table (%s)",
- Bytes.toString(nextKey),
- Bytes.toString(this.htable.getTableName())));
- continue;
- } else {
- break;
- }
- }
-
- nextKey = (newKey == null || newKey.length() == 0) ? null
- : Bytes.toBytes(newKey);
-
- LOG.debug("+ Final New Key => "
- + Bytes.toString(nextKey));
-
- return next(key, value);
- }
-
- } else {
- return setNextSplit();
- }
- }
- }
- }
-
- case EMPTY: {
- LOG.info("GOT an empty Split");
- return setNextSplit();
- }
-
- default:
- throw new IOException("Unknown source mode : " + sourceMode);
- }
- }
-}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
new file mode 100644
index 0000000..37858ad
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -0,0 +1,140 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.util.TreeSet;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chand_000
+ * Date: 29/08/13
+ * Time: 15:42
+ * To change this template use File | Settings | File Templates.
+ */
+public abstract class HBaseRecordReaderBase implements
+ RecordReader<ImmutableBytesWritable, Result> {
+
+ protected TreeSet<String> keyList;
+ protected HBaseConstants.SourceMode sourceMode;
+ protected boolean endRowInclusive = true;
+ protected int versions = 1;
+ protected boolean useSalt = false;
+
+ protected byte[] startRow;
+ protected byte[] endRow;
+
+ protected HTable htable;
+ protected byte[][] trrInputColumns;
+
+ protected Filter trrRowFilter;
+
+ protected boolean logScannerActivity = false;
+ protected int logPerRowCount = 100;
+
+ @Override
+ public String toString() {
+ StringBuffer sbuf = new StringBuffer();
+
+ sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] endRowInc [%s] ",
+ Bytes.toString(startRow), Bytes.toString(endRow), endRowInclusive));
+ sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ",
+ sourceMode, useSalt, versions));
+
+ return sbuf.toString();
+ }
+
+ byte[] getStartRow() {
+ return this.startRow;
+ }
+
+ /**
+ * @param htable
+ * the {@link org.apache.hadoop.hbase.client.HTable} to scan.
+ */
+ public void setHTable(HTable htable) {
+ Configuration conf = htable.getConfiguration();
+ logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY,
+ false);
+ logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+ this.htable = htable;
+ }
+
+ /**
+ * @param inputColumns
+ * the columns to be placed in {@link Result}.
+ */
+ public void setInputColumns(final byte[][] inputColumns) {
+ this.trrInputColumns = inputColumns;
+ }
+
+ /**
+ * @param startRow
+ * the first row in the split
+ */
+ public void setStartRow(final byte[] startRow) {
+ this.startRow = startRow;
+ }
+
+ /**
+ *
+ * @param endRow
+ * the last row in the split
+ */
+ public void setEndRow(final byte[] endRow) {
+ this.endRow = endRow;
+ }
+
+ /**
+ * @param rowFilter
+ * the {@link org.apache.hadoop.hbase.filter.Filter} to be used.
+ */
+ public void setRowFilter(Filter rowFilter) {
+ this.trrRowFilter = rowFilter;
+ }
+
+ public TreeSet<String> getKeyList() {
+ return keyList;
+ }
+
+ public void setKeyList(TreeSet<String> keyList) {
+ this.keyList = keyList;
+ }
+
+ public void setVersions(int versions) {
+ this.versions = versions;
+ }
+
+ public void setUseSalt(boolean useSalt) {
+ this.useSalt = useSalt;
+ }
+
+ public HBaseConstants.SourceMode getSourceMode() {
+ return sourceMode;
+ }
+
+ public void setSourceMode(HBaseConstants.SourceMode sourceMode) {
+ this.sourceMode = sourceMode;
+ }
+
+ public byte[] getEndRow() {
+ return endRow;
+ }
+
+ public void setEndRowInclusive(boolean isInclusive) {
+ endRowInclusive = isInclusive;
+ }
+
+ public boolean getEndRowInclusive() {
+ return endRowInclusive;
+ }
+
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index 5eafc78..6c28d9f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -30,29 +30,29 @@ import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-public class HBaseRecordReader_SINGLE implements
- RecordReader<ImmutableBytesWritable, Result> {
+public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
- static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class);
+ static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class);
- private byte[] startRow;
- private byte[] endRow;
private byte[] lastSuccessfulRow;
- private TreeSet<String> keyList;
- private SourceMode sourceMode;
- private Filter trrRowFilter;
private ResultScanner scanner;
- private HTable htable;
- private byte[][] trrInputColumns;
private long timestamp;
private int rowcount;
- private boolean logScannerActivity = false;
- private int logPerRowCount = 100;
- private boolean endRowInclusive = true;
- private int versions = 1;
- private boolean useSalt = false;
- /**
+ @Override
+ public String toString() {
+ StringBuffer sbuf = new StringBuffer();
+
+ sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]",
+ Bytes.toString(startRow), Bytes.toString(endRow), Bytes.toString(lastSuccessfulRow), Bytes.toString(nextKey), endRowInclusive, rowcount));
+ sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ",
+ sourceMode, useSalt, versions));
+
+ return sbuf.toString();
+ }
+
+
+ /**
* Restart from survivable exceptions by creating a new scanner.
*
* @param firstRow
@@ -96,41 +96,6 @@ public class HBaseRecordReader_SINGLE implements
}
}
- public TreeSet<String> getKeyList() {
- return keyList;
- }
-
- public void setKeyList(TreeSet<String> keyList) {
- this.keyList = keyList;
- }
-
- public void setVersions(int versions) {
- this.versions = versions;
- }
-
- public void setUseSalt(boolean useSalt) {
- this.useSalt = useSalt;
- }
-
- public SourceMode getSourceMode() {
- return sourceMode;
- }
-
- public void setSourceMode(SourceMode sourceMode) {
- this.sourceMode = sourceMode;
- }
-
- public byte[] getEndRow() {
- return endRow;
- }
-
- public void setEndRowInclusive(boolean isInclusive) {
- endRowInclusive = isInclusive;
- }
-
- public boolean getEndRowInclusive() {
- return endRowInclusive;
- }
private byte[] nextKey = null;
private Vector<List<KeyValue>> resultVector = null;
@@ -157,55 +122,6 @@ public class HBaseRecordReader_SINGLE implements
}
}
- byte[] getStartRow() {
- return this.startRow;
- }
-
- /**
- * @param htable
- * the {@link HTable} to scan.
- */
- public void setHTable(HTable htable) {
- Configuration conf = htable.getConfiguration();
- logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY,
- false);
- logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
- this.htable = htable;
- }
-
- /**
- * @param inputColumns
- * the columns to be placed in {@link Result}.
- */
- public void setInputColumns(final byte[][] inputColumns) {
- this.trrInputColumns = inputColumns;
- }
-
- /**
- * @param startRow
- * the first row in the split
- */
- public void setStartRow(final byte[] startRow) {
- this.startRow = startRow;
- }
-
- /**
- *
- * @param endRow
- * the last row in the split
- */
- public void setEndRow(final byte[] endRow) {
- this.endRow = endRow;
- }
-
- /**
- * @param rowFilter
- * the {@link Filter} to be used.
- */
- public void setRowFilter(Filter rowFilter) {
- this.trrRowFilter = rowFilter;
- }
-
@Override
public void close() {
if (this.scanner != null)
@@ -450,7 +366,6 @@ public class HBaseRecordReader_SINGLE implements
String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
- System.out.println("+ New Key => " + newKey);
nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
.toBytes(newKey);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
new file mode 100644
index 0000000..e2b1ec8
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
@@ -0,0 +1,124 @@
+package parallelai.spyglass.hbase;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
+
+ static final Log LOG = LogFactory.getLog(HBaseRecordReaderRegional.class);
+
+
+ private byte[] nextKey = null;
+ private Vector<List<KeyValue>> resultVector = null;
+ Map<Long, List<KeyValue>> keyValueMap = null;
+
+ private HBaseTableSplitRegional multiSplit = null;
+ private HBaseTableSplitGranular currentSplit = null;
+
+ private HBaseRecordReaderGranular currentRecordReader = null;
+
+ public void init(HBaseTableSplitRegional mSplit) throws IOException {
+ multiSplit = mSplit;
+
+ LOG.debug("Creating Multi Split for region location : "
+ + multiSplit.getRegionLocation() + " -> " + multiSplit);
+
+ setNextSplit();
+ }
+
+ public boolean setNextSplit() throws IOException {
+ currentSplit = multiSplit.getNextSplit();
+
+ LOG.debug("IN: setNextSplit : " + currentSplit );
+
+ if( currentSplit != null ) {
+ setSplitValue(currentSplit);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void setRecordReaderParms(HBaseRecordReaderGranular trr, HBaseTableSplitGranular tSplit) throws IOException {
+ HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
+
+ trr.setHTable(htable);
+ trr.setInputColumns(trrInputColumns);
+ trr.setRowFilter(trrRowFilter);
+
+ trr.init();
+ }
+
+ private void setSplitValue(HBaseTableSplitGranular tSplit) throws IOException {
+ LOG.debug("IN: setSplitValue : " + tSplit );
+
+ if( currentRecordReader != null ) currentRecordReader.close();
+
+ currentRecordReader = new HBaseRecordReaderGranular();
+ setRecordReaderParms(currentRecordReader, currentSplit);
+ }
+
+ @Override
+ public boolean next(ImmutableBytesWritable ibw, Result result) throws IOException {
+ boolean nextFlag = currentRecordReader.next(ibw, result);
+
+ while(nextFlag == false && multiSplit.hasMoreSplits() ) {
+ setNextSplit();
+ nextFlag = currentRecordReader.next(ibw, result);
+ }
+
+ return nextFlag;
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return currentRecordReader.createKey();
+ }
+
+ @Override
+ public Result createValue() {
+ return currentRecordReader.createValue();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return currentRecordReader.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return currentRecordReader.getProgress();
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
index 6766458..5bdf8cd 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
@@ -68,7 +68,7 @@ public class HBaseSalter {
byte[] originalStartKey, byte[] originalStopKey,
byte[] regionStartKey, byte[] regionStopKey,
String prefixList) throws IOException {
- LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)",
+ LOG.debug("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)",
Bytes.toString(originalStartKey),
Bytes.toString(originalStopKey),
Bytes.toString(regionStartKey),
@@ -160,7 +160,7 @@ public class HBaseSalter {
}
private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
- LOG.info("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)",
+ LOG.debug("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)",
Bytes.toString(originalKey), prefixList, startPrefix, stopPrefix));
char[] prefixArray = prefixList.toCharArray();
@@ -172,13 +172,13 @@ public class HBaseSalter {
SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true);
- LOG.info("".format("Prefix subset (%s)", subSet));
+ LOG.debug("".format("Prefix subset (%s)", subSet));
return getAllKeys(originalKey, subSet.toArray(new Byte[]{}));
}
public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) {
- LOG.info("".format("getAllKeys: OKEY (%s) PARRAY (%s)",
+ LOG.debug("".format("getAllKeys: OKEY (%s) PARRAY (%s)",
Bytes.toString(originalKey), prefixArray ));
byte[][] keys = new byte[prefixArray.length][];
@@ -187,12 +187,6 @@ public class HBaseSalter {
keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey));
}
- for(int i = 0; i < keys.length; i ++) {
- for(int j = 0; j < keys[i].length; j++) {
- LOG.info("" + i + " : " + j + " : " + keys[i][j]);
- }
- }
-
return keys;
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index aa446c1..6f04f01 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -12,6 +12,7 @@
package parallelai.spyglass.hbase;
+import parallelai.spyglass.hbase.HBaseConstants.SplitType;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
@@ -65,6 +66,8 @@ public class HBaseScheme
// private transient byte[][] fields;
private boolean useSalt = false;
+
+ private SplitType splitType = SplitType.GRANULAR;
/**
@@ -279,11 +282,31 @@ public class HBaseScheme
@Override
public void sourceConfInit(FlowProcess<JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
- conf.setInputFormat(HBaseInputFormat.class);
- String columns = getColumns();
- LOG.debug("sourcing from columns: {}", columns);
- conf.set(HBaseInputFormat.COLUMN_LIST, columns);
+ switch(splitType) {
+ case GRANULAR:
+ {
+ conf.setInputFormat(HBaseInputFormatGranular.class);
+
+ String columns = getColumns();
+ LOG.debug("sourcing from columns: {}", columns);
+ conf.set(HBaseInputFormatGranular.COLUMN_LIST, columns);
+ }
+ break;
+
+ case REGIONAL:
+ {
+ conf.setInputFormat(HBaseInputFormatRegional.class);
+
+ String columns = getColumns();
+ LOG.debug("sourcing from columns: {}", columns);
+ conf.set(HBaseInputFormatRegional.COLUMN_LIST, columns);
+ }
+ break;
+
+ default:
+ LOG.error("Unknown Split Type : " + splitType.toString());
+ }
}
private String getColumns() {
@@ -345,4 +368,8 @@ public class HBaseScheme
result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0);
return result;
}
+
+ public void setInputSplitTye(SplitType sType) {
+ this.splitType = sType;
+ }
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
deleted file mode 100644
index 87b8f58..0000000
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
+++ /dev/null
@@ -1,219 +0,0 @@
-package parallelai.spyglass.hbase;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-
-public class HBaseTableSplit implements InputSplit,
- Comparable<HBaseTableSplit>, Serializable {
-
- private final Log LOG = LogFactory.getLog(HBaseTableSplit.class);
-
- private byte[] m_tableName = null;
- private byte[] m_startRow = null;
- private byte[] m_endRow = null;
- private String m_regionLocation = null;
- private TreeSet<String> m_keyList = null;
- private SourceMode m_sourceMode = SourceMode.EMPTY;
- private boolean m_endRowInclusive = true;
- private int m_versions = 1;
- private boolean m_useSalt = false;
-
- /** default constructor */
- public HBaseTableSplit() {
- this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);
- }
-
- /**
- * Constructor
- *
- * @param tableName
- * @param startRow
- * @param endRow
- * @param location
- */
- public HBaseTableSplit(final byte[] tableName, final byte[] startRow,
- final byte[] endRow, final String location,
- final SourceMode sourceMode, final boolean useSalt) {
- this.m_tableName = tableName;
- this.m_startRow = startRow;
- this.m_endRow = endRow;
- this.m_regionLocation = location;
- this.m_sourceMode = sourceMode;
- this.m_useSalt = useSalt;
- }
-
- public HBaseTableSplit(final byte[] tableName,
- final TreeSet<String> keyList, int versions, final String location,
- final SourceMode sourceMode, final boolean useSalt) {
- this.m_tableName = tableName;
- this.m_keyList = keyList;
- this.m_versions = versions;
- this.m_sourceMode = sourceMode;
- this.m_regionLocation = location;
- this.m_useSalt = useSalt;
- }
-
- /** @return table name */
- public byte[] getTableName() {
- return this.m_tableName;
- }
-
- /** @return starting row key */
- public byte[] getStartRow() {
- return this.m_startRow;
- }
-
- /** @return end row key */
- public byte[] getEndRow() {
- return this.m_endRow;
- }
-
- public boolean getEndRowInclusive() {
- return m_endRowInclusive;
- }
-
- public void setEndRowInclusive(boolean isInclusive) {
- m_endRowInclusive = isInclusive;
- }
-
- /** @return list of keys to get */
- public TreeSet<String> getKeyList() {
- return m_keyList;
- }
-
- public int getVersions() {
- return m_versions;
- }
-
- /** @return get the source mode */
- public SourceMode getSourceMode() {
- return m_sourceMode;
- }
-
- public boolean getUseSalt() {
- return m_useSalt;
- }
-
- /** @return the region's hostname */
- public String getRegionLocation() {
- LOG.debug("REGION GETTER : " + m_regionLocation);
-
- return this.m_regionLocation;
- }
-
- public String[] getLocations() {
- LOG.debug("REGION ARRAY : " + m_regionLocation);
-
- return new String[] { this.m_regionLocation };
- }
-
- @Override
- public long getLength() {
- // Not clear how to obtain this... seems to be used only for sorting
- // splits
- return 0;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- LOG.debug("READ ME : " + in.toString());
-
- this.m_tableName = Bytes.readByteArray(in);
- this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
- this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes
- .readByteArray(in)));
- this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
-
- switch (this.m_sourceMode) {
- case SCAN_RANGE:
- this.m_startRow = Bytes.readByteArray(in);
- this.m_endRow = Bytes.readByteArray(in);
- this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in));
- break;
-
- case GET_LIST:
- this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
- this.m_keyList = new TreeSet<String>();
-
- int m = Bytes.toInt(Bytes.readByteArray(in));
-
- for (int i = 0; i < m; i++) {
- this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in)));
- }
- break;
- }
-
- LOG.debug("READ and CREATED : " + this);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- LOG.debug("WRITE : " + this);
-
- Bytes.writeByteArray(out, this.m_tableName);
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
-
- switch (this.m_sourceMode) {
- case SCAN_RANGE:
- Bytes.writeByteArray(out, this.m_startRow);
- Bytes.writeByteArray(out, this.m_endRow);
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive));
- break;
-
- case GET_LIST:
- Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
-
- for (String k : this.m_keyList) {
- Bytes.writeByteArray(out, Bytes.toBytes(k));
- }
- break;
- }
-
- LOG.debug("WROTE : " + out.toString());
- }
-
- @Override
- public String toString() {
- return String
- .format(
- "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
- Bytes.toString(m_tableName), m_regionLocation, m_sourceMode,
- Bytes.toString(m_startRow), Bytes.toString(m_endRow),
- (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions,
- m_useSalt);
- }
-
- @Override
- public int compareTo(HBaseTableSplit o) {
- switch (m_sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE:
- return Bytes.compareTo(getStartRow(), o.getStartRow());
-
- case GET_LIST:
- return m_keyList.equals(o.getKeyList()) ? 0 : -1;
-
- case EMPTY:
- return 0;
-
- default:
- return -1;
- }
-
- }
-} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
new file mode 100644
index 0000000..2f6e7b5
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -0,0 +1,165 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.TreeSet;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chand_000
+ * Date: 29/08/13
+ * Time: 16:18
+ * To change this template use File | Settings | File Templates.
+ */
+public abstract class HBaseTableSplitBase implements InputSplit,
+ Comparable<HBaseTableSplitBase>, Serializable {
+
+ private final Log LOG = LogFactory.getLog(HBaseTableSplitBase.class);
+
+
+ protected byte[] m_tableName = null;
+ protected byte[] m_startRow = null;
+ protected byte[] m_endRow = null;
+ protected String m_regionLocation = null;
+ protected TreeSet<String> m_keyList = null;
+ protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY;
+ protected boolean m_endRowInclusive = true;
+ protected int m_versions = 1;
+ protected boolean m_useSalt = false;
+
+
+ /** @return table name */
+ public byte[] getTableName() {
+ return this.m_tableName;
+ }
+
+ /** @return starting row key */
+ public byte[] getStartRow() {
+ return this.m_startRow;
+ }
+
+ /** @return end row key */
+ public byte[] getEndRow() {
+ return this.m_endRow;
+ }
+
+ public boolean getEndRowInclusive() {
+ return m_endRowInclusive;
+ }
+
+ public void setEndRowInclusive(boolean isInclusive) {
+ m_endRowInclusive = isInclusive;
+ }
+
+ /** @return list of keys to get */
+ public TreeSet<String> getKeyList() {
+ return m_keyList;
+ }
+
+ public int getVersions() {
+ return m_versions;
+ }
+
+ /** @return get the source mode */
+ public HBaseConstants.SourceMode getSourceMode() {
+ return m_sourceMode;
+ }
+
+ public boolean getUseSalt() {
+ return m_useSalt;
+ }
+
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ LOG.debug("REGION GETTER : " + m_regionLocation);
+
+ return this.m_regionLocation;
+ }
+
+ public String[] getLocations() {
+ LOG.debug("REGION ARRAY : " + m_regionLocation);
+
+ return new String[] { this.m_regionLocation };
+ }
+
+
+ public void copy(HBaseTableSplitBase that) {
+ this.m_endRow = that.m_endRow;
+ this.m_endRowInclusive = that.m_endRowInclusive;
+ this.m_keyList = that.m_keyList;
+ this.m_sourceMode = that.m_sourceMode;
+ this.m_startRow = that.m_startRow;
+ this.m_tableName = that.m_tableName;
+ this.m_useSalt = that.m_useSalt;
+ this.m_versions = that.m_versions;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ LOG.debug("READ ME : " + in.toString());
+
+ this.m_tableName = Bytes.readByteArray(in);
+ this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ this.m_sourceMode = HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes
+ .readByteArray(in)));
+ this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
+
+ switch (this.m_sourceMode) {
+ case SCAN_RANGE:
+ this.m_startRow = Bytes.readByteArray(in);
+ this.m_endRow = Bytes.readByteArray(in);
+ this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in));
+ break;
+
+ case GET_LIST:
+ this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
+ this.m_keyList = new TreeSet<String>();
+
+ int m = Bytes.toInt(Bytes.readByteArray(in));
+
+ for (int i = 0; i < m; i++) {
+ this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in)));
+ }
+ break;
+ }
+
+ LOG.debug("READ and CREATED : " + this);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ LOG.debug("WRITE : " + this);
+
+ Bytes.writeByteArray(out, this.m_tableName);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
+
+ switch (this.m_sourceMode) {
+ case SCAN_RANGE:
+ Bytes.writeByteArray(out, this.m_startRow);
+ Bytes.writeByteArray(out, this.m_endRow);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive));
+ break;
+
+ case GET_LIST:
+ Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
+
+ for (String k : this.m_keyList) {
+ Bytes.writeByteArray(out, Bytes.toBytes(k));
+ }
+ break;
+ }
+
+ LOG.debug("WROTE : " + out.toString());
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
new file mode 100644
index 0000000..4de7153
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
@@ -0,0 +1,97 @@
+package parallelai.spyglass.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+public class HBaseTableSplitGranular extends HBaseTableSplitBase {
+
+ private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class);
+
+ /** default constructor */
+ public HBaseTableSplitGranular() {
+ this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param tableName
+ * @param startRow
+ * @param endRow
+ * @param location
+ */
+ public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow,
+ final byte[] endRow, final String location,
+ final HBaseConstants.SourceMode sourceMode, final boolean useSalt) {
+ this.m_tableName = tableName;
+ this.m_startRow = startRow;
+ this.m_endRow = endRow;
+ this.m_regionLocation = location;
+ this.m_sourceMode = sourceMode;
+ this.m_useSalt = useSalt;
+ }
+
+ public HBaseTableSplitGranular(final byte[] tableName,
+ final TreeSet<String> keyList, int versions, final String location,
+ final HBaseConstants.SourceMode sourceMode, final boolean useSalt) {
+ this.m_tableName = tableName;
+ this.m_keyList = keyList;
+ this.m_versions = versions;
+ this.m_sourceMode = sourceMode;
+ this.m_regionLocation = location;
+ this.m_useSalt = useSalt;
+ }
+
+
+ @Override
+ public long getLength() {
+ // Not clear how to obtain this... seems to be used only for sorting
+ // splits
+ return 0;
+ }
+
+
+ @Override
+ public String toString() {
+ return String
+ .format(
+ "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
+ Bytes.toString(m_tableName), m_regionLocation, m_sourceMode,
+ Bytes.toString(m_startRow), Bytes.toString(m_endRow),
+ (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions,
+ m_useSalt);
+ }
+
+ @Override
+ public int compareTo(HBaseTableSplitBase o) {
+ if( ! (o instanceof HBaseTableSplitGranular) ) return -1;
+
+ switch (m_sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ return Bytes.compareTo(getStartRow(), o.getStartRow());
+
+ case GET_LIST:
+ return m_keyList.equals(o.getKeyList()) ? 0 : -1;
+
+ case EMPTY:
+ return 0;
+
+ default:
+ return -1;
+ }
+
+ }
+} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
new file mode 100644
index 0000000..1ebfa3d
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
@@ -0,0 +1,127 @@
+package parallelai.spyglass.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+public class HBaseTableSplitRegional extends HBaseTableSplitBase {
+
+ private final Log LOG = LogFactory.getLog(HBaseTableSplitRegional.class);
+
+ private List<HBaseTableSplitGranular> splits = new Vector<HBaseTableSplitGranular>();
+
+ /** default constructor */
+ private HBaseTableSplitRegional() {
+
+ }
+
+ public HBaseTableSplitRegional(String regionLocation) {
+ this.m_regionLocation = regionLocation;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ LOG.debug("REGIONAL READ ME : " + in.toString());
+
+ super.readFields(in);
+
+ int s = Bytes.toInt(Bytes.readByteArray(in));
+
+ for (int i = 0; i < s; i++) {
+ HBaseTableSplitGranular hbts = new HBaseTableSplitGranular();
+ hbts.readFields(in);
+
+ splits.add(hbts);
+ }
+
+ LOG.debug("REGIONAL READ and CREATED : " + this);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ LOG.debug("REGIONAL WRITE : " + this);
+
+ super.write(out);
+
+ Bytes.writeByteArray(out, Bytes.toBytes(splits.size()));
+
+ for (HBaseTableSplitGranular hbts : splits) {
+ hbts.write(out);
+ }
+
+ LOG.debug("REGIONAL WROTE : " + out.toString());
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer str = new StringBuffer();
+ str.append("HBaseTableSplitRegional : ");
+
+ str.append(super.toString());
+
+ str.append(" GRANULAR = > ");
+
+ for (HBaseTableSplitGranular hbt : splits) {
+ str.append(" [" + hbt.toString() + "]");
+ }
+
+ return str.toString();
+ }
+
+ @Override
+ public int compareTo(HBaseTableSplitBase o) {
+ if( ! (o instanceof HBaseTableSplitRegional) ) return -1;
+
+ return (splits.size() - ((HBaseTableSplitRegional)o).splits.size());
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return splits.size();
+ }
+
+ public void addSplit(HBaseTableSplitGranular hbt) throws IOException {
+ LOG.debug("ADD Split : " + hbt);
+
+ if (hbt.getRegionLocation().equals(m_regionLocation)) {
+ splits.add(hbt);
+ this.copy(hbt);
+ } else
+ throw new IOException("HBaseTableSplitGranular Region Location "
+ + hbt.getRegionLocation()
+ + " does NOT match MultiSplit Region Location " + m_regionLocation);
+ }
+
+// public List<HBaseTableSplitGranular> getSplits() {
+// return splits;
+// }
+
+ public boolean hasMoreSplits() {
+ splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator;
+
+ return splitIterator.hasNext();
+ }
+
+ private Iterator<HBaseTableSplitGranular> splitIterator = null;
+
+ public HBaseTableSplitGranular getNextSplit() {
+ splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator;
+
+ if( splitIterator.hasNext() ) {
+ return splitIterator.next();
+ } else {
+ return null;
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 07b5aa7..bfe6670 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,6 +12,8 @@
package parallelai.spyglass.hbase;
+import parallelai.spyglass.hbase.HBaseConstants.SplitType;
+
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
@@ -63,6 +65,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
/** Field tableName */
private String tableName;
+ private SplitType splitType = SplitType.GRANULAR;
+
/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
@@ -204,7 +208,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
return true;
}
- LOG.info("creating hbase table: {}", tableName);
+ LOG.info("Creating HBase Table: {}", tableName);
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
@@ -256,8 +260,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
// process.getID();
//
// super.getFullIdentifier(conf);
-
- HBaseInputFormat.setTableName(conf, tableName);
+
+ switch(splitType) {
+ case GRANULAR:
+ HBaseInputFormatGranular.setTableName(conf, tableName);
+ break;
+
+ case REGIONAL:
+ HBaseInputFormatRegional.setTableName(conf, tableName);
+ break;
+
+ default:
+ LOG.error("Unknown Split Type : " + splitType);
+ }
for( SourceConfig sc : sourceConfigList) {
sc.configure(conf);
@@ -266,6 +281,10 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
super.sourceConfInit(process, conf);
}
+ public void setInputSplitType(SplitType sType) {
+ this.splitType = sType;
+ }
+
@Override
public boolean equals(Object object) {
if (this == object) {