package parallelai.spyglass.hbase; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.TreeSet; import java.util.UUID; import javax.naming.NamingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.net.DNS; import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseInputFormat_SINGLE implements InputFormat, JobConfigurable { private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class); private final String id = UUID.randomUUID().toString(); private byte[][] inputColumns; private HTable table; // private HBaseRecordReader tableRecordReader; private Filter rowFilter; // private String tableName = ""; private HashMap reverseDNSCacheMap = new HashMap(); private String nameServer = null; // private Scan scan = null; @SuppressWarnings("deprecation") @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { if (this.table == null) { throw new IOException("No table was provided"); } if (this.inputColumns == null || this.inputColumns.length == 0) { throw new IOException("Expecting at least one column"); } Pair keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { HRegionLocation regLoc = table.getRegionLocation( HConstants.EMPTY_BYTE_ARRAY, false); if (null == regLoc) { throw new IOException("Expecting at least one region."); } List splits = new ArrayList(1); HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc .getHostnamePort().split( Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); splits.add(split); return splits.toArray(new HBaseTableSplit[splits.size()]); } if (keys.getSecond() == null || keys.getSecond().length == 0) { throw new IOException("Expecting at least one region."); } if (keys.getFirst().length != keys.getSecond().length) { throw new IOException("Regions for start and end key do not match"); } byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; byte[] maxKey = keys.getSecond()[0]; LOG.debug(String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); byte[][] regStartKeys = keys.getFirst(); byte[][] regStopKeys = keys.getSecond(); String[] regions = new String[regStartKeys.length]; for (int i = 0; i < regStartKeys.length; i++) { minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] : minKey; maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] : maxKey; HServerAddress regionServerAddress = table.getRegionLocation( keys.getFirst()[i]).getServerAddress(); InetAddress regionAddress = regionServerAddress.getInetSocketAddress() .getAddress(); String regionLocation; try { regionLocation = reverseDNS(regionAddress); } catch (NamingException e) { LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e); regionLocation = regionServerAddress.getHostname(); } // HServerAddress regionServerAddress = // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); // InetAddress regionAddress = // regionServerAddress.getInetSocketAddress().getAddress(); // // String regionLocation; // // try { // regionLocation = reverseDNS(regionAddress); // } catch (NamingException e) { // LOG.error("Cannot resolve the host name for " + regionAddress + // " because of " + e); // regionLocation = regionServerAddress.getHostname(); // } // String regionLocation = // table.getRegionLocation(keys.getFirst()[i]).getHostname(); LOG.debug("***** " + regionLocation); if (regionLocation == null || regionLocation.length() == 0) throw new IOException("The region info for regiosn " + i + " is null or empty"); regions[i] = regionLocation; LOG.debug(String.format( "Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); } byte[] startRow = HConstants.EMPTY_START_ROW; byte[] stopRow = HConstants.EMPTY_END_ROW; LOG.debug(String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); LOG.debug("SOURCE MODE is : " + sourceMode); switch (sourceMode) { case SCAN_ALL: startRow = HConstants.EMPTY_START_ROW; stopRow = HConstants.EMPTY_END_ROW; LOG.info(String.format( "SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; case SCAN_RANGE: startRow = (startKey != null && startKey.length() != 0) ? Bytes .toBytes(startKey) : HConstants.EMPTY_START_ROW; stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes .toBytes(stopKey) : HConstants.EMPTY_END_ROW; LOG.info(String.format( "SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; } switch (sourceMode) { case EMPTY: case SCAN_ALL: case SCAN_RANGE: { // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : // startRow; // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : // stopRow; List splits = new ArrayList(); if (!useSalt) { List validRegions = table.getRegionsInRange( startRow, stopRow); int maxRegions = validRegions.size(); int currentRegion = 1; for (HRegionLocation cRegion : validRegions) { byte[] rStart = cRegion.getRegionInfo().getStartKey(); byte[] rStop = cRegion.getRegionInfo().getEndKey(); HServerAddress regionServerAddress = cRegion .getServerAddress(); InetAddress regionAddress = regionServerAddress .getInetSocketAddress().getAddress(); String regionLocation; try { regionLocation = reverseDNS(regionAddress); } catch (NamingException e) { LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e); regionLocation = regionServerAddress.getHostname(); } byte[] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart : startRow); byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow); LOG.debug(String.format( "BOOL start (%s) stop (%s) length (%d)", (startRow == HConstants.EMPTY_START_ROW || (Bytes .compareTo(startRow, rStart) <= 0)), (stopRow == HConstants.EMPTY_END_ROW || (Bytes .compareTo(stopRow, rStop) >= 0)), rStop.length)); HBaseTableSplit split = new HBaseTableSplit( table.getTableName(), sStart, sStop, regionLocation, SourceMode.SCAN_RANGE, useSalt); split.setEndRowInclusive(currentRegion == maxRegions); currentRegion++; LOG.debug(String .format( "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", Bytes.toString(startRow), Bytes.toString(stopRow), Bytes.toString(rStart), Bytes.toString(rStop), Bytes.toString(sStart), Bytes.toString(sStop), cRegion.getHostnamePort(), split)); splits.add(split); } } else { LOG.debug("Using SALT : " + useSalt); // Will return the start and the stop key with all possible // prefixes. for (int i = 0; i < regions.length; i++) { Pair[] intervals = HBaseSalter .getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList); for (Pair pair : intervals) { LOG.info("".format( "Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); HBaseTableSplit split = new HBaseTableSplit( table.getTableName(), pair.getFirst(), pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, useSalt); split.setEndRowInclusive(true); splits.add(split); } } } LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); for (HBaseTableSplit s : splits) { LOG.info("RETURNED SPLITS: split -> " + s); } return splits.toArray(new HBaseTableSplit[splits.size()]); } case GET_LIST: { // if( keyList == null || keyList.size() == 0 ) { if (keyList == null) { throw new IOException( "Source Mode is GET_LIST but key list is EMPTY"); } if (useSalt) { TreeSet tempKeyList = new TreeSet(); for (String key : keyList) { tempKeyList.add(HBaseSalter.addSaltPrefix(key)); } keyList = tempKeyList; } LOG.debug("".format("Splitting Key List (%s)", keyList)); List splits = new ArrayList(); for (int i = 0; i < keys.getFirst().length; i++) { if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { continue; } LOG.debug(String.format( "Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i]))); Set regionsSubSet = null; if ((regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0)) { LOG.debug("REGION start is empty"); LOG.debug("REGION stop is empty"); regionsSubSet = keyList; } else if (regStartKeys[i] == null || regStartKeys[i].length == 0) { LOG.debug("REGION start is empty"); regionsSubSet = keyList.headSet( Bytes.toString(regStopKeys[i]), true); } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { LOG.debug("REGION stop is empty"); regionsSubSet = keyList.tailSet( Bytes.toString(regStartKeys[i]), true); } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { regionsSubSet = keyList.subSet( Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); } else { throw new IOException(String.format( "For REGION (%s) Start Key (%s) > Stop Key(%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); } if (regionsSubSet == null || regionsSubSet.size() == 0) { LOG.debug("EMPTY: Key is for region " + regions[i] + " is null"); continue; } TreeSet regionKeyList = new TreeSet( regionsSubSet); LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList)); HBaseTableSplit split = new HBaseTableSplit( table.getTableName(), regionKeyList, versions, regions[i], SourceMode.GET_LIST, useSalt); splits.add(split); } LOG.debug("RETURNED SPLITS: split -> " + splits); return splits.toArray(new HBaseTableSplit[splits.size()]); } default: throw new IOException("Unknown source Mode : " + sourceMode); } } private String reverseDNS(InetAddress ipAddress) throws NamingException { String hostName = this.reverseDNSCacheMap.get(ipAddress); if (hostName == null) { hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( ipAddress, this.nameServer)); this.reverseDNSCacheMap.put(ipAddress, hostName); } return hostName; } @Override public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter) throws IOException { if (!(split instanceof HBaseTableSplit)) throw new IOException("Table Split is not type HBaseTableSplit"); HBaseTableSplit tSplit = (HBaseTableSplit) split; HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE(); switch (tSplit.getSourceMode()) { case SCAN_ALL: case SCAN_RANGE: { LOG.debug(String.format( "For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow())); trr.setStartRow(tSplit.getStartRow()); trr.setEndRow(tSplit.getEndRow()); trr.setEndRowInclusive(tSplit.getEndRowInclusive()); trr.setUseSalt(useSalt); } break; case GET_LIST: { LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList())); trr.setKeyList(tSplit.getKeyList()); trr.setVersions(tSplit.getVersions()); trr.setUseSalt(useSalt); } break; default: throw new IOException("Unknown source mode : " + tSplit.getSourceMode()); } trr.setSourceMode(tSplit.getSourceMode()); trr.setHTable(this.table); trr.setInputColumns(this.inputColumns); trr.setRowFilter(this.rowFilter); trr.init(); return trr; } /* Configuration Section */ /** * space delimited list of columns */ public static final String COLUMN_LIST = "hbase.tablecolumns"; /** * Use this jobconf param to specify the input table */ private static final String INPUT_TABLE = "hbase.inputtable"; private String startKey = null; private String stopKey = null; private SourceMode sourceMode = SourceMode.EMPTY; private TreeSet keyList = null; private int versions = 1; private boolean useSalt = false; private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; public void configure(JobConf job) { String tableName = getTableName(job); String colArg = job.get(COLUMN_LIST); String[] colNames = colArg.split(" "); byte[][] m_cols = new byte[colNames.length][]; for (int i = 0; i < m_cols.length; i++) { m_cols[i] = Bytes.toBytes(colNames[i]); } setInputColumns(m_cols); try { setHTable(new HTable(HBaseConfiguration.create(job), tableName)); } catch (Exception e) { LOG.error("************* Table could not be created"); LOG.error(StringUtils.stringifyException(e)); } LOG.debug("Entered : " + this.getClass() + " : configure()"); useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job)), false); prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), HBaseSalter.DEFAULT_PREFIX_LIST); sourceMode = SourceMode.valueOf(job.get(String.format( HBaseConstants.SOURCE_MODE, getTableName(job)))); LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), sourceMode)); switch (sourceMode) { case SCAN_RANGE: LOG.info("HIT SCAN_RANGE"); startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job))); stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job))); LOG.info(String.format("Setting start key (%s) and stop key (%s)", startKey, stopKey)); break; case GET_LIST: LOG.info("HIT GET_LIST"); Collection keys = job.getStringCollection(String.format( HBaseConstants.KEY_LIST, getTableName(job))); keyList = new TreeSet(keys); versions = job.getInt( String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); LOG.debug("GOT KEY LIST : " + keys); LOG.debug(String.format("SETTING key list (%s)", keyList)); break; case EMPTY: LOG.info("HIT EMPTY"); sourceMode = SourceMode.SCAN_ALL; break; default: LOG.info("HIT DEFAULT"); break; } } public void validateInput(JobConf job) throws IOException { // expecting exactly one path String tableName = getTableName(job); if (tableName == null) { throw new IOException("expecting one table name"); } LOG.debug(String.format("Found Table name [%s]", tableName)); // connected to table? if (getHTable() == null) { throw new IOException("could not connect to table '" + tableName + "'"); } LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); // expecting at least one column String colArg = job.get(COLUMN_LIST); if (colArg == null || colArg.length() == 0) { throw new IOException("expecting at least one column"); } LOG.debug(String.format("Found Columns [%s]", colArg)); LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey)); if (sourceMode == SourceMode.EMPTY) { throw new IOException("SourceMode should not be EMPTY"); } if (sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0)) { throw new IOException("Source mode is GET_LIST bu key list is empty"); } } /* Getters & Setters */ private HTable getHTable() { return this.table; } private void setHTable(HTable ht) { this.table = ht; } private void setInputColumns(byte[][] ic) { this.inputColumns = ic; } private void setJobProp(JobConf job, String key, String value) { if (job.get(key) != null) throw new RuntimeException(String.format( "Job Conf already has key [%s] with value [%s]", key, job.get(key))); job.set(key, value); } private String getJobProp(JobConf job, String key) { return job.get(key); } public static void setTableName(JobConf job, String tableName) { // Make sure that table has not been set before String oldTableName = getTableName(job); if (oldTableName != null) { throw new RuntimeException("table name already set to: '" + oldTableName + "'"); } job.set(INPUT_TABLE, tableName); } public static String getTableName(JobConf job) { return job.get(INPUT_TABLE); } protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { return true; } }