From 9a9f51c9eea317f5d28e6bc2f930b45159e7bb57 Mon Sep 17 00:00:00 2001
From: Chandan Rajah <crajah@parallelai.com>
Date: Wed, 7 Aug 2013 11:37:13 +0100
Subject: Added testing code for multi input splits

---
 .../spyglass/hbase/HBaseInputFormat.java           |   2 +-
 .../spyglass/hbase/HBaseInputFormat_SINGLE.java    | 622 +++++++++++++++++++++
 .../spyglass/hbase/HBaseRecordReader_SINGLE.java   | 505 +++++++++++++++++
 src/main/resources/application.conf                |   0
 .../spyglass/hbase/example/HBaseExample.scala      |  23 -
 .../hbase/example/HBaseExampleRunner.scala         |  33 ++
 .../spyglass/hbase/HBaseSalterTester.java          |  22 +-
 7 files changed, 1172 insertions(+), 35 deletions(-)
 create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java
 create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java
 create mode 100644 src/main/resources/application.conf

(limited to 'src')

diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
index 8e121bc..5a1184f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
@@ -90,7 +90,7 @@ public class HBaseInputFormat implements
 
 	@SuppressWarnings("deprecation")
 	@Override
-	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+	public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 		if (this.table == null) {
 			throw new IOException("No table was provided");
 		}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java
