diff options
Diffstat (limited to 'src/main/java')
3 files changed, 1128 insertions, 1 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index 8e121bc..5a1184f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -90,7 +90,7 @@ public class HBaseInputFormat implements  	@SuppressWarnings("deprecation")  	@Override -	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { +	public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException {  		if (this.table == null) {  			throw new IOException("No table was provided");  		} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java new file mode 100644 index 0000000..96bfea1 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java @@ -0,0 +1,622 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseInputFormat_SINGLE implements +		InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { + +	private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class); + +	private final String id = UUID.randomUUID().toString(); + +	private byte[][] inputColumns; +	private HTable table; +	// private HBaseRecordReader tableRecordReader; +	private Filter rowFilter; +	// private String tableName = ""; + +	private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); + +	private String nameServer = null; + +	// private Scan scan = null; + +	@SuppressWarnings("deprecation") +	@Override +	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { +		if (this.table == null) { +			throw new IOException("No table was provided"); +		} + +		if (this.inputColumns == null || this.inputColumns.length == 0) { +			throw new IOException("Expecting at least one column"); +		} + +		Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + +		if (keys == null || keys.getFirst() == null +				|| keys.getFirst().length == 0) { +			HRegionLocation regLoc = table.getRegionLocation( +					HConstants.EMPTY_BYTE_ARRAY, false); + +			if (null == regLoc) { +				throw new IOException("Expecting at least one region."); +			} + +			List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); +			HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +					HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc +							.getHostnamePort().split( +									Addressing.HOSTNAME_PORT_SEPARATOR)[0], +					SourceMode.EMPTY, false); + +			splits.add(split); + +			return splits.toArray(new HBaseTableSplit[splits.size()]); +		} + +		if (keys.getSecond() == null || keys.getSecond().length == 0) { +			throw new IOException("Expecting at least one region."); +		} + +		if (keys.getFirst().length != keys.getSecond().length) { +			throw new IOException("Regions for start and end key do not match"); +		} + +		byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; +		byte[] maxKey = keys.getSecond()[0]; + +		LOG.debug(String.format("SETTING min key (%s) and max key (%s)", +				Bytes.toString(minKey), Bytes.toString(maxKey))); + +		byte[][] regStartKeys = keys.getFirst(); +		byte[][] regStopKeys = keys.getSecond(); +		String[] regions = new String[regStartKeys.length]; + +		for (int i = 0; i < regStartKeys.length; i++) { +			minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) +					&& (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] +					: minKey; +			maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) +					&& (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] +					: maxKey; + +			HServerAddress regionServerAddress = table.getRegionLocation( +					keys.getFirst()[i]).getServerAddress(); +			InetAddress regionAddress = regionServerAddress.getInetSocketAddress() +					.getAddress(); +			String regionLocation; +			try { +				regionLocation = reverseDNS(regionAddress); +			} catch (NamingException e) { +				LOG.error("Cannot resolve the host name for " + regionAddress +						+ " because of " + e); +				regionLocation = regionServerAddress.getHostname(); +			} + +			// HServerAddress regionServerAddress = +			// table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); +			// InetAddress regionAddress = +			// regionServerAddress.getInetSocketAddress().getAddress(); +			// +			// String regionLocation; +			// +			// try { +			// regionLocation = reverseDNS(regionAddress); +			// } catch (NamingException e) { +			// LOG.error("Cannot resolve the host name for " + regionAddress + +			// " because of " + e); +			// regionLocation = regionServerAddress.getHostname(); +			// } + +			// String regionLocation = +			// table.getRegionLocation(keys.getFirst()[i]).getHostname(); + +			LOG.debug("***** " + regionLocation); + +			if (regionLocation == null || regionLocation.length() == 0) +				throw new IOException("The region info for regiosn " + i +						+ " is null or empty"); + +			regions[i] = regionLocation; + +			LOG.debug(String.format( +					"Region (%s) has start key (%s) and stop key (%s)", regions[i], +					Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); +		} + +		byte[] startRow = HConstants.EMPTY_START_ROW; +		byte[] stopRow = HConstants.EMPTY_END_ROW; + +		LOG.debug(String.format("Found min key (%s) and max key (%s)", +				Bytes.toString(minKey), Bytes.toString(maxKey))); + +		LOG.debug("SOURCE MODE is : " + sourceMode); + +		switch (sourceMode) { +			case SCAN_ALL: +				startRow = HConstants.EMPTY_START_ROW; +				stopRow = HConstants.EMPTY_END_ROW; + +				LOG.info(String.format( +						"SCAN ALL: Found start key (%s) and stop key (%s)", +						Bytes.toString(startRow), Bytes.toString(stopRow))); +			break; + +			case SCAN_RANGE: +				startRow = (startKey != null && startKey.length() != 0) ? Bytes +						.toBytes(startKey) : HConstants.EMPTY_START_ROW; +				stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes +						.toBytes(stopKey) : HConstants.EMPTY_END_ROW; + +				LOG.info(String.format( +						"SCAN RANGE: Found start key (%s) and stop key (%s)", +						Bytes.toString(startRow), Bytes.toString(stopRow))); +			break; +		} + +		switch (sourceMode) { +			case EMPTY: +			case SCAN_ALL: +			case SCAN_RANGE: { +				// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : +				// startRow; +				// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : +				// stopRow; + +				List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + +				if (!useSalt) { + +					List<HRegionLocation> validRegions = table.getRegionsInRange( +							startRow, stopRow); + +					int maxRegions = validRegions.size(); +					int currentRegion = 1; + +					for (HRegionLocation cRegion : validRegions) { +						byte[] rStart = cRegion.getRegionInfo().getStartKey(); +						byte[] rStop = cRegion.getRegionInfo().getEndKey(); + +						HServerAddress regionServerAddress = cRegion +								.getServerAddress(); +						InetAddress regionAddress = regionServerAddress +								.getInetSocketAddress().getAddress(); +						String regionLocation; +						try { +							regionLocation = reverseDNS(regionAddress); +						} catch (NamingException e) { +							LOG.error("Cannot resolve the host name for " +									+ regionAddress + " because of " + e); +							regionLocation = regionServerAddress.getHostname(); +						} + +						byte[] sStart = (startRow == HConstants.EMPTY_START_ROW +								|| (Bytes.compareTo(startRow, rStart) <= 0) ? rStart +								: startRow); +						byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW +								|| (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop +								: stopRow); + +						LOG.debug(String.format( +								"BOOL start (%s) stop (%s) length (%d)", +								(startRow == HConstants.EMPTY_START_ROW || (Bytes +										.compareTo(startRow, rStart) <= 0)), +								(stopRow == HConstants.EMPTY_END_ROW || (Bytes +										.compareTo(stopRow, rStop) >= 0)), rStop.length)); + +						HBaseTableSplit split = new HBaseTableSplit( +								table.getTableName(), sStart, sStop, regionLocation, +								SourceMode.SCAN_RANGE, useSalt); + +						split.setEndRowInclusive(currentRegion == maxRegions); + +						currentRegion++; + +						LOG.debug(String +								.format( +										"START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", +										Bytes.toString(startRow), +										Bytes.toString(stopRow), Bytes.toString(rStart), +										Bytes.toString(rStop), Bytes.toString(sStart), +										Bytes.toString(sStop), cRegion.getHostnamePort(), +										split)); + +						splits.add(split); +					} +				} else { +					LOG.debug("Using SALT : " + useSalt); + +					// Will return the start and the stop key with all possible +					// prefixes. +					for (int i = 0; i < regions.length; i++) { +						Pair<byte[], byte[]>[] intervals = HBaseSalter +								.getDistributedIntervals(startRow, stopRow, +										regStartKeys[i], regStopKeys[i], prefixList); + +						for (Pair<byte[], byte[]> pair : intervals) { +							LOG.info("".format( +									"Using SALT, Region (%s) Start (%s) Stop (%s)", +									regions[i], Bytes.toString(pair.getFirst()), +									Bytes.toString(pair.getSecond()))); + +							HBaseTableSplit split = new HBaseTableSplit( +									table.getTableName(), pair.getFirst(), +									pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, +									useSalt); + +							split.setEndRowInclusive(true); +							splits.add(split); +						} +					} +				} + +				LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); +				for (HBaseTableSplit s : splits) { +					LOG.info("RETURNED SPLITS: split -> " + s); +				} + +				return splits.toArray(new HBaseTableSplit[splits.size()]); +			} + +			case GET_LIST: { +				// if( keyList == null || keyList.size() == 0 ) { +				if (keyList == null) { +					throw new IOException( +							"Source Mode is GET_LIST but key list is EMPTY"); +				} + +				if (useSalt) { +					TreeSet<String> tempKeyList = new TreeSet<String>(); + +					for (String key : keyList) { +						tempKeyList.add(HBaseSalter.addSaltPrefix(key)); +					} + +					keyList = tempKeyList; +				} + +				LOG.debug("".format("Splitting Key List (%s)", keyList)); + +				List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + +				for (int i = 0; i < keys.getFirst().length; i++) { + +					if (!includeRegionInSplit(keys.getFirst()[i], +							keys.getSecond()[i])) { +						continue; +					} + +					LOG.debug(String.format( +							"Getting region (%s) subset (%s) to (%s)", regions[i], +							Bytes.toString(regStartKeys[i]), +							Bytes.toString(regStartKeys[i]))); + +					Set<String> regionsSubSet = null; + +					if ((regStartKeys[i] == null || regStartKeys[i].length == 0) +							&& (regStopKeys[i] == null || regStopKeys[i].length == 0)) { +						LOG.debug("REGION start is empty"); +						LOG.debug("REGION stop is empty"); +						regionsSubSet = keyList; +					} else if (regStartKeys[i] == null +							|| regStartKeys[i].length == 0) { +						LOG.debug("REGION start is empty"); +						regionsSubSet = keyList.headSet( +								Bytes.toString(regStopKeys[i]), true); +					} else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { +						LOG.debug("REGION stop is empty"); +						regionsSubSet = keyList.tailSet( +								Bytes.toString(regStartKeys[i]), true); +					} else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { +						regionsSubSet = keyList.subSet( +								Bytes.toString(regStartKeys[i]), true, +								Bytes.toString(regStopKeys[i]), true); +					} else { +						throw new IOException(String.format( +								"For REGION (%s) Start Key (%s) > Stop Key(%s)", +								regions[i], Bytes.toString(regStartKeys[i]), +								Bytes.toString(regStopKeys[i]))); +					} + +					if (regionsSubSet == null || regionsSubSet.size() == 0) { +						LOG.debug("EMPTY: Key is for region " + regions[i] +								+ " is null"); + +						continue; +					} + +					TreeSet<String> regionKeyList = new TreeSet<String>( +							regionsSubSet); + +					LOG.debug(String.format("Regions [%s] has key list <%s>", +							regions[i], regionKeyList)); + +					HBaseTableSplit split = new HBaseTableSplit( +							table.getTableName(), regionKeyList, versions, regions[i], +							SourceMode.GET_LIST, useSalt); +					splits.add(split); +				} + +				LOG.debug("RETURNED SPLITS: split -> " + splits); + +				return splits.toArray(new HBaseTableSplit[splits.size()]); +			} + +			default: +				throw new IOException("Unknown source Mode : " + sourceMode); +		} +	} + +	private String reverseDNS(InetAddress ipAddress) throws NamingException { +		String hostName = this.reverseDNSCacheMap.get(ipAddress); +		if (hostName == null) { +			hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( +					ipAddress, this.nameServer)); +			this.reverseDNSCacheMap.put(ipAddress, hostName); +		} +		return hostName; +	} + +	@Override +	public RecordReader<ImmutableBytesWritable, Result> getRecordReader( +			InputSplit split, JobConf job, Reporter reporter) throws IOException { + +		if (!(split instanceof HBaseTableSplit)) +			throw new IOException("Table Split is not type HBaseTableSplit"); + +		HBaseTableSplit tSplit = (HBaseTableSplit) split; + +		HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE(); + +		switch (tSplit.getSourceMode()) { +			case SCAN_ALL: +			case SCAN_RANGE: { +				LOG.debug(String.format( +						"For split [%s] we have start key (%s) and stop key (%s)", +						tSplit, tSplit.getStartRow(), tSplit.getEndRow())); + +				trr.setStartRow(tSplit.getStartRow()); +				trr.setEndRow(tSplit.getEndRow()); +				trr.setEndRowInclusive(tSplit.getEndRowInclusive()); +				trr.setUseSalt(useSalt); +			} + +			break; + +			case GET_LIST: { +				LOG.debug(String.format("For split [%s] we have key list (%s)", +						tSplit, tSplit.getKeyList())); + +				trr.setKeyList(tSplit.getKeyList()); +				trr.setVersions(tSplit.getVersions()); +				trr.setUseSalt(useSalt); +			} + +			break; + +			default: +				throw new IOException("Unknown source mode : " +						+ tSplit.getSourceMode()); +		} + +		trr.setSourceMode(tSplit.getSourceMode()); +		trr.setHTable(this.table); +		trr.setInputColumns(this.inputColumns); +		trr.setRowFilter(this.rowFilter); + +		trr.init(); + +		return trr; +	} + +	/* Configuration Section */ + +	/** +	 * space delimited list of columns +	 */ +	public static final String COLUMN_LIST = "hbase.tablecolumns"; + +	/** +	 * Use this jobconf param to specify the input table +	 */ +	private static final String INPUT_TABLE = "hbase.inputtable"; + +	private String startKey = null; +	private String stopKey = null; + +	private SourceMode sourceMode = SourceMode.EMPTY; +	private TreeSet<String> keyList = null; +	private int versions = 1; +	private boolean useSalt = false; +	private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; + +	public void configure(JobConf job) { +		String tableName = getTableName(job); +		String colArg = job.get(COLUMN_LIST); +		String[] colNames = colArg.split(" "); +		byte[][] m_cols = new byte[colNames.length][]; +		for (int i = 0; i < m_cols.length; i++) { +			m_cols[i] = Bytes.toBytes(colNames[i]); +		} +		setInputColumns(m_cols); + +		try { +			setHTable(new HTable(HBaseConfiguration.create(job), tableName)); +		} catch (Exception e) { +			LOG.error("************* Table could not be created"); +			LOG.error(StringUtils.stringifyException(e)); +		} + +		LOG.debug("Entered : " + this.getClass() + " : configure()"); + +		useSalt = job.getBoolean( +				String.format(HBaseConstants.USE_SALT, getTableName(job)), false); +		prefixList = job.get( +				String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), +				HBaseSalter.DEFAULT_PREFIX_LIST); + +		sourceMode = SourceMode.valueOf(job.get(String.format( +				HBaseConstants.SOURCE_MODE, getTableName(job)))); + +		LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String +				.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job +				.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), +				sourceMode)); + +		switch (sourceMode) { +			case SCAN_RANGE: +				LOG.info("HIT SCAN_RANGE"); + +				startKey = getJobProp(job, +						String.format(HBaseConstants.START_KEY, getTableName(job))); +				stopKey = getJobProp(job, +						String.format(HBaseConstants.STOP_KEY, getTableName(job))); + +				LOG.info(String.format("Setting start key (%s) and stop key (%s)", +						startKey, stopKey)); +			break; + +			case GET_LIST: +				LOG.info("HIT GET_LIST"); + +				Collection<String> keys = job.getStringCollection(String.format( +						HBaseConstants.KEY_LIST, getTableName(job))); +				keyList = new TreeSet<String>(keys); + +				versions = job.getInt( +						String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); + +				LOG.debug("GOT KEY LIST : " + keys); +				LOG.debug(String.format("SETTING key list (%s)", keyList)); + +			break; + +			case EMPTY: +				LOG.info("HIT EMPTY"); + +				sourceMode = SourceMode.SCAN_ALL; +			break; + +			default: +				LOG.info("HIT DEFAULT"); + +			break; +		} +	} + +	public void validateInput(JobConf job) throws IOException { +		// expecting exactly one path +		String tableName = getTableName(job); + +		if (tableName == null) { +			throw new IOException("expecting one table name"); +		} +		LOG.debug(String.format("Found Table name [%s]", tableName)); + +		// connected to table? +		if (getHTable() == null) { +			throw new IOException("could not connect to table '" + tableName + "'"); +		} +		LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); + +		// expecting at least one column +		String colArg = job.get(COLUMN_LIST); +		if (colArg == null || colArg.length() == 0) { +			throw new IOException("expecting at least one column"); +		} +		LOG.debug(String.format("Found Columns [%s]", colArg)); + +		LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, +				stopKey)); + +		if (sourceMode == SourceMode.EMPTY) { +			throw new IOException("SourceMode should not be EMPTY"); +		} + +		if (sourceMode == SourceMode.GET_LIST +				&& (keyList == null || keyList.size() == 0)) { +			throw new IOException("Source mode is GET_LIST bu key list is empty"); +		} +	} + +	/* Getters & Setters */ +	private HTable getHTable() { +		return this.table; +	} + +	private void setHTable(HTable ht) { +		this.table = ht; +	} + +	private void setInputColumns(byte[][] ic) { +		this.inputColumns = ic; +	} + +	private void setJobProp(JobConf job, String key, String value) { +		if (job.get(key) != null) +			throw new RuntimeException(String.format( +					"Job Conf already has key [%s] with value [%s]", key, +					job.get(key))); +		job.set(key, value); +	} + +	private String getJobProp(JobConf job, String key) { +		return job.get(key); +	} + +	public static void setTableName(JobConf job, String tableName) { +		// Make sure that table has not been set before +		String oldTableName = getTableName(job); +		if (oldTableName != null) { +			throw new RuntimeException("table name already set to: '" +					+ oldTableName + "'"); +		} + +		job.set(INPUT_TABLE, tableName); +	} + +	public static String getTableName(JobConf job) { +		return job.get(INPUT_TABLE); +	} + +	protected boolean includeRegionInSplit(final byte[] startKey, +			final byte[] endKey) { +		return true; +	} +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java new file mode 100644 index 0000000..5eafc78 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java @@ -0,0 +1,505 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes;  +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseRecordReader_SINGLE implements +    RecordReader<ImmutableBytesWritable, Result> { + +  static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class); + +  private byte[] startRow; +  private byte[] endRow; +  private byte[] lastSuccessfulRow; +  private TreeSet<String> keyList; +  private SourceMode sourceMode; +  private Filter trrRowFilter; +  private ResultScanner scanner; +  private HTable htable; +  private byte[][] trrInputColumns; +  private long timestamp; +  private int rowcount; +  private boolean logScannerActivity = false; +  private int logPerRowCount = 100; +  private boolean endRowInclusive = true; +  private int versions = 1; +  private boolean useSalt = false; + +  /** +   * Restart from survivable exceptions by creating a new scanner. +   *  +   * @param firstRow +   * @throws IOException +   */ +  public void restartRangeScan(byte[] firstRow) throws IOException { +    Scan currentScan; +    if ((endRow != null) && (endRow.length > 0)) { +      if (trrRowFilter != null) { +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +            new byte[] { 0 }) : endRow)); + +        TableInputFormat.addColumns(scan, trrInputColumns); +        scan.setFilter(trrRowFilter); +        scan.setCacheBlocks(false); +        this.scanner = this.htable.getScanner(scan); +        currentScan = scan; +      } else { +        LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) +            + ", endRow: " + Bytes.toString(endRow)); +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +            new byte[] { 0 }) : endRow)); +        TableInputFormat.addColumns(scan, trrInputColumns); +        this.scanner = this.htable.getScanner(scan); +        currentScan = scan; +      } +    } else { +      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) +          + ", no endRow"); + +      Scan scan = new Scan(firstRow); +      TableInputFormat.addColumns(scan, trrInputColumns); +      scan.setFilter(trrRowFilter); +      this.scanner = this.htable.getScanner(scan); +      currentScan = scan; +    } +    if (logScannerActivity) { +      LOG.debug("Current scan=" + currentScan.toString()); +      timestamp = System.currentTimeMillis(); +      rowcount = 0; +    } +  } + +  public TreeSet<String> getKeyList() { +    return keyList; +  } + +  public void setKeyList(TreeSet<String> keyList) { +    this.keyList = keyList; +  } + +  public void setVersions(int versions) { +    this.versions = versions; +  } +   +  public void setUseSalt(boolean useSalt) { +    this.useSalt = useSalt; +  } + +  public SourceMode getSourceMode() { +    return sourceMode; +  } + +  public void setSourceMode(SourceMode sourceMode) { +    this.sourceMode = sourceMode; +  } + +  public byte[] getEndRow() { +    return endRow; +  } + +  public void setEndRowInclusive(boolean isInclusive) { +    endRowInclusive = isInclusive; +  } + +  public boolean getEndRowInclusive() { +    return endRowInclusive; +  } + +  private byte[] nextKey = null; +  private Vector<List<KeyValue>> resultVector = null; +  Map<Long, List<KeyValue>> keyValueMap = null; + +  /** +   * Build the scanner. Not done in constructor to allow for extension. +   *  +   * @throws IOException +   */ +  public void init() throws IOException { +    switch (sourceMode) { +    case SCAN_ALL: +    case SCAN_RANGE: +      restartRangeScan(startRow); +      break; + +    case GET_LIST: +      nextKey = Bytes.toBytes(keyList.pollFirst()); +      break; + +    default: +      throw new IOException(" Unknown source mode : " + sourceMode); +    } +  } + +  byte[] getStartRow() { +    return this.startRow; +  } + +  /** +   * @param htable +   *          the {@link HTable} to scan. +   */ +  public void setHTable(HTable htable) { +    Configuration conf = htable.getConfiguration(); +    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, +        false); +    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); +    this.htable = htable; +  } + +  /** +   * @param inputColumns +   *          the columns to be placed in {@link Result}. +   */ +  public void setInputColumns(final byte[][] inputColumns) { +    this.trrInputColumns = inputColumns; +  } + +  /** +   * @param startRow +   *          the first row in the split +   */ +  public void setStartRow(final byte[] startRow) { +    this.startRow = startRow; +  } + +  /** +   *  +   * @param endRow +   *          the last row in the split +   */ +  public void setEndRow(final byte[] endRow) { +    this.endRow = endRow; +  } + +  /** +   * @param rowFilter +   *          the {@link Filter} to be used. +   */ +  public void setRowFilter(Filter rowFilter) { +    this.trrRowFilter = rowFilter; +  } + +  @Override +  public void close() { +    if (this.scanner != null) +      this.scanner.close(); +  } + +  /** +   * @return ImmutableBytesWritable +   *  +   * @see org.apache.hadoop.mapred.RecordReader#createKey() +   */ +  @Override +  public ImmutableBytesWritable createKey() { +    return new ImmutableBytesWritable(); +  } + +  /** +   * @return RowResult +   *  +   * @see org.apache.hadoop.mapred.RecordReader#createValue() +   */ +  @Override +  public Result createValue() { +    return new Result(); +  } + +  @Override +  public long getPos() { +    // This should be the ordinal tuple in the range; +    // not clear how to calculate... +    return 0; +  } + +  @Override +  public float getProgress() { +    // Depends on the total number of tuples and getPos +    return 0; +  } + +  /** +   * @param key +   *          HStoreKey as input key. +   * @param value +   *          MapWritable as input value +   * @return true if there was more data +   * @throws IOException +   */ +  @Override +  public boolean next(ImmutableBytesWritable key, Result value) +      throws IOException { + +    switch (sourceMode) { +    case SCAN_ALL: +    case SCAN_RANGE: { + +      Result result; +      try { +        try { +          result = this.scanner.next(); +          if (logScannerActivity) { +            rowcount++; +            if (rowcount >= logPerRowCount) { +              long now = System.currentTimeMillis(); +              LOG.debug("Mapper took " + (now - timestamp) + "ms to process " +                  + rowcount + " rows"); +              timestamp = now; +              rowcount = 0; +            } +          } +        } catch (IOException e) { +          // try to handle all IOExceptions by restarting +          // the scanner, if the second call fails, it will be rethrown +          LOG.debug("recovered from " + StringUtils.stringifyException(e)); +          if (lastSuccessfulRow == null) { +            LOG.warn("We are restarting the first next() invocation," +                + " if your mapper has restarted a few other times like this" +                + " then you should consider killing this job and investigate" +                + " why it's taking so long."); +          } +          if (lastSuccessfulRow == null) { +            restartRangeScan(startRow); +          } else { +            restartRangeScan(lastSuccessfulRow); +            this.scanner.next(); // skip presumed already mapped row +          } +          result = this.scanner.next(); +        } + +        if (result != null && result.size() > 0) { +          if( useSalt) { +            key.set( HBaseSalter.delSaltPrefix(result.getRow())); +          } else { +            key.set(result.getRow()); +          } +           +          lastSuccessfulRow = key.get(); +          Writables.copyWritable(result, value); +          return true; +        } +        return false; +      } catch (IOException ioe) { +        if (logScannerActivity) { +          long now = System.currentTimeMillis(); +          LOG.debug("Mapper took " + (now - timestamp) + "ms to process " +              + rowcount + " rows"); +          LOG.debug(ioe); +          String lastRow = lastSuccessfulRow == null ? "null" : Bytes +              .toStringBinary(lastSuccessfulRow); +          LOG.debug("lastSuccessfulRow=" + lastRow); +        } +        throw ioe; +      } +    } + +    case GET_LIST: { +      LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); +       +      if (versions == 1) { +        if (nextKey != null) { +          LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); +           +          Get theGet = new Get(nextKey); +          theGet.setMaxVersions(versions); + +          Result result = this.htable.get(theGet); + +          if (result != null && (! result.isEmpty()) ) { +            LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); + +            if (keyList != null || !keyList.isEmpty()) { +              String newKey = keyList.pollFirst(); +              LOG.debug("New Key => " + newKey); +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +            } else { +              nextKey = null; +            } +             +            LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); +             +            // Write the result +            if( useSalt) { +              key.set( HBaseSalter.delSaltPrefix(result.getRow())); +            } else { +              key.set(result.getRow()); +            } +            lastSuccessfulRow = key.get(); +            Writables.copyWritable(result, value); +             +            return true; +          } else { +            LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 +             +            String newKey; +            while((newKey = keyList.pollFirst()) != null) { +              LOG.debug("WHILE NEXT Key => " + newKey); + +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +               +              if( nextKey == null ) { +                LOG.error("BOMB! BOMB! BOMB!"); +                continue;  +              } +               +              if( ! this.htable.exists( new Get(nextKey) ) ) { +                LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); +                continue;  +              } else { break; } +            } +             +            nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                .toBytes(newKey); +             +            LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + +            return next(key, value); +          } +        } else { +          // Nothig left. return false +          return false; +        } +      } else { +        if (resultVector != null && resultVector.size() != 0) {  +          LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); + +          List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); +          Result result = new Result(resultKeyValue); + +          LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); + +          if( useSalt) { +            key.set( HBaseSalter.delSaltPrefix(result.getRow())); +          } else { +            key.set(result.getRow()); +          } +          lastSuccessfulRow = key.get(); +          Writables.copyWritable(result, value); + +          return true; +        } else { +          if (nextKey != null) { +            LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); +             +            Get theGet = new Get(nextKey); +            theGet.setMaxVersions(versions); + +            Result resultAll = this.htable.get(theGet); +             +            if( resultAll != null && (! resultAll.isEmpty())) { +              List<KeyValue> keyValeList = resultAll.list(); + +              keyValueMap = new HashMap<Long, List<KeyValue>>(); +               +              LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); + +              for (KeyValue keyValue : keyValeList) { +                long version = keyValue.getTimestamp(); + +                if (keyValueMap.containsKey(new Long(version))) { +                  List<KeyValue> keyValueTempList = keyValueMap.get(new Long( +                      version)); +                  if (keyValueTempList == null) { +                    keyValueTempList = new ArrayList<KeyValue>(); +                  } +                  keyValueTempList.add(keyValue); +                } else { +                  List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); +                  keyValueMap.put(new Long(version), keyValueTempList); +                  keyValueTempList.add(keyValue); +                } +              } + +              resultVector = new Vector<List<KeyValue>>(); +              resultVector.addAll(keyValueMap.values()); + +              List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + +              Result result = new Result(resultKeyValue); + +              LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); + +              String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + +              System.out.println("+ New Key => " + newKey); +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); + +              if( useSalt) { +                key.set( HBaseSalter.delSaltPrefix(result.getRow())); +              } else { +                key.set(result.getRow()); +              } +              lastSuccessfulRow = key.get(); +              Writables.copyWritable(result, value); +              return true; +            } else { +              LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 +               +              String newKey; +               +              while( (newKey = keyList.pollFirst()) != null ) { +                LOG.debug("+ WHILE NEXT Key => " + newKey); + +                nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                    .toBytes(newKey); +                 +                if( nextKey == null ) { +                  LOG.error("+ BOMB! BOMB! BOMB!"); +                  continue;  +                } +                 +                if( ! this.htable.exists( new Get(nextKey) ) ) { +                  LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); +                  continue;  +                } else { break; } +              } +               +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +               +              LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); + +              return next(key, value); +            } +             +          } else { +            return false; +          } +        } +      } +    } +    default: +      throw new IOException("Unknown source mode : " + sourceMode); +    } +  } +} | 
