diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java | 645 |
1 files changed, 0 insertions, 645 deletions
// diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java (deleted file mode 100644, index 5a1184f..0000000, @@ -1,645 +0,0 @@)
package parallelai.spyglass.hbase;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

import javax.naming.NamingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;

import parallelai.spyglass.hbase.HBaseConstants.SourceMode;

/**
 * {@code mapred} input format that reads an HBase table either by scanning
 * (whole table or a key range) or by fetching an explicit list of row keys.
 *
 * <p>The table, columns and source mode are read from the {@link JobConf} in
 * {@link #configure(JobConf)}. {@link #getSplits(JobConf, int)} builds one
 * {@link HBaseTableSplit} per region (or per salted interval / per region key
 * subset) and then groups them by region location into
 * {@link HBaseMultiInputSplit}s.
 */
public class HBaseInputFormat implements
        InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {

    private static final Log LOG = LogFactory.getLog(HBaseInputFormat.class);

    /* Configuration Section */

    /**
     * space delimited list of columns
     */
    public static final String COLUMN_LIST = "hbase.tablecolumns";

    /**
     * Use this jobconf param to specify the input table
     */
    private static final String INPUT_TABLE = "hbase.inputtable";

    /** Cache of already resolved host names, keyed by region server address. */
    private final Map<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();

    private byte[][] inputColumns;
    private HTable table;
    private Filter rowFilter;

    /** Name server handed to {@link DNS#reverseDns}; never assigned in this class, so always null. */
    private String nameServer = null;

    private String startKey = null;
    private String stopKey = null;

    private SourceMode sourceMode = SourceMode.EMPTY;
    private TreeSet<String> keyList = null;
    private int versions = 1;
    private boolean useSalt = false;
    private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;

    /**
     * Groups table splits by their region location, producing one multi split
     * per distinct location.
     *
     * @param splits the table splits to group; must not be null
     * @return one {@link HBaseMultiInputSplit} per distinct region location
     * @throws IOException if {@code splits} is null
     */
    private HBaseMultiInputSplit[] convertToMultiSplitArray(
            List<HBaseTableSplit> splits) throws IOException {

        if (splits == null) {
            throw new IOException("The list of splits is null => " + splits);
        }

        Map<String, HBaseMultiInputSplit> regionSplits = new HashMap<String, HBaseMultiInputSplit>();

        for (HBaseTableSplit tableSplit : splits) {
            String location = tableSplit.getRegionLocation();
            HBaseMultiInputSplit multiSplit = regionSplits.get(location);

            if (multiSplit == null) {
                multiSplit = new HBaseMultiInputSplit(location);
                regionSplits.put(location, multiSplit);
            }

            multiSplit.addSplit(tableSplit);
        }

        Collection<HBaseMultiInputSplit> outVals = regionSplits.values();

        LOG.debug(String.format("Returning array of splits : %s", outVals));

        return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]);
    }

    /**
     * Computes the input splits for the configured table and source mode.
     *
     * <p>This method does not modify the configured key list, so it may be
     * called more than once on the same instance.
     *
     * @param job the job configuration (not read here; state comes from
     *        {@link #configure(JobConf)})
     * @param numSplits ignored; the number of splits follows the table regions
     * @return the splits grouped by region location
     * @throws IOException if no table or columns were configured, the region
     *         information is inconsistent, or the source mode is unknown
     */
    @SuppressWarnings("deprecation")
    @Override
    public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        if (this.table == null) {
            throw new IOException("No table was provided");
        }

        if (this.inputColumns == null || this.inputColumns.length == 0) {
            throw new IOException("Expecting at least one column");
        }

        final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();

        if (keys == null || keys.getFirst() == null
                || keys.getFirst().length == 0) {
            return convertToMultiSplitArray(buildEmptyTableSplits());
        }

        if (keys.getSecond() == null || keys.getSecond().length == 0) {
            throw new IOException("Expecting at least one region.");
        }

        if (keys.getFirst().length != keys.getSecond().length) {
            throw new IOException("Regions for start and end key do not match");
        }

        byte[][] regStartKeys = keys.getFirst();
        byte[][] regStopKeys = keys.getSecond();

        // minKey / maxKey are only reported in the debug log below.
        byte[] minKey = regStartKeys[regStartKeys.length - 1];
        byte[] maxKey = regStopKeys[0];

        LOG.debug(String.format("SETTING min key (%s) and max key (%s)",
                Bytes.toString(minKey), Bytes.toString(maxKey)));

        String[] regions = new String[regStartKeys.length];

        for (int i = 0; i < regStartKeys.length; i++) {
            if (regStartKeys[i] != null && regStartKeys[i].length != 0
                    && Bytes.compareTo(regStartKeys[i], minKey) < 0) {
                minKey = regStartKeys[i];
            }
            if (regStopKeys[i] != null && regStopKeys[i].length != 0
                    && Bytes.compareTo(regStopKeys[i], maxKey) > 0) {
                maxKey = regStopKeys[i];
            }

            String regionLocation = resolveRegionHost(table.getRegionLocation(
                    regStartKeys[i]).getServerAddress());

            LOG.debug("***** " + regionLocation);

            if (regionLocation == null || regionLocation.length() == 0) {
                throw new IOException("The region info for region " + i
                        + " is null or empty");
            }

            regions[i] = regionLocation;

            LOG.debug(String.format(
                    "Region (%s) has start key (%s) and stop key (%s)", regions[i],
                    Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
        }

        byte[] startRow = HConstants.EMPTY_START_ROW;
        byte[] stopRow = HConstants.EMPTY_END_ROW;

        LOG.debug(String.format("Found min key (%s) and max key (%s)",
                Bytes.toString(minKey), Bytes.toString(maxKey)));

        LOG.debug("SOURCE MODE is : " + sourceMode);

        switch (sourceMode) {
        case SCAN_ALL:
            LOG.info(String.format(
                    "SCAN ALL: Found start key (%s) and stop key (%s)",
                    Bytes.toString(startRow), Bytes.toString(stopRow)));
            break;

        case SCAN_RANGE:
            if (startKey != null && startKey.length() != 0) {
                startRow = Bytes.toBytes(startKey);
            }
            if (stopKey != null && stopKey.length() != 0) {
                stopRow = Bytes.toBytes(stopKey);
            }

            LOG.info(String.format(
                    "SCAN RANGE: Found start key (%s) and stop key (%s)",
                    Bytes.toString(startRow), Bytes.toString(stopRow)));
            break;

        default:
            break;
        }

        switch (sourceMode) {
        case EMPTY:
        case SCAN_ALL:
        case SCAN_RANGE:
            // TODO: Change to HBaseMultiSplit
            return convertToMultiSplitArray(buildScanSplits(startRow, stopRow,
                    regStartKeys, regStopKeys, regions));

        case GET_LIST:
            // TODO: Change to HBaseMultiSplit
            return convertToMultiSplitArray(buildKeyListSplits(regStartKeys,
                    regStopKeys, regions));

        default:
            throw new IOException("Unknown source Mode : " + sourceMode);
        }
    }

    /**
     * Builds the single EMPTY-mode split used when the table reports no region
     * start keys.
     */
    private List<HBaseTableSplit> buildEmptyTableSplits() throws IOException {
        HRegionLocation regLoc = table.getRegionLocation(
                HConstants.EMPTY_BYTE_ARRAY, false);

        if (null == regLoc) {
            throw new IOException("Expecting at least one region.");
        }

        // Only the host part of "host:port" is used as the split location.
        String host = regLoc.getHostnamePort().split(
                Addressing.HOSTNAME_PORT_SEPARATOR)[0];

        final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
        splits.add(new HBaseTableSplit(table.getTableName(),
                HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, host,
                SourceMode.EMPTY, false));

        return splits;
    }

    /**
     * Builds SCAN_RANGE splits: one per region in range when salting is off,
     * or one per salted interval per region when salting is on.
     */
    @SuppressWarnings("deprecation")
    private List<HBaseTableSplit> buildScanSplits(byte[] startRow, byte[] stopRow,
            byte[][] regStartKeys, byte[][] regStopKeys, String[] regions)
            throws IOException {

        final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();

        if (!useSalt) {
            List<HRegionLocation> validRegions = table.getRegionsInRange(
                    startRow, stopRow);

            int maxRegions = validRegions.size();
            int currentRegion = 1;

            // An empty row means "unbounded"; test the length rather than the
            // array reference.
            boolean openStart = startRow.length == 0;
            boolean openStop = stopRow.length == 0;

            for (HRegionLocation cRegion : validRegions) {
                byte[] rStart = cRegion.getRegionInfo().getStartKey();
                byte[] rStop = cRegion.getRegionInfo().getEndKey();

                String regionLocation = resolveRegionHost(cRegion
                        .getServerAddress());

                boolean startsBeforeRegion = openStart
                        || Bytes.compareTo(startRow, rStart) <= 0;
                boolean stopsAfterRegion = openStop
                        || Bytes.compareTo(stopRow, rStop) >= 0;

                // Clip the requested range to the region. An empty region end
                // key (last region) never replaces an explicit stop row.
                byte[] sStart = startsBeforeRegion ? rStart : startRow;
                byte[] sStop = (openStop || (stopsAfterRegion && rStop.length != 0))
                        ? rStop : stopRow;

                LOG.debug(String.format(
                        "BOOL start (%s) stop (%s) length (%d)",
                        startsBeforeRegion, stopsAfterRegion, rStop.length));

                HBaseTableSplit split = new HBaseTableSplit(
                        table.getTableName(), sStart, sStop, regionLocation,
                        SourceMode.SCAN_RANGE, useSalt);

                // Only the last region's split includes its end row.
                split.setEndRowInclusive(currentRegion == maxRegions);

                currentRegion++;

                LOG.debug(String
                        .format(
                                "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
                                Bytes.toString(startRow),
                                Bytes.toString(stopRow), Bytes.toString(rStart),
                                Bytes.toString(rStop), Bytes.toString(sStart),
                                Bytes.toString(sStop), cRegion.getHostnamePort(),
                                split));

                splits.add(split);
            }
        } else {
            LOG.debug("Using SALT : " + useSalt);

            // Will return the start and the stop key with all possible
            // prefixes.
            for (int i = 0; i < regions.length; i++) {
                Pair<byte[], byte[]>[] intervals = HBaseSalter
                        .getDistributedIntervals(startRow, stopRow,
                                regStartKeys[i], regStopKeys[i], prefixList);

                for (Pair<byte[], byte[]> pair : intervals) {
                    LOG.debug(String.format(
                            "Using SALT, Region (%s) Start (%s) Stop (%s)",
                            regions[i], Bytes.toString(pair.getFirst()),
                            Bytes.toString(pair.getSecond())));

                    HBaseTableSplit split = new HBaseTableSplit(
                            table.getTableName(), pair.getFirst(),
                            pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
                            useSalt);

                    split.setEndRowInclusive(true);
                    splits.add(split);
                }
            }
        }

        LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size());

        return splits;
    }

    /**
     * Builds GET_LIST splits: for each region, the subset of the configured
     * keys that falls inside that region's key range.
     */
    private List<HBaseTableSplit> buildKeyListSplits(byte[][] regStartKeys,
            byte[][] regStopKeys, String[] regions) throws IOException {

        if (keyList == null) {
            throw new IOException(
                    "Source Mode is GET_LIST but key list is EMPTY");
        }

        // Salt into a local set: writing the salted keys back to the field
        // would salt them again on a later getSplits() call.
        TreeSet<String> lookupKeys = keyList;

        if (useSalt) {
            lookupKeys = new TreeSet<String>();

            for (String key : keyList) {
                lookupKeys.add(HBaseSalter.addSaltPrefix(key));
            }
        }

        LOG.info(String.format("Splitting Key List (%s)", lookupKeys));

        final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();

        for (int i = 0; i < regStartKeys.length; i++) {

            if (!includeRegionInSplit(regStartKeys[i], regStopKeys[i])) {
                continue;
            }

            LOG.debug(String.format(
                    "Getting region (%s) subset (%s) to (%s)", regions[i],
                    Bytes.toString(regStartKeys[i]),
                    Bytes.toString(regStopKeys[i])));

            boolean openStart = regStartKeys[i] == null
                    || regStartKeys[i].length == 0;
            boolean openStop = regStopKeys[i] == null
                    || regStopKeys[i].length == 0;

            // A region holds [start, stop): the stop key belongs to the next
            // region, so the upper bound is exclusive. An inclusive bound
            // would place a key equal to a region boundary in two splits.
            Set<String> regionsSubSet;

            if (openStart && openStop) {
                LOG.debug("REGION start is empty");
                LOG.debug("REGION stop is empty");
                regionsSubSet = lookupKeys;
            } else if (openStart) {
                LOG.debug("REGION start is empty");
                regionsSubSet = lookupKeys.headSet(
                        Bytes.toString(regStopKeys[i]), false);
            } else if (openStop) {
                LOG.debug("REGION stop is empty");
                regionsSubSet = lookupKeys.tailSet(
                        Bytes.toString(regStartKeys[i]), true);
            } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) {
                regionsSubSet = lookupKeys.subSet(
                        Bytes.toString(regStartKeys[i]), true,
                        Bytes.toString(regStopKeys[i]), false);
            } else {
                throw new IOException(String.format(
                        "For REGION (%s) Start Key (%s) > Stop Key(%s)",
                        regions[i], Bytes.toString(regStartKeys[i]),
                        Bytes.toString(regStopKeys[i])));
            }

            if (regionsSubSet == null || regionsSubSet.size() == 0) {
                LOG.debug("EMPTY: Key is for region " + regions[i]
                        + " is null");

                continue;
            }

            // Copy the view so the split owns an independent key set.
            TreeSet<String> regionKeyList = new TreeSet<String>(
                    regionsSubSet);

            LOG.debug(String.format("Regions [%s] has key list <%s>",
                    regions[i], regionKeyList));

            HBaseTableSplit split = new HBaseTableSplit(
                    table.getTableName(), regionKeyList, versions, regions[i],
                    SourceMode.GET_LIST, useSalt);
            splits.add(split);
        }

        LOG.info("RETURNED SPLITS: split -> " + splits);

        return splits;
    }

    /**
     * Resolves the host name of a region server by reverse DNS, falling back
     * to the host name reported by the server address when the lookup fails.
     */
    @SuppressWarnings("deprecation")
    private String resolveRegionHost(HServerAddress regionServerAddress) {
        InetAddress regionAddress = regionServerAddress.getInetSocketAddress()
                .getAddress();
        try {
            return reverseDNS(regionAddress);
        } catch (NamingException e) {
            LOG.error("Cannot resolve the host name for " + regionAddress
                    + " because of " + e);
            return regionServerAddress.getHostname();
        }
    }

    /**
     * Reverse-resolves an address to a host name, caching the result per
     * address.
     *
     * @throws NamingException if the reverse lookup fails
     */
    private String reverseDNS(InetAddress ipAddress) throws NamingException {
        String hostName = this.reverseDNSCacheMap.get(ipAddress);
        if (hostName == null) {
            hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(
                    ipAddress, this.nameServer));
            this.reverseDNSCacheMap.put(ipAddress, hostName);
        }
        return hostName;
    }

    /**
     * Creates a record reader over the given multi split, configured with this
     * format's table, columns, row filter and salt flag.
     *
     * @throws IOException if {@code split} is not an {@link HBaseMultiInputSplit}
     */
    @Override
    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {

        if (!(split instanceof HBaseMultiInputSplit)) {
            throw new IOException("Table Split is not type HBaseMultiInputSplit");
        }

        HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split;

        HBaseRecordReader trr = new HBaseRecordReader(tSplit);

        trr.setHTable(this.table);
        trr.setInputColumns(this.inputColumns);
        trr.setRowFilter(this.rowFilter);
        trr.setUseSalt(useSalt);

        trr.setNextSplit();

        return trr;
    }

    /**
     * Reads the table name, columns, salt settings and source mode from the
     * job configuration and opens the table.
     *
     * <p>A missing column list or a table that cannot be opened is logged and
     * left unset, so {@link #getSplits(JobConf, int)} and
     * {@link #validateInput(JobConf)} report it as an {@link IOException}.
     *
     * @throws IllegalArgumentException if the source mode property is missing
     *         or is not a {@link SourceMode} name
     */
    @Override
    public void configure(JobConf job) {
        String tableName = getTableName(job);

        String colArg = job.get(COLUMN_LIST);
        if (colArg == null || colArg.length() == 0) {
            // Leave the columns unset instead of failing with a
            // NullPointerException on split().
            LOG.error("No columns configured under '" + COLUMN_LIST + "'");
        } else {
            String[] colNames = colArg.split(" ");
            byte[][] columns = new byte[colNames.length][];
            for (int i = 0; i < columns.length; i++) {
                columns[i] = Bytes.toBytes(colNames[i]);
            }
            setInputColumns(columns);
        }

        try {
            setHTable(new HTable(HBaseConfiguration.create(job), tableName));
        } catch (Exception e) {
            // Best effort: getSplits() / validateInput() fail later when the
            // table is still unset.
            LOG.error("************* Table could not be created", e);
        }

        LOG.debug("Entered : " + this.getClass() + " : configure()");

        useSalt = job.getBoolean(
                String.format(HBaseConstants.USE_SALT, tableName), false);
        prefixList = job.get(
                String.format(HBaseConstants.SALT_PREFIX, tableName),
                HBaseSalter.DEFAULT_PREFIX_LIST);

        String sourceModeKey = String.format(HBaseConstants.SOURCE_MODE,
                tableName);
        String sourceModeValue = job.get(sourceModeKey);

        if (sourceModeValue == null) {
            throw new IllegalArgumentException(
                    "Source mode is not set in job conf under key ["
                            + sourceModeKey + "]");
        }

        sourceMode = SourceMode.valueOf(sourceModeValue);

        LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally",
                sourceModeKey, sourceModeValue, sourceMode));

        switch (sourceMode) {
        case SCAN_RANGE:
            LOG.info("HIT SCAN_RANGE");

            startKey = getJobProp(job,
                    String.format(HBaseConstants.START_KEY, tableName));
            stopKey = getJobProp(job,
                    String.format(HBaseConstants.STOP_KEY, tableName));

            LOG.info(String.format("Setting start key (%s) and stop key (%s)",
                    startKey, stopKey));
            break;

        case GET_LIST:
            LOG.info("HIT GET_LIST");

            Collection<String> keys = job.getStringCollection(String.format(
                    HBaseConstants.KEY_LIST, tableName));
            keyList = new TreeSet<String>(keys);

            versions = job.getInt(
                    String.format(HBaseConstants.VERSIONS, tableName), 1);

            LOG.debug("GOT KEY LIST : " + keys);
            LOG.debug(String.format("SETTING key list (%s)", keyList));

            break;

        case EMPTY:
            LOG.info("HIT EMPTY");

            // An EMPTY source mode is treated as a full table scan.
            sourceMode = SourceMode.SCAN_ALL;
            break;

        default:
            LOG.info("HIT DEFAULT");

            break;
        }
    }

    /**
     * Checks that a table name, an open table, at least one column and a
     * usable source mode are available.
     *
     * @throws IOException describing the first missing or invalid setting
     */
    public void validateInput(JobConf job) throws IOException {
        // expecting exactly one path
        String tableName = getTableName(job);

        if (tableName == null) {
            throw new IOException("expecting one table name");
        }
        LOG.debug(String.format("Found Table name [%s]", tableName));

        // connected to table?
        if (getHTable() == null) {
            throw new IOException("could not connect to table '" + tableName + "'");
        }
        LOG.debug(String.format("Found Table [%s]",
                Bytes.toString(getHTable().getTableName())));

        // expecting at least one column
        String colArg = job.get(COLUMN_LIST);
        if (colArg == null || colArg.length() == 0) {
            throw new IOException("expecting at least one column");
        }
        LOG.debug(String.format("Found Columns [%s]", colArg));

        LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey,
                stopKey));

        if (sourceMode == SourceMode.EMPTY) {
            throw new IOException("SourceMode should not be EMPTY");
        }

        if (sourceMode == SourceMode.GET_LIST
                && (keyList == null || keyList.size() == 0)) {
            throw new IOException("Source mode is GET_LIST but key list is empty");
        }
    }

    /* Getters & Setters */
    private HTable getHTable() {
        return this.table;
    }

    private void setHTable(HTable ht) {
        this.table = ht;
    }

    private void setInputColumns(byte[][] ic) {
        this.inputColumns = ic;
    }

    /**
     * Sets a job property, refusing to overwrite an existing value.
     *
     * @throws RuntimeException if {@code key} is already present in the job conf
     */
    private void setJobProp(JobConf job, String key, String value) {
        if (job.get(key) != null) {
            throw new RuntimeException(String.format(
                    "Job Conf already has key [%s] with value [%s]", key,
                    job.get(key)));
        }
        job.set(key, value);
    }

    private String getJobProp(JobConf job, String key) {
        return job.get(key);
    }

    /**
     * Stores the input table name in the job configuration.
     *
     * @throws RuntimeException if a table name has already been set
     */
    public static void setTableName(JobConf job, String tableName) {
        // Make sure that table has not been set before
        String oldTableName = getTableName(job);
        if (oldTableName != null) {
            throw new RuntimeException("table name already set to: '"
                    + oldTableName + "'");
        }

        job.set(INPUT_TABLE, tableName);
    }

    /**
     * Returns the input table name stored in the job configuration, or null
     * when none has been set.
     */
    public static String getTableName(JobConf job) {
        return job.get(INPUT_TABLE);
    }

    /**
     * Hook for subclasses to exclude regions from GET_LIST splits; this
     * implementation accepts every region.
     *
     * @param startKey start key of the region
     * @param endKey end key of the region
     * @return true if the region should be included in the splits
     */
    protected boolean includeRegionInSplit(final byte[] startKey,
            final byte[] endKey) {
        return true;
    }
}