new file mode 100644
index 0000000..96bfea1
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java
@@ -0,0 +1,622 @@
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import javax.naming.NamingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+public class HBaseInputFormat_SINGLE implements
+		InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
+
+	private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class);
+
+	private final String id = UUID.randomUUID().toString();
+
+	private byte[][] inputColumns;
+	private HTable table;
+	// private HBaseRecordReader tableRecordReader;
+	private Filter rowFilter;
+	// private String tableName = "";
+
+	private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();
+
+	private String nameServer = null;
+
+	// private Scan scan = null;
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+		if (this.table == null) {
+			throw new IOException("No table was provided");
+		}
+
+		if (this.inputColumns == null || this.inputColumns.length == 0) {
+			throw new IOException("Expecting at least one column");
+		}
+
+		Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+
+		if (keys == null || keys.getFirst() == null
+				|| keys.getFirst().length == 0) {
+			HRegionLocation regLoc = table.getRegionLocation(
+					HConstants.EMPTY_BYTE_ARRAY, false);
+
+			if (null == regLoc) {
+				throw new IOException("Expecting at least one region.");
+			}
+
+			List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);
+			HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
+					HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+							.getHostnamePort().split(
+									Addressing.HOSTNAME_PORT_SEPARATOR)[0],
+					SourceMode.EMPTY, false);
+
+			splits.add(split);
+
+			return splits.toArray(new HBaseTableSplit[splits.size()]);
+		}
+
+		if (keys.getSecond() == null || keys.getSecond().length == 0) {
+			throw new IOException("Expecting at least one region.");
+		}
+
+		if (keys.getFirst().length != keys.getSecond().length) {
+			throw new IOException("Regions for start and end key do not match");
+		}
+
+		byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
+		byte[] maxKey = keys.getSecond()[0];
+
+		LOG.debug(String.format("SETTING min key (%s) and max key (%s)",
+				Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+		byte[][] regStartKeys = keys.getFirst();
+		byte[][] regStopKeys = keys.getSecond();
+		String[] regions = new String[regStartKeys.length];
+
+		for (int i = 0; i < regStartKeys.length; i++) {
+			minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0)
+					&& (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i]
+					: minKey;
+			maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0)
+					&& (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i]
+					: maxKey;
+
+			HServerAddress regionServerAddress = table.getRegionLocation(
+					keys.getFirst()[i]).getServerAddress();
+			InetAddress regionAddress = regionServerAddress.getInetSocketAddress()
+					.getAddress();
+			String regionLocation;
+			try {
+				regionLocation = reverseDNS(regionAddress);
+			} catch (NamingException e) {
+				LOG.error("Cannot resolve the host name for " + regionAddress
+						+ " because of " + e);
+				regionLocation = regionServerAddress.getHostname();
+			}
+
+			// HServerAddress regionServerAddress =
+			// table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
+			// InetAddress regionAddress =
+			// regionServerAddress.getInetSocketAddress().getAddress();
+			//
+			// String regionLocation;
+			//
+			// try {
+			// regionLocation = reverseDNS(regionAddress);
+			// } catch (NamingException e) {
+			// LOG.error("Cannot resolve the host name for " + regionAddress +
+			// " because of " + e);
+			// regionLocation = regionServerAddress.getHostname();
+			// }
+
+			// String regionLocation =
+			// table.getRegionLocation(keys.getFirst()[i]).getHostname();
+
+			LOG.debug("***** " + regionLocation);
+
+			if (regionLocation == null || regionLocation.length() == 0)
+				throw new IOException("The region info for regiosn " + i
+						+ " is null or empty");
+
+			regions[i] = regionLocation;
+
+			LOG.debug(String.format(
+					"Region (%s) has start key (%s) and stop key (%s)", regions[i],
+					Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
+		}
+
+		byte[] startRow = HConstants.EMPTY_START_ROW;
+		byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+		LOG.debug(String.format("Found min key (%s) and max key (%s)",
+				Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+		LOG.debug("SOURCE MODE is : " + sourceMode);
+
+		switch (sourceMode) {
+			case SCAN_ALL:
+				startRow = HConstants.EMPTY_START_ROW;
+				stopRow = HConstants.EMPTY_END_ROW;
+
+				LOG.info(String.format(
+						"SCAN ALL: Found start key (%s) and stop key (%s)",
+						Bytes.toString(startRow), Bytes.toString(stopRow)));
+			break;
+
+			case SCAN_RANGE:
+				startRow = (startKey != null && startKey.length() != 0) ? Bytes
+						.toBytes(startKey) : HConstants.EMPTY_START_ROW;
+				stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes
+						.toBytes(stopKey) : HConstants.EMPTY_END_ROW;
+
+				LOG.info(String.format(
+						"SCAN RANGE: Found start key (%s) and stop key (%s)",
+						Bytes.toString(startRow), Bytes.toString(stopRow)));
+			break;
+		}
+
+		switch (sourceMode) {
+			case EMPTY:
+			case SCAN_ALL:
+			case SCAN_RANGE: {
+				// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey :
+				// startRow;
+				// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey :
+				// stopRow;
+
+				List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+				if (!useSalt) {
+
+					List<HRegionLocation> validRegions = table.getRegionsInRange(
+							startRow, stopRow);
+
+					int maxRegions = validRegions.size();
+					int currentRegion = 1;
+
+					for (HRegionLocation cRegion : validRegions) {
+						byte[] rStart = cRegion.getRegionInfo().getStartKey();
+						byte[] rStop = cRegion.getRegionInfo().getEndKey();
+
+						HServerAddress regionServerAddress = cRegion
+								.getServerAddress();
+						InetAddress regionAddress = regionServerAddress
+								.getInetSocketAddress().getAddress();
+						String regionLocation;
+						try {
+							regionLocation = reverseDNS(regionAddress);
+						} catch (NamingException e) {
+							LOG.error("Cannot resolve the host name for "
+									+ regionAddress + " because of " + e);
+							regionLocation = regionServerAddress.getHostname();
+						}
+
+						byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
+								|| (Bytes.compareTo(startRow, rStart) <= 0) ? rStart
+								: startRow);
+						byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW
+								|| (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop
+								: stopRow);
+
+						LOG.debug(String.format(
+								"BOOL start (%s) stop (%s) length (%d)",
+								(startRow == HConstants.EMPTY_START_ROW || (Bytes
+										.compareTo(startRow, rStart) <= 0)),
+								(stopRow == HConstants.EMPTY_END_ROW || (Bytes
+										.compareTo(stopRow, rStop) >= 0)), rStop.length));
+
+						HBaseTableSplit split = new HBaseTableSplit(
+								table.getTableName(), sStart, sStop, regionLocation,
+								SourceMode.SCAN_RANGE, useSalt);
+
+						split.setEndRowInclusive(currentRegion == maxRegions);
+
+						currentRegion++;
+
+						LOG.debug(String
+								.format(
+										"START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
+										Bytes.toString(startRow),
+										Bytes.toString(stopRow), Bytes.toString(rStart),
+										Bytes.toString(rStop), Bytes.toString(sStart),
+										Bytes.toString(sStop), cRegion.getHostnamePort(),
+										split));
+
+						splits.add(split);
+					}
+				} else {
+					LOG.debug("Using SALT : " + useSalt);
+
+					// Will return the start and the stop key with all possible
+					// prefixes.
+					for (int i = 0; i < regions.length; i++) {
+						Pair<byte[], byte[]>[] intervals = HBaseSalter
+								.getDistributedIntervals(startRow, stopRow,
+										regStartKeys[i], regStopKeys[i], prefixList);
+
+						for (Pair<byte[], byte[]> pair : intervals) {
+							LOG.info("".format(
+									"Using SALT, Region (%s) Start (%s) Stop (%s)",
+									regions[i], Bytes.toString(pair.getFirst()),
+									Bytes.toString(pair.getSecond())));
+
+							HBaseTableSplit split = new HBaseTableSplit(
+									table.getTableName(), pair.getFirst(),
+									pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
+									useSalt);
+
+							split.setEndRowInclusive(true);
+							splits.add(split);
+						}
+					}
+				}
+
+				LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size());
+				for (HBaseTableSplit s : splits) {
+					LOG.info("RETURNED SPLITS: split -> " + s);
+				}
+
+				return splits.toArray(new HBaseTableSplit[splits.size()]);
+			}
+
+			case GET_LIST: {
+				// if( keyList == null || keyList.size() == 0 ) {
+				if (keyList == null) {
+					throw new IOException(
+							"Source Mode is GET_LIST but key list is EMPTY");
+				}
+
+				if (useSalt) {
+					TreeSet<String> tempKeyList = new TreeSet<String>();
+
+					for (String key : keyList) {
+						tempKeyList.add(HBaseSalter.addSaltPrefix(key));
+					}
+
+					keyList = tempKeyList;
+				}
+
+				LOG.debug("".format("Splitting Key List (%s)", keyList));
+
+				List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+				for (int i = 0; i < keys.getFirst().length; i++) {
+
+					if (!includeRegionInSplit(keys.getFirst()[i],
+							keys.getSecond()[i])) {
+						continue;
+					}
+
+					LOG.debug(String.format(
+							"Getting region (%s) subset (%s) to (%s)", regions[i],
+							Bytes.toString(regStartKeys[i]),
+							Bytes.toString(regStartKeys[i])));
+
+					Set<String> regionsSubSet = null;
+
+					if ((regStartKeys[i] == null || regStartKeys[i].length == 0)
+							&& (regStopKeys[i] == null || regStopKeys[i].length == 0)) {
+						LOG.debug("REGION start is empty");
+						LOG.debug("REGION stop is empty");
+						regionsSubSet = keyList;
+					} else if (regStartKeys[i] == null
+							|| regStartKeys[i].length == 0) {
+						LOG.debug("REGION start is empty");
+						regionsSubSet = keyList.headSet(
+								Bytes.toString(regStopKeys[i]), true);
+					} else if (regStopKeys[i] == null || regStopKeys[i].length == 0) {
+						LOG.debug("REGION stop is empty");
+						regionsSubSet = keyList.tailSet(
+								Bytes.toString(regStartKeys[i]), true);
+					} else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) {
+						regionsSubSet = keyList.subSet(
+								Bytes.toString(regStartKeys[i]), true,
+								Bytes.toString(regStopKeys[i]), true);
+					} else {
+						throw new IOException(String.format(
+								"For REGION (%s) Start Key (%s) > Stop Key(%s)",
+								regions[i], Bytes.toString(regStartKeys[i]),
+								Bytes.toString(regStopKeys[i])));
+					}
+
+					if (regionsSubSet == null || regionsSubSet.size() == 0) {
+						LOG.debug("EMPTY: Key is for region " + regions[i]
+								+ " is null");
+
+						continue;
+					}
+
+					TreeSet<String> regionKeyList = new TreeSet<String>(
+							regionsSubSet);
+
+					LOG.debug(String.format("Regions [%s] has key list <%s>",
+							regions[i], regionKeyList));
+
+					HBaseTableSplit split = new HBaseTableSplit(
+							table.getTableName(), regionKeyList, versions, regions[i],
+							SourceMode.GET_LIST, useSalt);
+					splits.add(split);
+				}
+
+				LOG.debug("RETURNED SPLITS: split -> " + splits);
+
+				return splits.toArray(new HBaseTableSplit[splits.size()]);
+			}
+
+			default:
+				throw new IOException("Unknown source Mode : " + sourceMode);
+		}
+	}
+
+	private String reverseDNS(InetAddress ipAddress) throws NamingException {
+		String hostName = this.reverseDNSCacheMap.get(ipAddress);
+		if (hostName == null) {
+			hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(
+					ipAddress, this.nameServer));
+			this.reverseDNSCacheMap.put(ipAddress, hostName);
+		}
+		return hostName;
+	}
+
+	@Override
+	public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+			InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+		if (!(split instanceof HBaseTableSplit))
+			throw new IOException("Table Split is not type HBaseTableSplit");
+
+		HBaseTableSplit tSplit = (HBaseTableSplit) split;
+
+		HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE();
+
+		switch (tSplit.getSourceMode()) {
+			case SCAN_ALL:
+			case SCAN_RANGE: {
+				LOG.debug(String.format(
+						"For split [%s] we have start key (%s) and stop key (%s)",
+						tSplit, tSplit.getStartRow(), tSplit.getEndRow()));
+
+				trr.setStartRow(tSplit.getStartRow());
+				trr.setEndRow(tSplit.getEndRow());
+				trr.setEndRowInclusive(tSplit.getEndRowInclusive());
+				trr.setUseSalt(useSalt);
+			}
+
+			break;
+
+			case GET_LIST: {
+				LOG.debug(String.format("For split [%s] we have key list (%s)",
+						tSplit, tSplit.getKeyList()));
+
+				trr.setKeyList(tSplit.getKeyList());
+				trr.setVersions(tSplit.getVersions());
+				trr.setUseSalt(useSalt);
+			}
+
+			break;
+
+			default:
+				throw new IOException("Unknown source mode : "
+						+ tSplit.getSourceMode());
+		}
+
+		trr.setSourceMode(tSplit.getSourceMode());
+		trr.setHTable(this.table);
+		trr.setInputColumns(this.inputColumns);
+		trr.setRowFilter(this.rowFilter);
+
+		trr.init();
+
+		return trr;
+	}
+
+	/* Configuration Section */
+
+	/**
+	 * space delimited list of columns
+	 */
+	public static final String COLUMN_LIST = "hbase.tablecolumns";
+
+	/**
+	 * Use this jobconf param to specify the input table
+	 */
+	private static final String INPUT_TABLE = "hbase.inputtable";
+
+	private String startKey = null;
+	private String stopKey = null;
+
+	private SourceMode sourceMode = SourceMode.EMPTY;
+	private TreeSet<String> keyList = null;
+	private int versions = 1;
+	private boolean useSalt = false;
+	private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
+
+	public void configure(JobConf job) {
+		String tableName = getTableName(job);
+		String colArg = job.get(COLUMN_LIST);
+		String[] colNames = colArg.split(" ");
+		byte[][] m_cols = new byte[colNames.length][];
+		for (int i = 0; i < m_cols.length; i++) {
+			m_cols[i] = Bytes.toBytes(colNames[i]);
+		}
+		setInputColumns(m_cols);
+
+		try {
+			setHTable(new HTable(HBaseConfiguration.create(job), tableName));
+		} catch (Exception e) {
+			LOG.error("************* Table could not be created");
+			LOG.error(StringUtils.stringifyException(e));
+		}
+
+		LOG.debug("Entered : " + this.getClass() + " : configure()");
+
+		useSalt = job.getBoolean(
+				String.format(HBaseConstants.USE_SALT, getTableName(job)), false);
+		prefixList = job.get(
+				String.format(HBaseConstants.SALT_PREFIX, getTableName(job)),
+				HBaseSalter.DEFAULT_PREFIX_LIST);
+
+		sourceMode = SourceMode.valueOf(job.get(String.format(
+				HBaseConstants.SOURCE_MODE, getTableName(job))));
+
+		LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
+				.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
+				.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
+				sourceMode));
+
+		switch (sourceMode) {
+			case SCAN_RANGE:
+				LOG.info("HIT SCAN_RANGE");
+
+				startKey = getJobProp(job,
+						String.format(HBaseConstants.START_KEY, getTableName(job)));
+				stopKey = getJobProp(job,
+						String.format(HBaseConstants.STOP_KEY, getTableName(job)));
+
+				LOG.info(String.format("Setting start key (%s) and stop key (%s)",
+						startKey, stopKey));
+			break;
+
+			case GET_LIST:
+				LOG.info("HIT GET_LIST");
+
+				Collection<String> keys = job.getStringCollection(String.format(
+						HBaseConstants.KEY_LIST, getTableName(job)));
+				keyList = new TreeSet<String>(keys);
+
+				versions = job.getInt(
+						String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
+
+				LOG.debug("GOT KEY LIST : " + keys);
+				LOG.debug(String.format("SETTING key list (%s)", keyList));
+
+			break;
+
+			case EMPTY:
+				LOG.info("HIT EMPTY");
+
+				sourceMode = SourceMode.SCAN_ALL;
+			break;
+
+			default:
+				LOG.info("HIT DEFAULT");
+
+			break;
+		}
+	}
+
+	public void validateInput(JobConf job) throws IOException {
+		// expecting exactly one path
+		String tableName = getTableName(job);
+
+		if (tableName == null) {
+			throw new IOException("expecting one table name");
+		}
+		LOG.debug(String.format("Found Table name [%s]", tableName));
+
+		// connected to table?
+		if (getHTable() == null) {
+			throw new IOException("could not connect to table '" + tableName + "'");
+		}
+		LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));
+
+		// expecting at least one column
+		String colArg = job.get(COLUMN_LIST);
+		if (colArg == null || colArg.length() == 0) {
+			throw new IOException("expecting at least one column");
+		}
+		LOG.debug(String.format("Found Columns [%s]", colArg));
+
+		LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey,
+				stopKey));
+
+		if (sourceMode == SourceMode.EMPTY) {
+			throw new IOException("SourceMode should not be EMPTY");
+		}
+
+		if (sourceMode == SourceMode.GET_LIST
+				&& (keyList == null || keyList.size() == 0)) {
+			throw new IOException("Source mode is GET_LIST bu key list is empty");
+		}
+	}
+
+	/* Getters & Setters */
+	private HTable getHTable() {
+		return this.table;
+	}
+
+	private void setHTable(HTable ht) {
+		this.table = ht;
+	}
+
+	private void setInputColumns(byte[][] ic) {
+		this.inputColumns = ic;
+	}
+
+	private void setJobProp(JobConf job, String key, String value) {
+		if (job.get(key) != null)
+			throw new RuntimeException(String.format(
+					"Job Conf already has key [%s] with value [%s]", key,
+					job.get(key)));
+		job.set(key, value);
+	}
+
+	private String getJobProp(JobConf job, String key) {
+		return job.get(key);
+	}
+
+	public static void setTableName(JobConf job, String tableName) {
+		// Make sure that table has not been set before
+		String oldTableName = getTableName(job);
+		if (oldTableName != null) {
+			throw new RuntimeException("table name already set to: '"
+					+ oldTableName + "'");
+		}
+
+		job.set(INPUT_TABLE, tableName);
+	}
+
+	public static String getTableName(JobConf job) {
+		return job.get(INPUT_TABLE);
+	}
+
+	protected boolean includeRegionInSplit(final byte[] startKey,
+			final byte[] endKey) {
+		return true;
+	}
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java
new file mode 100644
index 0000000..5eafc78
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java
@@ -0,0 +1,505 @@
+package parallelai.spyglass.hbase;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes; 
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+public class HBaseRecordReader_SINGLE implements
+    RecordReader<ImmutableBytesWritable, Result> {
+
+  static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class);
+
+  private byte[] startRow;
+  private byte[] endRow;
+  private byte[] lastSuccessfulRow;
+  private TreeSet<String> keyList;
+  private SourceMode sourceMode;
+  private Filter trrRowFilter;
+  private ResultScanner scanner;
+  private HTable htable;
+  private byte[][] trrInputColumns;
+  private long timestamp;
+  private int rowcount;
+  private boolean logScannerActivity = false;
+  private int logPerRowCount = 100;
+  private boolean endRowInclusive = true;
+  private int versions = 1;
+  private boolean useSalt = false;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   * 
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restartRangeScan(byte[] firstRow) throws IOException {
+    Scan currentScan;
+    if ((endRow != null) && (endRow.length > 0)) {
+      if (trrRowFilter != null) {
+        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+            new byte[] { 0 }) : endRow));
+
+        TableInputFormat.addColumns(scan, trrInputColumns);
+        scan.setFilter(trrRowFilter);
+        scan.setCacheBlocks(false);
+        this.scanner = this.htable.getScanner(scan);
+        currentScan = scan;
+      } else {
+        LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow)
+            + ", endRow: " + Bytes.toString(endRow));
+        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+            new byte[] { 0 }) : endRow));
+        TableInputFormat.addColumns(scan, trrInputColumns);
+        this.scanner = this.htable.getScanner(scan);
+        currentScan = scan;
+      }
+    } else {
+      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
+          + ", no endRow");
+
+      Scan scan = new Scan(firstRow);
+      TableInputFormat.addColumns(scan, trrInputColumns);
+      scan.setFilter(trrRowFilter);
+      this.scanner = this.htable.getScanner(scan);
+      currentScan = scan;
+    }
+    if (logScannerActivity) {
+      LOG.debug("Current scan=" + currentScan.toString());
+      timestamp = System.currentTimeMillis();
+      rowcount = 0;
+    }
+  }
+
+  public TreeSet<String> getKeyList() {
+    return keyList;
+  }
+
+  public void setKeyList(TreeSet<String> keyList) {
+    this.keyList = keyList;
+  }
+
+  public void setVersions(int versions) {
+    this.versions = versions;
+  }
+  
+  public void setUseSalt(boolean useSalt) {
+    this.useSalt = useSalt;
+  }
+
+  public SourceMode getSourceMode() {
+    return sourceMode;
+  }
+
+  public void setSourceMode(SourceMode sourceMode) {
+    this.sourceMode = sourceMode;
+  }
+
+  public byte[] getEndRow() {
+    return endRow;
+  }
+
+  public void setEndRowInclusive(boolean isInclusive) {
+    endRowInclusive = isInclusive;
+  }
+
+  public boolean getEndRowInclusive() {
+    return endRowInclusive;
+  }
+
+  private byte[] nextKey = null;
+  private Vector<List<KeyValue>> resultVector = null;
+  Map<Long, List<KeyValue>> keyValueMap = null;
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   * 
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    switch (sourceMode) {
+    case SCAN_ALL:
+    case SCAN_RANGE:
+      restartRangeScan(startRow);
+      break;
+
+    case GET_LIST:
+      nextKey = Bytes.toBytes(keyList.pollFirst());
+      break;
+
+    default:
+      throw new IOException(" Unknown source mode : " + sourceMode);
+    }
+  }
+
+  byte[] getStartRow() {
+    return this.startRow;
+  }
+
+  /**
+   * @param htable
+   *          the {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    Configuration conf = htable.getConfiguration();
+    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY,
+        false);
+    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+    this.htable = htable;
+  }
+
+  /**
+   * @param inputColumns
+   *          the columns to be placed in {@link Result}.
+   */
+  public void setInputColumns(final byte[][] inputColumns) {
+    this.trrInputColumns = inputColumns;
+  }
+
+  /**
+   * @param startRow
+   *          the first row in the split
+   */
+  public void setStartRow(final byte[] startRow) {
+    this.startRow = startRow;
+  }
+
+  /**
+   * 
+   * @param endRow
+   *          the last row in the split
+   */
+  public void setEndRow(final byte[] endRow) {
+    this.endRow = endRow;
+  }
+
+  /**
+   * @param rowFilter
+   *          the {@link Filter} to be used.
+   */
+  public void setRowFilter(Filter rowFilter) {
+    this.trrRowFilter = rowFilter;
+  }
+
+  @Override
+  public void close() {
+    if (this.scanner != null)
+      this.scanner.close();
+  }
+
+  /**
+   * @return ImmutableBytesWritable
+   * 
+   * @see org.apache.hadoop.mapred.RecordReader#createKey()
+   */
+  @Override
+  public ImmutableBytesWritable createKey() {
+    return new ImmutableBytesWritable();
+  }
+
+  /**
+   * @return RowResult
+   * 
+   * @see org.apache.hadoop.mapred.RecordReader#createValue()
+   */
+  @Override
+  public Result createValue() {
+    return new Result();
+  }
+
+  @Override
+  public long getPos() {
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return 0;
+  }
+
+  @Override
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return 0;
+  }
+
+  /**
+   * @param key
+   *          HStoreKey as input key.
+   * @param value
+   *          MapWritable as input value
+   * @return true if there was more data
+   * @throws IOException
+   */
+  @Override
+  public boolean next(ImmutableBytesWritable key, Result value)
+      throws IOException {
+
+    switch (sourceMode) {
+    case SCAN_ALL:
+    case SCAN_RANGE: {
+
+      Result result;
+      try {
+        try {
+          result = this.scanner.next();
+          if (logScannerActivity) {
+            rowcount++;
+            if (rowcount >= logPerRowCount) {
+              long now = System.currentTimeMillis();
+              LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+                  + rowcount + " rows");
+              timestamp = now;
+              rowcount = 0;
+            }
+          }
+        } catch (IOException e) {
+          // try to handle all IOExceptions by restarting
+          // the scanner, if the second call fails, it will be rethrown
+          LOG.debug("recovered from " + StringUtils.stringifyException(e));
+          if (lastSuccessfulRow == null) {
+            LOG.warn("We are restarting the first next() invocation,"
+                + " if your mapper has restarted a few other times like this"
+                + " then you should consider killing this job and investigate"
+                + " why it's taking so long.");
+          }
+          if (lastSuccessfulRow == null) {
+            restartRangeScan(startRow);
+          } else {
+            restartRangeScan(lastSuccessfulRow);
+            this.scanner.next(); // skip presumed already mapped row
+          }
+          result = this.scanner.next();
+        }
+
+        if (result != null && result.size() > 0) {
+          if( useSalt) {
+            key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+          } else {
+            key.set(result.getRow());
+          }
+          
+          lastSuccessfulRow = key.get();
+          Writables.copyWritable(result, value);
+          return true;
+        }
+        return false;
+      } catch (IOException ioe) {
+        if (logScannerActivity) {
+          long now = System.currentTimeMillis();
+          LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+              + rowcount + " rows");
+          LOG.debug(ioe);
+          String lastRow = lastSuccessfulRow == null ? "null" : Bytes
+              .toStringBinary(lastSuccessfulRow);
+          LOG.debug("lastSuccessfulRow=" + lastRow);
+        }
+        throw ioe;
+      }
+    }
+
+    case GET_LIST: {
+      LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey)));
+      
+      if (versions == 1) {
+        if (nextKey != null) {
+          LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey)));
+          
+          Get theGet = new Get(nextKey);
+          theGet.setMaxVersions(versions);
+
+          Result result = this.htable.get(theGet);
+
+          if (result != null && (! result.isEmpty()) ) {
+            LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) );
+
+            if (keyList != null || !keyList.isEmpty()) {
+              String newKey = keyList.pollFirst();
+              LOG.debug("New Key => " + newKey);
+              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+                  .toBytes(newKey);
+            } else {
+              nextKey = null;
+            }
+            
+            LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey)));
+            
+            // Write the result
+            if( useSalt) {
+              key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+            } else {
+              key.set(result.getRow());
+            }
+            lastSuccessfulRow = key.get();
+            Writables.copyWritable(result, value);
+            
+            return true;
+          } else {
+            LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0
+            
+            String newKey;
+            while((newKey = keyList.pollFirst()) != null) {
+              LOG.debug("WHILE NEXT Key => " + newKey);
+
+              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+                  .toBytes(newKey);
+              
+              if( nextKey == null ) {
+                LOG.error("BOMB! BOMB! BOMB!");
+                continue; 
+              }
+              
+              if( ! this.htable.exists( new Get(nextKey) ) ) {
+                LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+                continue; 
+              } else { break; }
+            }
+            
+            nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+                .toBytes(newKey);
+            
+            LOG.debug("Final New Key => " + Bytes.toString(nextKey));
+
+            return next(key, value);
+          }
+        } else {
+          // Nothig left. return false
+          return false;
+        }
+      } else {
+        if (resultVector != null && resultVector.size() != 0) { 
+          LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) );
+
+          List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+          Result result = new Result(resultKeyValue);
+
+          LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) );
+
+          if( useSalt) {
+            key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+          } else {
+            key.set(result.getRow());
+          }
+          lastSuccessfulRow = key.get();
+          Writables.copyWritable(result, value);
+
+          return true;
+        } else {
+          if (nextKey != null) {
+            LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey)));
+            
+            Get theGet = new Get(nextKey);
+            theGet.setMaxVersions(versions);
+
+            Result resultAll = this.htable.get(theGet);
+            
+            if( resultAll != null && (! resultAll.isEmpty())) {
+              List<KeyValue> keyValeList = resultAll.list();
+
+              keyValueMap = new HashMap<Long, List<KeyValue>>();
+              
+              LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap));
+
+              for (KeyValue keyValue : keyValeList) {
+                long version = keyValue.getTimestamp();
+
+                if (keyValueMap.containsKey(new Long(version))) {
+                  List<KeyValue> keyValueTempList = keyValueMap.get(new Long(
+                      version));
+                  if (keyValueTempList == null) {
+                    keyValueTempList = new ArrayList<KeyValue>();
+                  }
+                  keyValueTempList.add(keyValue);
+                } else {
+                  List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
+                  keyValueMap.put(new Long(version), keyValueTempList);
+                  keyValueTempList.add(keyValue);
+                }
+              }
+
+              resultVector = new Vector<List<KeyValue>>();
+              resultVector.addAll(keyValueMap.values());
+
+              List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+
+              Result result = new Result(resultKeyValue);
+
+              LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) );
+
+              String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
+
+              System.out.println("+ New Key => " + newKey);
+              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+                  .toBytes(newKey);
+
+              if( useSalt) {
+                key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+              } else {
+                key.set(result.getRow());
+              }
+              lastSuccessfulRow = key.get();
+              Writables.copyWritable(result, value);
+              return true;
+            } else {
+              LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0
+              
+              String newKey;
+              
+              while( (newKey = keyList.pollFirst()) != null ) {
+                LOG.debug("+ WHILE NEXT Key => " + newKey);
+
+                nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+                    .toBytes(newKey);
+                
+                if( nextKey == null ) {
+                  LOG.error("+ BOMB! BOMB! BOMB!");
+                  continue; 
+                }
+                
+                if( ! this.htable.exists( new Get(nextKey) ) ) {
+                  LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+                  continue; 
+                } else { break; }
+              }
+              
+              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+                  .toBytes(newKey);
+              
+              LOG.debug("+ Final New Key => " + Bytes.toString(nextKey));
+
+              return next(key, value);
+            }
+            
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+    default:
+      throw new IOException("Unknown source mode : " + sourceMode);
+    }
+  }
+}
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
new file mode 100644
index 0000000..e69de29
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
index 2aa5342..13c75d6 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -30,29 +30,6 @@ class HBaseExample(args: Args) extends JobBase(args) {
    println("Output : " + output)
    println("Quorum : " + quorumNames)
 
-   case class HBaseTableStore(
-      conf: Configuration,
-      quorum: String,
-      tableName: String) {
-
-      val tableBytes = Bytes.toBytes(tableName)
-      val connection = HConnectionManager.getConnection(conf)
-      val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
-
-      conf.set("hbase.zookeeper.quorum", quorumNames)
-
-      val htable = new HTable(HBaseConfiguration.create(conf), tableName)
-
-      def makeN(n: Int) {
-         (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => {
-            val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x)))
-            put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x))
-         })
-      }
-
-   }
-
-   HBaseTableStore(jobConf, quorumNames, "_TEST.SALT.01").makeN(100000)
 
    val hbs2 = new HBaseSource(
       "_TEST.SALT.01",
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
index 920f17d..c503247 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -4,6 +4,13 @@ import com.twitter.scalding.Tool
 import org.joda.time.format.DateTimeFormat
 import java.util.Formatter.DateTime
 import parallelai.spyglass.base.JobRunner
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration}
+import org.apache.hadoop.hbase.client.{Put, HTable, HConnectionManager, HBaseAdmin}
+import org.apache.hadoop.hbase.io.hfile.Compression
+import org.apache.hadoop.hbase.regionserver.StoreFile
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.hbase.HBaseSalter
 
 object HBaseExampleRunner extends App {
    val appPath = System.getenv("BIGDATA_APPCONF_PATH")
@@ -35,7 +42,33 @@ object HBaseExampleRunner extends App {
 
    val output = "HBaseTest.%s"
 
+  case class HBaseTableStore(
+                              conf: Configuration,
+                              quorum: String,
+                              tableName: String) {
+
+    val tableBytes = Bytes.toBytes(tableName)
+    val connection = HConnectionManager.getConnection(conf)
+    val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+    conf.set("hbase.zookeeper.quorum", quorum)
+
+    val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+    def makeN(n: Int) {
+      (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => {
+        val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x)))
+        put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x))
+      })
+    }
+
+  }
+
+  val conf: Configuration = HBaseConfiguration.create
+  HBaseTableStore(conf, quorum, "_TEST.SALT.01").makeN(100000)
+
    JobRunner.main(Array(classOf[HBaseExample].getName, "--hdfs", "--app.conf.path", appPath,
       "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum))
 
+
 }
\ No newline at end of file
diff --git a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java b/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java
index 34d7ab4..1f4c8e7 100644
--- a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java
+++ b/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java
@@ -96,7 +96,7 @@ public class HBaseSalterTester {
 	public void getAllKeysWithPrefixAndRange() throws IOException {
 		String keyStr = "1021";
 		byte [] keyBytes = Bytes.toBytes(keyStr);
-		String prefix = "1234";
+		String prefix = "12345";
 		String fullPrefix = "0123456789";
 		
 		char [] prefixArr = prefix.toCharArray();
@@ -163,7 +163,7 @@ public class HBaseSalterTester {
 	public void getAllKeysWithPrefixWithStop() throws IOException {
 		String keyStr = "1021";
 		byte [] keyBytes = Bytes.toBytes(keyStr);
-		String prefix = "01234";
+		String prefix = "012345";
 		String fullPrefix = "0123456789";
 		
 		char [] prefixArr = prefix.toCharArray();
@@ -276,7 +276,7 @@ public class HBaseSalterTester {
 		byte [] regionStart = Bytes.toBytes("1");
 		byte [] regionsStop = Bytes.toBytes("4");
 
-		String expectedPrefix = "123";
+		String expectedPrefix = "1234";
 		char [] prefixArr = expectedPrefix.toCharArray();
 		
 		byte [][] expectedStart = new byte[prefixArr.length][];
@@ -312,7 +312,7 @@ public class HBaseSalterTester {
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 		    "1020", "1021",
 		    "1_1021", "3_1023",
-		    "12", "012345"
+		    "123", "012345"
 				);
 
 		System.out.println("------------ TEST 21 --------------");
@@ -326,7 +326,7 @@ public class HBaseSalterTester {
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 			    "1020", "1021",
 			    Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023",
-			    "012", "012345"
+			    "0123", "012345"
 					);
 
 		System.out.println("------------ TEST 23 --------------");
@@ -340,42 +340,42 @@ public class HBaseSalterTester {
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 			Bytes.toString(HConstants.EMPTY_START_ROW), "1021",
 		    "1_1021", "3_1023",
-		    "12", "012345"
+		    "123", "012345"
 				);
 
 		System.out.println("------------ TEST 25 --------------");
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 			"1020", Bytes.toString(HConstants.EMPTY_END_ROW),
 		    "1_1021", "3_1023",
-		    "12", "012345"
+		    "123", "012345"
 				);
 
 		System.out.println("------------ TEST 26 --------------");
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 				Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW),
 		    "1_1021", "3_1023",
-		    "12", "012345"
+		    "123", "012345"
 				);
 
 		System.out.println("------------ TEST 27 --------------");
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 			Bytes.toString(HConstants.EMPTY_START_ROW), "1021",
 			Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023",
-		    "012", "012345"
+		    "0123", "012345"
 				);
 
 		System.out.println("------------ TEST 28 --------------");
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 			"1020", Bytes.toString(HConstants.EMPTY_END_ROW),
 			Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023",
-		    "012", "012345"
+		    "0123", "012345"
 				);
 
 		System.out.println("------------ TEST 29 --------------");
 		getDistributedIntervalsWithRegionsStartStopWithPrefix(
 			Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW),
 			Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023",
-		    "012", "012345"
+		    "0123", "012345"
 				);
 
 		System.out.println("------------ TEST 30 --------------");
-- 
cgit v1.2.3