aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java2
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java622
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java505
3 files changed, 1128 insertions, 1 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
index 8e121bc..5a1184f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
@@ -90,7 +90,7 @@ public class HBaseInputFormat implements
@SuppressWarnings("deprecation")
@Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
if (this.table == null) {
throw new IOException("No table was provided");
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java
new file mode 100644
index 0000000..96bfea1
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java
@@ -0,0 +1,622 @@
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import javax.naming.NamingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+public class HBaseInputFormat_SINGLE implements
+ InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
+
+ private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class);
+
+ private final String id = UUID.randomUUID().toString();
+
+ private byte[][] inputColumns;
+ private HTable table;
+ // private HBaseRecordReader tableRecordReader;
+ private Filter rowFilter;
+ // private String tableName = "";
+
+ private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();
+
+ private String nameServer = null;
+
+ // private Scan scan = null;
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ if (this.table == null) {
+ throw new IOException("No table was provided");
+ }
+
+ if (this.inputColumns == null || this.inputColumns.length == 0) {
+ throw new IOException("Expecting at least one column");
+ }
+
+ Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+
+ if (keys == null || keys.getFirst() == null
+ || keys.getFirst().length == 0) {
+ HRegionLocation regLoc = table.getRegionLocation(
+ HConstants.EMPTY_BYTE_ARRAY, false);
+
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);
+ HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+ .getHostnamePort().split(
+ Addressing.HOSTNAME_PORT_SEPARATOR)[0],
+ SourceMode.EMPTY, false);
+
+ splits.add(split);
+
+ return splits.toArray(new HBaseTableSplit[splits.size()]);
+ }
+
+ if (keys.getSecond() == null || keys.getSecond().length == 0) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ if (keys.getFirst().length != keys.getSecond().length) {
+ throw new IOException("Regions for start and end key do not match");
+ }
+
+ byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
+ byte[] maxKey = keys.getSecond()[0];
+
+ LOG.debug(String.format("SETTING min key (%s) and max key (%s)",
+ Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+ byte[][] regStartKeys = keys.getFirst();
+ byte[][] regStopKeys = keys.getSecond();
+ String[] regions = new String[regStartKeys.length];
+
+ for (int i = 0; i < regStartKeys.length; i++) {
+ minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0)
+ && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i]
+ : minKey;
+ maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0)
+ && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i]
+ : maxKey;
+
+ HServerAddress regionServerAddress = table.getRegionLocation(
+ keys.getFirst()[i]).getServerAddress();
+ InetAddress regionAddress = regionServerAddress.getInetSocketAddress()
+ .getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress
+ + " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ // HServerAddress regionServerAddress =
+ // table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
+ // InetAddress regionAddress =
+ // regionServerAddress.getInetSocketAddress().getAddress();
+ //
+ // String regionLocation;
+ //
+ // try {
+ // regionLocation = reverseDNS(regionAddress);
+ // } catch (NamingException e) {
+ // LOG.error("Cannot resolve the host name for " + regionAddress +
+ // " because of " + e);
+ // regionLocation = regionServerAddress.getHostname();
+ // }
+
+ // String regionLocation =
+ // table.getRegionLocation(keys.getFirst()[i]).getHostname();
+
+ LOG.debug("***** " + regionLocation);
+
+ if (regionLocation == null || regionLocation.length() == 0)
+ throw new IOException("The region info for regiosn " + i
+ + " is null or empty");
+
+ regions[i] = regionLocation;
+
+ LOG.debug(String.format(
+ "Region (%s) has start key (%s) and stop key (%s)", regions[i],
+ Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
+ }
+
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+ LOG.debug(String.format("Found min key (%s) and max key (%s)",
+ Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+ LOG.debug("SOURCE MODE is : " + sourceMode);
+
+ switch (sourceMode) {
+ case SCAN_ALL:
+ startRow = HConstants.EMPTY_START_ROW;
+ stopRow = HConstants.EMPTY_END_ROW;
+
+ LOG.info(String.format(
+ "SCAN ALL: Found start key (%s) and stop key (%s)",
+ Bytes.toString(startRow), Bytes.toString(stopRow)));
+ break;
+
+ case SCAN_RANGE:
+ startRow = (startKey != null && startKey.length() != 0) ? Bytes
+ .toBytes(startKey) : HConstants.EMPTY_START_ROW;
+ stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes
+ .toBytes(stopKey) : HConstants.EMPTY_END_ROW;
+
+ LOG.info(String.format(
+ "SCAN RANGE: Found start key (%s) and stop key (%s)",
+ Bytes.toString(startRow), Bytes.toString(stopRow)));
+ break;
+ }
+
+ switch (sourceMode) {
+ case EMPTY:
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+ // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey :
+ // startRow;
+ // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey :
+ // stopRow;
+
+ List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+ if (!useSalt) {
+
+ List<HRegionLocation> validRegions = table.getRegionsInRange(
+ startRow, stopRow);
+
+ int maxRegions = validRegions.size();
+ int currentRegion = 1;
+
+ for (HRegionLocation cRegion : validRegions) {
+ byte[] rStart = cRegion.getRegionInfo().getStartKey();
+ byte[] rStop = cRegion.getRegionInfo().getEndKey();
+
+ HServerAddress regionServerAddress = cRegion
+ .getServerAddress();
+ InetAddress regionAddress = regionServerAddress
+ .getInetSocketAddress().getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for "
+ + regionAddress + " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
+ || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart
+ : startRow);
+ byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW
+ || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop
+ : stopRow);
+
+ LOG.debug(String.format(
+ "BOOL start (%s) stop (%s) length (%d)",
+ (startRow == HConstants.EMPTY_START_ROW || (Bytes
+ .compareTo(startRow, rStart) <= 0)),
+ (stopRow == HConstants.EMPTY_END_ROW || (Bytes
+ .compareTo(stopRow, rStop) >= 0)), rStop.length));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(), sStart, sStop, regionLocation,
+ SourceMode.SCAN_RANGE, useSalt);
+
+ split.setEndRowInclusive(currentRegion == maxRegions);
+
+ currentRegion++;
+
+ LOG.debug(String
+ .format(
+ "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
+ Bytes.toString(startRow),
+ Bytes.toString(stopRow), Bytes.toString(rStart),
+ Bytes.toString(rStop), Bytes.toString(sStart),
+ Bytes.toString(sStop), cRegion.getHostnamePort(),
+ split));
+
+ splits.add(split);
+ }
+ } else {
+ LOG.debug("Using SALT : " + useSalt);
+
+ // Will return the start and the stop key with all possible
+ // prefixes.
+ for (int i = 0; i < regions.length; i++) {
+ Pair<byte[], byte[]>[] intervals = HBaseSalter
+ .getDistributedIntervals(startRow, stopRow,
+ regStartKeys[i], regStopKeys[i], prefixList);
+
+ for (Pair<byte[], byte[]> pair : intervals) {
+ LOG.info("".format(
+ "Using SALT, Region (%s) Start (%s) Stop (%s)",
+ regions[i], Bytes.toString(pair.getFirst()),
+ Bytes.toString(pair.getSecond())));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(), pair.getFirst(),
+ pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
+ useSalt);
+
+ split.setEndRowInclusive(true);
+ splits.add(split);
+ }
+ }
+ }
+
+ LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size());
+ for (HBaseTableSplit s : splits) {
+ LOG.info("RETURNED SPLITS: split -> " + s);
+ }
+
+ return splits.toArray(new HBaseTableSplit[splits.size()]);
+ }
+
+ case GET_LIST: {
+ // if( keyList == null || keyList.size() == 0 ) {
+ if (keyList == null) {
+ throw new IOException(
+ "Source Mode is GET_LIST but key list is EMPTY");
+ }
+
+ if (useSalt) {
+ TreeSet<String> tempKeyList = new TreeSet<String>();
+
+ for (String key : keyList) {
+ tempKeyList.add(HBaseSalter.addSaltPrefix(key));
+ }
+
+ keyList = tempKeyList;
+ }
+
+ LOG.debug("".format("Splitting Key List (%s)", keyList));
+
+ List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+ for (int i = 0; i < keys.getFirst().length; i++) {
+
+ if (!includeRegionInSplit(keys.getFirst()[i],
+ keys.getSecond()[i])) {
+ continue;
+ }
+
+ LOG.debug(String.format(
+ "Getting region (%s) subset (%s) to (%s)", regions[i],
+ Bytes.toString(regStartKeys[i]),
+ Bytes.toString(regStartKeys[i])));
+
+ Set<String> regionsSubSet = null;
+
+ if ((regStartKeys[i] == null || regStartKeys[i].length == 0)
+ && (regStopKeys[i] == null || regStopKeys[i].length == 0)) {
+ LOG.debug("REGION start is empty");
+ LOG.debug("REGION stop is empty");
+ regionsSubSet = keyList;
+ } else if (regStartKeys[i] == null
+ || regStartKeys[i].length == 0) {
+ LOG.debug("REGION start is empty");
+ regionsSubSet = keyList.headSet(
+ Bytes.toString(regStopKeys[i]), true);
+ } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) {
+ LOG.debug("REGION stop is empty");
+ regionsSubSet = keyList.tailSet(
+ Bytes.toString(regStartKeys[i]), true);
+ } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) {
+ regionsSubSet = keyList.subSet(
+ Bytes.toString(regStartKeys[i]), true,
+ Bytes.toString(regStopKeys[i]), true);
+ } else {
+ throw new IOException(String.format(
+ "For REGION (%s) Start Key (%s) > Stop Key(%s)",
+ regions[i], Bytes.toString(regStartKeys[i]),
+ Bytes.toString(regStopKeys[i])));
+ }
+
+ if (regionsSubSet == null || regionsSubSet.size() == 0) {
+ LOG.debug("EMPTY: Key is for region " + regions[i]
+ + " is null");
+
+ continue;
+ }
+
+ TreeSet<String> regionKeyList = new TreeSet<String>(
+ regionsSubSet);
+
+ LOG.debug(String.format("Regions [%s] has key list <%s>",
+ regions[i], regionKeyList));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(), regionKeyList, versions, regions[i],
+ SourceMode.GET_LIST, useSalt);
+ splits.add(split);
+ }
+
+ LOG.debug("RETURNED SPLITS: split -> " + splits);
+
+ return splits.toArray(new HBaseTableSplit[splits.size()]);
+ }
+
+ default:
+ throw new IOException("Unknown source Mode : " + sourceMode);
+ }
+ }
+
+ private String reverseDNS(InetAddress ipAddress) throws NamingException {
+ String hostName = this.reverseDNSCacheMap.get(ipAddress);
+ if (hostName == null) {
+ hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(
+ ipAddress, this.nameServer));
+ this.reverseDNSCacheMap.put(ipAddress, hostName);
+ }
+ return hostName;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+ if (!(split instanceof HBaseTableSplit))
+ throw new IOException("Table Split is not type HBaseTableSplit");
+
+ HBaseTableSplit tSplit = (HBaseTableSplit) split;
+
+ HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE();
+
+ switch (tSplit.getSourceMode()) {
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+ LOG.debug(String.format(
+ "For split [%s] we have start key (%s) and stop key (%s)",
+ tSplit, tSplit.getStartRow(), tSplit.getEndRow()));
+
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setEndRowInclusive(tSplit.getEndRowInclusive());
+ trr.setUseSalt(useSalt);
+ }
+
+ break;
+
+ case GET_LIST: {
+ LOG.debug(String.format("For split [%s] we have key list (%s)",
+ tSplit, tSplit.getKeyList()));
+
+ trr.setKeyList(tSplit.getKeyList());
+ trr.setVersions(tSplit.getVersions());
+ trr.setUseSalt(useSalt);
+ }
+
+ break;
+
+ default:
+ throw new IOException("Unknown source mode : "
+ + tSplit.getSourceMode());
+ }
+
+ trr.setSourceMode(tSplit.getSourceMode());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+
+ trr.init();
+
+ return trr;
+ }
+
+ /* Configuration Section */
+
+ /**
+ * space delimited list of columns
+ */
+ public static final String COLUMN_LIST = "hbase.tablecolumns";
+
+ /**
+ * Use this jobconf param to specify the input table
+ */
+ private static final String INPUT_TABLE = "hbase.inputtable";
+
+ private String startKey = null;
+ private String stopKey = null;
+
+ private SourceMode sourceMode = SourceMode.EMPTY;
+ private TreeSet<String> keyList = null;
+ private int versions = 1;
+ private boolean useSalt = false;
+ private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
+
+ public void configure(JobConf job) {
+ String tableName = getTableName(job);
+ String colArg = job.get(COLUMN_LIST);
+ String[] colNames = colArg.split(" ");
+ byte[][] m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ }
+ setInputColumns(m_cols);
+
+ try {
+ setHTable(new HTable(HBaseConfiguration.create(job), tableName));
+ } catch (Exception e) {
+ LOG.error("************* Table could not be created");
+ LOG.error(StringUtils.stringifyException(e));
+ }
+
+ LOG.debug("Entered : " + this.getClass() + " : configure()");
+
+ useSalt = job.getBoolean(
+ String.format(HBaseConstants.USE_SALT, getTableName(job)), false);
+ prefixList = job.get(
+ String.format(HBaseConstants.SALT_PREFIX, getTableName(job)),
+ HBaseSalter.DEFAULT_PREFIX_LIST);
+
+ sourceMode = SourceMode.valueOf(job.get(String.format(
+ HBaseConstants.SOURCE_MODE, getTableName(job))));
+
+ LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
+ .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
+ .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
+ sourceMode));
+
+ switch (sourceMode) {
+ case SCAN_RANGE:
+ LOG.info("HIT SCAN_RANGE");
+
+ startKey = getJobProp(job,
+ String.format(HBaseConstants.START_KEY, getTableName(job)));
+ stopKey = getJobProp(job,
+ String.format(HBaseConstants.STOP_KEY, getTableName(job)));
+
+ LOG.info(String.format("Setting start key (%s) and stop key (%s)",
+ startKey, stopKey));
+ break;
+
+ case GET_LIST:
+ LOG.info("HIT GET_LIST");
+
+ Collection<String> keys = job.getStringCollection(String.format(
+ HBaseConstants.KEY_LIST, getTableName(job)));
+ keyList = new TreeSet<String>(keys);
+
+ versions = job.getInt(
+ String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
+
+ LOG.debug("GOT KEY LIST : " + keys);
+ LOG.debug(String.format("SETTING key list (%s)", keyList));
+
+ break;
+
+ case EMPTY:
+ LOG.info("HIT EMPTY");
+
+ sourceMode = SourceMode.SCAN_ALL;
+ break;
+
+ default:
+ LOG.info("HIT DEFAULT");
+
+ break;
+ }
+ }
+
+ public void validateInput(JobConf job) throws IOException {
+ // expecting exactly one path
+ String tableName = getTableName(job);
+
+ if (tableName == null) {
+ throw new IOException("expecting one table name");
+ }
+ LOG.debug(String.format("Found Table name [%s]", tableName));
+
+ // connected to table?
+ if (getHTable() == null) {
+ throw new IOException("could not connect to table '" + tableName + "'");
+ }
+ LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));
+
+ // expecting at least one column
+ String colArg = job.get(COLUMN_LIST);
+ if (colArg == null || colArg.length() == 0) {
+ throw new IOException("expecting at least one column");
+ }
+ LOG.debug(String.format("Found Columns [%s]", colArg));
+
+ LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey,
+ stopKey));
+
+ if (sourceMode == SourceMode.EMPTY) {
+ throw new IOException("SourceMode should not be EMPTY");
+ }
+
+ if (sourceMode == SourceMode.GET_LIST
+ && (keyList == null || keyList.size() == 0)) {
+ throw new IOException("Source mode is GET_LIST bu key list is empty");
+ }
+ }
+
+ /* Getters & Setters */
+ private HTable getHTable() {
+ return this.table;
+ }
+
+ private void setHTable(HTable ht) {
+ this.table = ht;
+ }
+
+ private void setInputColumns(byte[][] ic) {
+ this.inputColumns = ic;
+ }
+
+ private void setJobProp(JobConf job, String key, String value) {
+ if (job.get(key) != null)
+ throw new RuntimeException(String.format(
+ "Job Conf already has key [%s] with value [%s]", key,
+ job.get(key)));
+ job.set(key, value);
+ }
+
+ private String getJobProp(JobConf job, String key) {
+ return job.get(key);
+ }
+
+ public static void setTableName(JobConf job, String tableName) {
+ // Make sure that table has not been set before
+ String oldTableName = getTableName(job);
+ if (oldTableName != null) {
+ throw new RuntimeException("table name already set to: '"
+ + oldTableName + "'");
+ }
+
+ job.set(INPUT_TABLE, tableName);
+ }
+
+ public static String getTableName(JobConf job) {
+ return job.get(INPUT_TABLE);
+ }
+
+ protected boolean includeRegionInSplit(final byte[] startKey,
+ final byte[] endKey) {
+ return true;
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java
new file mode 100644
index 0000000..5eafc78
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java
@@ -0,0 +1,505 @@
+package parallelai.spyglass.hbase;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+public class HBaseRecordReader_SINGLE implements
+ RecordReader<ImmutableBytesWritable, Result> {
+
+ static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class);
+
+ private byte[] startRow;
+ private byte[] endRow;
+ private byte[] lastSuccessfulRow;
+ private TreeSet<String> keyList;
+ private SourceMode sourceMode;
+ private Filter trrRowFilter;
+ private ResultScanner scanner;
+ private HTable htable;
+ private byte[][] trrInputColumns;
+ private long timestamp;
+ private int rowcount;
+ private boolean logScannerActivity = false;
+ private int logPerRowCount = 100;
+ private boolean endRowInclusive = true;
+ private int versions = 1;
+ private boolean useSalt = false;
+
+ /**
+ * Restart from survivable exceptions by creating a new scanner.
+ *
+ * @param firstRow
+ * @throws IOException
+ */
+ public void restartRangeScan(byte[] firstRow) throws IOException {
+ Scan currentScan;
+ if ((endRow != null) && (endRow.length > 0)) {
+ if (trrRowFilter != null) {
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
+
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ scan.setCacheBlocks(false);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow)
+ + ", endRow: " + Bytes.toString(endRow));
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
+ + ", no endRow");
+
+ Scan scan = new Scan(firstRow);
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ if (logScannerActivity) {
+ LOG.debug("Current scan=" + currentScan.toString());
+ timestamp = System.currentTimeMillis();
+ rowcount = 0;
+ }
+ }
+
+ public TreeSet<String> getKeyList() {
+ return keyList;
+ }
+
+ public void setKeyList(TreeSet<String> keyList) {
+ this.keyList = keyList;
+ }
+
+ public void setVersions(int versions) {
+ this.versions = versions;
+ }
+
+ public void setUseSalt(boolean useSalt) {
+ this.useSalt = useSalt;
+ }
+
+ public SourceMode getSourceMode() {
+ return sourceMode;
+ }
+
+ public void setSourceMode(SourceMode sourceMode) {
+ this.sourceMode = sourceMode;
+ }
+
+ public byte[] getEndRow() {
+ return endRow;
+ }
+
+ public void setEndRowInclusive(boolean isInclusive) {
+ endRowInclusive = isInclusive;
+ }
+
+ public boolean getEndRowInclusive() {
+ return endRowInclusive;
+ }
+
+ private byte[] nextKey = null;
+ private Vector<List<KeyValue>> resultVector = null;
+ Map<Long, List<KeyValue>> keyValueMap = null;
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ restartRangeScan(startRow);
+ break;
+
+ case GET_LIST:
+ nextKey = Bytes.toBytes(keyList.pollFirst());
+ break;
+
+ default:
+ throw new IOException(" Unknown source mode : " + sourceMode);
+ }
+ }
+
+ byte[] getStartRow() {
+ return this.startRow;
+ }
+
+ /**
+ * @param htable
+ * the {@link HTable} to scan.
+ */
+ public void setHTable(HTable htable) {
+ Configuration conf = htable.getConfiguration();
+ logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY,
+ false);
+ logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+ this.htable = htable;
+ }
+
+ /**
+ * @param inputColumns
+ * the columns to be placed in {@link Result}.
+ */
+ public void setInputColumns(final byte[][] inputColumns) {
+ this.trrInputColumns = inputColumns;
+ }
+
+ /**
+ * @param startRow
+ * the first row in the split
+ */
+ public void setStartRow(final byte[] startRow) {
+ this.startRow = startRow;
+ }
+
+ /**
+ *
+ * @param endRow
+ * the last row in the split
+ */
+ public void setEndRow(final byte[] endRow) {
+ this.endRow = endRow;
+ }
+
+ /**
+ * @param rowFilter
+ * the {@link Filter} to be used.
+ */
+ public void setRowFilter(Filter rowFilter) {
+ this.trrRowFilter = rowFilter;
+ }
+
+ @Override
+ public void close() {
+ if (this.scanner != null)
+ this.scanner.close();
+ }
+
+ /**
+ * @return ImmutableBytesWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ /**
+ * @return RowResult
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ @Override
+ public Result createValue() {
+ return new Result();
+ }
+
+ @Override
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
+ }
+
+ @Override
+ public float getProgress() {
+ // Depends on the total number of tuples and getPos
+ return 0;
+ }
+
+ /**
+ * @param key
+ * HStoreKey as input key.
+ * @param value
+ * MapWritable as input value
+ * @return true if there was more data
+ * @throws IOException
+ */
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value)
+ throws IOException {
+
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+
+ Result result;
+ try {
+ try {
+ result = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+ + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
+ }
+ }
+ } catch (IOException e) {
+ // try to handle all IOExceptions by restarting
+ // the scanner, if the second call fails, it will be rethrown
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation,"
+ + " if your mapper has restarted a few other times like this"
+ + " then you should consider killing this job and investigate"
+ + " why it's taking so long.");
+ }
+ if (lastSuccessfulRow == null) {
+ restartRangeScan(startRow);
+ } else {
+ restartRangeScan(lastSuccessfulRow);
+ this.scanner.next(); // skip presumed already mapped row
+ }
+ result = this.scanner.next();
+ }
+
+ if (result != null && result.size() > 0) {
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ }
+ return false;
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+ + rowcount + " rows");
+ LOG.debug(ioe);
+ String lastRow = lastSuccessfulRow == null ? "null" : Bytes
+ .toStringBinary(lastSuccessfulRow);
+ LOG.debug("lastSuccessfulRow=" + lastRow);
+ }
+ throw ioe;
+ }
+ }
+
+ case GET_LIST: {
+ LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey)));
+
+ if (versions == 1) {
+ if (nextKey != null) {
+ LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey)));
+
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result result = this.htable.get(theGet);
+
+ if (result != null && (! result.isEmpty()) ) {
+ LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) );
+
+ if (keyList != null || !keyList.isEmpty()) {
+ String newKey = keyList.pollFirst();
+ LOG.debug("New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+ } else {
+ nextKey = null;
+ }
+
+ LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey)));
+
+ // Write the result
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+
+ return true;
+ } else {
+ LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0
+
+ String newKey;
+ while((newKey = keyList.pollFirst()) != null) {
+ LOG.debug("WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( nextKey == null ) {
+ LOG.error("BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if( ! this.htable.exists( new Get(nextKey) ) ) {
+ LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+ continue;
+ } else { break; }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ LOG.debug("Final New Key => " + Bytes.toString(nextKey));
+
+ return next(key, value);
+ }
+ } else {
+ // Nothig left. return false
+ return false;
+ }
+ } else {
+ if (resultVector != null && resultVector.size() != 0) {
+ LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) );
+
+ List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) );
+
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+
+ return true;
+ } else {
+ if (nextKey != null) {
+ LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey)));
+
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result resultAll = this.htable.get(theGet);
+
+ if( resultAll != null && (! resultAll.isEmpty())) {
+ List<KeyValue> keyValeList = resultAll.list();
+
+ keyValueMap = new HashMap<Long, List<KeyValue>>();
+
+ LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap));
+
+ for (KeyValue keyValue : keyValeList) {
+ long version = keyValue.getTimestamp();
+
+ if (keyValueMap.containsKey(new Long(version))) {
+ List<KeyValue> keyValueTempList = keyValueMap.get(new Long(
+ version));
+ if (keyValueTempList == null) {
+ keyValueTempList = new ArrayList<KeyValue>();
+ }
+ keyValueTempList.add(keyValue);
+ } else {
+ List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
+ keyValueMap.put(new Long(version), keyValueTempList);
+ keyValueTempList.add(keyValue);
+ }
+ }
+
+ resultVector = new Vector<List<KeyValue>>();
+ resultVector.addAll(keyValueMap.values());
+
+ List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) );
+
+ String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
+
+ System.out.println("+ New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ } else {
+ LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0
+
+ String newKey;
+
+ while( (newKey = keyList.pollFirst()) != null ) {
+ LOG.debug("+ WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( nextKey == null ) {
+ LOG.error("+ BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if( ! this.htable.exists( new Get(nextKey) ) ) {
+ LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+ continue;
+ } else { break; }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ LOG.debug("+ Final New Key => " + Bytes.toString(nextKey));
+
+ return next(key, value);
+ }
+
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+ default:
+ throw new IOException("Unknown source mode : " + sourceMode);
+ }
+ }
+}