diff options
author | Chandan Rajah <crajah@parallelai.com> | 2013-08-07 11:37:13 +0100 |
---|---|---|
committer | Chandan Rajah <crajah@parallelai.com> | 2013-08-07 11:37:13 +0100 |
commit | 9a9f51c9eea317f5d28e6bc2f930b45159e7bb57 (patch) | |
tree | d829ee927f230b26c45afd1d0e67253b25c2f114 /src/main/java/parallelai/spyglass | |
parent | de38789a032b586b0e278a81dedb5c8fb6d43e02 (diff) | |
download | SpyGlass-9a9f51c9eea317f5d28e6bc2f930b45159e7bb57.tar.gz SpyGlass-9a9f51c9eea317f5d28e6bc2f930b45159e7bb57.zip |
Added testing code for multi input splits
Diffstat (limited to 'src/main/java/parallelai/spyglass')
3 files changed, 1128 insertions, 1 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index 8e121bc..5a1184f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -90,7 +90,7 @@ public class HBaseInputFormat implements @SuppressWarnings("deprecation") @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException { if (this.table == null) { throw new IOException("No table was provided"); } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java new file mode 100644 index 0000000..96bfea1 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java @@ -0,0 +1,622 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseInputFormat_SINGLE implements + InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { + + private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class); + + private final String id = UUID.randomUUID().toString(); + + private byte[][] inputColumns; + private HTable table; + // private HBaseRecordReader tableRecordReader; + private Filter rowFilter; + // private String tableName = ""; + + private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); + + private String nameServer = null; + + // private Scan scan = null; + + @SuppressWarnings("deprecation") + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + if (this.table == null) { + throw new IOException("No table was provided"); + } + + if (this.inputColumns == null || this.inputColumns.length == 0) { + throw new IOException("Expecting at least one column"); + } + + Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + + if (keys == null || keys.getFirst() == null + || keys.getFirst().length == 0) { + HRegionLocation regLoc = table.getRegionLocation( + HConstants.EMPTY_BYTE_ARRAY, false); + + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + + List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); + HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc + .getHostnamePort().split( + Addressing.HOSTNAME_PORT_SEPARATOR)[0], + SourceMode.EMPTY, false); + + splits.add(split); + + return splits.toArray(new HBaseTableSplit[splits.size()]); + } + + if (keys.getSecond() == null || keys.getSecond().length == 0) { + throw new IOException("Expecting at least one region."); + } + + if (keys.getFirst().length != keys.getSecond().length) { + throw new IOException("Regions for start and end key do not match"); + } + + byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; + byte[] maxKey = keys.getSecond()[0]; + + LOG.debug(String.format("SETTING min key (%s) and max key (%s)", + Bytes.toString(minKey), Bytes.toString(maxKey))); + + byte[][] regStartKeys = keys.getFirst(); + byte[][] regStopKeys = keys.getSecond(); + String[] regions = new String[regStartKeys.length]; + + for (int i = 0; i < regStartKeys.length; i++) { + minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) + && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] + : minKey; + maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) + && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] + : maxKey; + + HServerAddress regionServerAddress = table.getRegionLocation( + keys.getFirst()[i]).getServerAddress(); + InetAddress regionAddress = regionServerAddress.getInetSocketAddress() + .getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + regionAddress + + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + // HServerAddress regionServerAddress = + // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); + // InetAddress regionAddress = + // regionServerAddress.getInetSocketAddress().getAddress(); + // + // String regionLocation; + // + // try { + // regionLocation = reverseDNS(regionAddress); + // } catch (NamingException e) { + // LOG.error("Cannot resolve the host name for " + regionAddress + + // " because of " + e); + // regionLocation = regionServerAddress.getHostname(); + // } + + // String regionLocation = + // table.getRegionLocation(keys.getFirst()[i]).getHostname(); + + LOG.debug("***** " + regionLocation); + + if (regionLocation == null || regionLocation.length() == 0) + throw new IOException("The region info for regiosn " + i + + " is null or empty"); + + regions[i] = regionLocation; + + LOG.debug(String.format( + "Region (%s) has start key (%s) and stop key (%s)", regions[i], + Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); + } + + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + + LOG.debug(String.format("Found min key (%s) and max key (%s)", + Bytes.toString(minKey), Bytes.toString(maxKey))); + + LOG.debug("SOURCE MODE is : " + sourceMode); + + switch (sourceMode) { + case SCAN_ALL: + startRow = HConstants.EMPTY_START_ROW; + stopRow = HConstants.EMPTY_END_ROW; + + LOG.info(String.format( + "SCAN ALL: Found start key (%s) and stop key (%s)", + Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + + case SCAN_RANGE: + startRow = (startKey != null && startKey.length() != 0) ? Bytes + .toBytes(startKey) : HConstants.EMPTY_START_ROW; + stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes + .toBytes(stopKey) : HConstants.EMPTY_END_ROW; + + LOG.info(String.format( + "SCAN RANGE: Found start key (%s) and stop key (%s)", + Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + } + + switch (sourceMode) { + case EMPTY: + case SCAN_ALL: + case SCAN_RANGE: { + // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : + // startRow; + // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : + // stopRow; + + List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + if (!useSalt) { + + List<HRegionLocation> validRegions = table.getRegionsInRange( + startRow, stopRow); + + int maxRegions = validRegions.size(); + int currentRegion = 1; + + for (HRegionLocation cRegion : validRegions) { + byte[] rStart = cRegion.getRegionInfo().getStartKey(); + byte[] rStop = cRegion.getRegionInfo().getEndKey(); + + HServerAddress regionServerAddress = cRegion + .getServerAddress(); + InetAddress regionAddress = regionServerAddress + .getInetSocketAddress().getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + + regionAddress + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + byte[] sStart = (startRow == HConstants.EMPTY_START_ROW + || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart + : startRow); + byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW + || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop + : stopRow); + + LOG.debug(String.format( + "BOOL start (%s) stop (%s) length (%d)", + (startRow == HConstants.EMPTY_START_ROW || (Bytes + .compareTo(startRow, rStart) <= 0)), + (stopRow == HConstants.EMPTY_END_ROW || (Bytes + .compareTo(stopRow, rStop) >= 0)), rStop.length)); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), sStart, sStop, regionLocation, + SourceMode.SCAN_RANGE, useSalt); + + split.setEndRowInclusive(currentRegion == maxRegions); + + currentRegion++; + + LOG.debug(String + .format( + "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", + Bytes.toString(startRow), + Bytes.toString(stopRow), Bytes.toString(rStart), + Bytes.toString(rStop), Bytes.toString(sStart), + Bytes.toString(sStop), cRegion.getHostnamePort(), + split)); + + splits.add(split); + } + } else { + LOG.debug("Using SALT : " + useSalt); + + // Will return the start and the stop key with all possible + // prefixes. + for (int i = 0; i < regions.length; i++) { + Pair<byte[], byte[]>[] intervals = HBaseSalter + .getDistributedIntervals(startRow, stopRow, + regStartKeys[i], regStopKeys[i], prefixList); + + for (Pair<byte[], byte[]> pair : intervals) { + LOG.info("".format( + "Using SALT, Region (%s) Start (%s) Stop (%s)", + regions[i], Bytes.toString(pair.getFirst()), + Bytes.toString(pair.getSecond()))); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), pair.getFirst(), + pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, + useSalt); + + split.setEndRowInclusive(true); + splits.add(split); + } + } + } + + LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); + for (HBaseTableSplit s : splits) { + LOG.info("RETURNED SPLITS: split -> " + s); + } + + return splits.toArray(new HBaseTableSplit[splits.size()]); + } + + case GET_LIST: { + // if( keyList == null || keyList.size() == 0 ) { + if (keyList == null) { + throw new IOException( + "Source Mode is GET_LIST but key list is EMPTY"); + } + + if (useSalt) { + TreeSet<String> tempKeyList = new TreeSet<String>(); + + for (String key : keyList) { + tempKeyList.add(HBaseSalter.addSaltPrefix(key)); + } + + keyList = tempKeyList; + } + + LOG.debug("".format("Splitting Key List (%s)", keyList)); + + List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + for (int i = 0; i < keys.getFirst().length; i++) { + + if (!includeRegionInSplit(keys.getFirst()[i], + keys.getSecond()[i])) { + continue; + } + + LOG.debug(String.format( + "Getting region (%s) subset (%s) to (%s)", regions[i], + Bytes.toString(regStartKeys[i]), + Bytes.toString(regStartKeys[i]))); + + Set<String> regionsSubSet = null; + + if ((regStartKeys[i] == null || regStartKeys[i].length == 0) + && (regStopKeys[i] == null || regStopKeys[i].length == 0)) { + LOG.debug("REGION start is empty"); + LOG.debug("REGION stop is empty"); + regionsSubSet = keyList; + } else if (regStartKeys[i] == null + || regStartKeys[i].length == 0) { + LOG.debug("REGION start is empty"); + regionsSubSet = keyList.headSet( + Bytes.toString(regStopKeys[i]), true); + } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { + LOG.debug("REGION stop is empty"); + regionsSubSet = keyList.tailSet( + Bytes.toString(regStartKeys[i]), true); + } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { + regionsSubSet = keyList.subSet( + Bytes.toString(regStartKeys[i]), true, + Bytes.toString(regStopKeys[i]), true); + } else { + throw new IOException(String.format( + "For REGION (%s) Start Key (%s) > Stop Key(%s)", + regions[i], Bytes.toString(regStartKeys[i]), + Bytes.toString(regStopKeys[i]))); + } + + if (regionsSubSet == null || regionsSubSet.size() == 0) { + LOG.debug("EMPTY: Key is for region " + regions[i] + + " is null"); + + continue; + } + + TreeSet<String> regionKeyList = new TreeSet<String>( + regionsSubSet); + + LOG.debug(String.format("Regions [%s] has key list <%s>", + regions[i], regionKeyList)); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), regionKeyList, versions, regions[i], + SourceMode.GET_LIST, useSalt); + splits.add(split); + } + + LOG.debug("RETURNED SPLITS: split -> " + splits); + + return splits.toArray(new HBaseTableSplit[splits.size()]); + } + + default: + throw new IOException("Unknown source Mode : " + sourceMode); + } + } + + private String reverseDNS(InetAddress ipAddress) throws NamingException { + String hostName = this.reverseDNSCacheMap.get(ipAddress); + if (hostName == null) { + hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( + ipAddress, this.nameServer)); + this.reverseDNSCacheMap.put(ipAddress, hostName); + } + return hostName; + } + + @Override + public RecordReader<ImmutableBytesWritable, Result> getRecordReader( + InputSplit split, JobConf job, Reporter reporter) throws IOException { + + if (!(split instanceof HBaseTableSplit)) + throw new IOException("Table Split is not type HBaseTableSplit"); + + HBaseTableSplit tSplit = (HBaseTableSplit) split; + + HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE(); + + switch (tSplit.getSourceMode()) { + case SCAN_ALL: + case SCAN_RANGE: { + LOG.debug(String.format( + "For split [%s] we have start key (%s) and stop key (%s)", + tSplit, tSplit.getStartRow(), tSplit.getEndRow())); + + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setEndRowInclusive(tSplit.getEndRowInclusive()); + trr.setUseSalt(useSalt); + } + + break; + + case GET_LIST: { + LOG.debug(String.format("For split [%s] we have key list (%s)", + tSplit, tSplit.getKeyList())); + + trr.setKeyList(tSplit.getKeyList()); + trr.setVersions(tSplit.getVersions()); + trr.setUseSalt(useSalt); + } + + break; + + default: + throw new IOException("Unknown source mode : " + + tSplit.getSourceMode()); + } + + trr.setSourceMode(tSplit.getSourceMode()); + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + + trr.init(); + + return trr; + } + + /* Configuration Section */ + + /** + * space delimited list of columns + */ + public static final String COLUMN_LIST = "hbase.tablecolumns"; + + /** + * Use this jobconf param to specify the input table + */ + private static final String INPUT_TABLE = "hbase.inputtable"; + + private String startKey = null; + private String stopKey = null; + + private SourceMode sourceMode = SourceMode.EMPTY; + private TreeSet<String> keyList = null; + private int versions = 1; + private boolean useSalt = false; + private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; + + public void configure(JobConf job) { + String tableName = getTableName(job); + String colArg = job.get(COLUMN_LIST); + String[] colNames = colArg.split(" "); + byte[][] m_cols = new byte[colNames.length][]; + for (int i = 0; i < m_cols.length; i++) { + m_cols[i] = Bytes.toBytes(colNames[i]); + } + setInputColumns(m_cols); + + try { + setHTable(new HTable(HBaseConfiguration.create(job), tableName)); + } catch (Exception e) { + LOG.error("************* Table could not be created"); + LOG.error(StringUtils.stringifyException(e)); + } + + LOG.debug("Entered : " + this.getClass() + " : configure()"); + + useSalt = job.getBoolean( + String.format(HBaseConstants.USE_SALT, getTableName(job)), false); + prefixList = job.get( + String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), + HBaseSalter.DEFAULT_PREFIX_LIST); + + sourceMode = SourceMode.valueOf(job.get(String.format( + HBaseConstants.SOURCE_MODE, getTableName(job)))); + + LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String + .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job + .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), + sourceMode)); + + switch (sourceMode) { + case SCAN_RANGE: + LOG.info("HIT SCAN_RANGE"); + + startKey = getJobProp(job, + String.format(HBaseConstants.START_KEY, getTableName(job))); + stopKey = getJobProp(job, + String.format(HBaseConstants.STOP_KEY, getTableName(job))); + + LOG.info(String.format("Setting start key (%s) and stop key (%s)", + startKey, stopKey)); + break; + + case GET_LIST: + LOG.info("HIT GET_LIST"); + + Collection<String> keys = job.getStringCollection(String.format( + HBaseConstants.KEY_LIST, getTableName(job))); + keyList = new TreeSet<String>(keys); + + versions = job.getInt( + String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); + + LOG.debug("GOT KEY LIST : " + keys); + LOG.debug(String.format("SETTING key list (%s)", keyList)); + + break; + + case EMPTY: + LOG.info("HIT EMPTY"); + + sourceMode = SourceMode.SCAN_ALL; + break; + + default: + LOG.info("HIT DEFAULT"); + + break; + } + } + + public void validateInput(JobConf job) throws IOException { + // expecting exactly one path + String tableName = getTableName(job); + + if (tableName == null) { + throw new IOException("expecting one table name"); + } + LOG.debug(String.format("Found Table name [%s]", tableName)); + + // connected to table? + if (getHTable() == null) { + throw new IOException("could not connect to table '" + tableName + "'"); + } + LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); + + // expecting at least one column + String colArg = job.get(COLUMN_LIST); + if (colArg == null || colArg.length() == 0) { + throw new IOException("expecting at least one column"); + } + LOG.debug(String.format("Found Columns [%s]", colArg)); + + LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, + stopKey)); + + if (sourceMode == SourceMode.EMPTY) { + throw new IOException("SourceMode should not be EMPTY"); + } + + if (sourceMode == SourceMode.GET_LIST + && (keyList == null || keyList.size() == 0)) { + throw new IOException("Source mode is GET_LIST bu key list is empty"); + } + } + + /* Getters & Setters */ + private HTable getHTable() { + return this.table; + } + + private void setHTable(HTable ht) { + this.table = ht; + } + + private void setInputColumns(byte[][] ic) { + this.inputColumns = ic; + } + + private void setJobProp(JobConf job, String key, String value) { + if (job.get(key) != null) + throw new RuntimeException(String.format( + "Job Conf already has key [%s] with value [%s]", key, + job.get(key))); + job.set(key, value); + } + + private String getJobProp(JobConf job, String key) { + return job.get(key); + } + + public static void setTableName(JobConf job, String tableName) { + // Make sure that table has not been set before + String oldTableName = getTableName(job); + if (oldTableName != null) { + throw new RuntimeException("table name already set to: '" + + oldTableName + "'"); + } + + job.set(INPUT_TABLE, tableName); + } + + public static String getTableName(JobConf job) { + return job.get(INPUT_TABLE); + } + + protected boolean includeRegionInSplit(final byte[] startKey, + final byte[] endKey) { + return true; + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java new file mode 100644 index 0000000..5eafc78 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java @@ -0,0 +1,505 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseRecordReader_SINGLE implements + RecordReader<ImmutableBytesWritable, Result> { + + static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class); + + private byte[] startRow; + private byte[] endRow; + private byte[] lastSuccessfulRow; + private TreeSet<String> keyList; + private SourceMode sourceMode; + private Filter trrRowFilter; + private ResultScanner scanner; + private HTable htable; + private byte[][] trrInputColumns; + private long timestamp; + private int rowcount; + private boolean logScannerActivity = false; + private int logPerRowCount = 100; + private boolean endRowInclusive = true; + private int versions = 1; + private boolean useSalt = false; + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow + * @throws IOException + */ + public void restartRangeScan(byte[] firstRow) throws IOException { + Scan currentScan; + if ((endRow != null) && (endRow.length > 0)) { + if (trrRowFilter != null) { + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + scan.setCacheBlocks(false); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } else { + LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) + + ", endRow: " + Bytes.toString(endRow)); + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + TableInputFormat.addColumns(scan, trrInputColumns); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + } else { + LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + + ", no endRow"); + + Scan scan = new Scan(firstRow); + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + if (logScannerActivity) { + LOG.debug("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; + } + } + + public TreeSet<String> getKeyList() { + return keyList; + } + + public void setKeyList(TreeSet<String> keyList) { + this.keyList = keyList; + } + + public void setVersions(int versions) { + this.versions = versions; + } + + public void setUseSalt(boolean useSalt) { + this.useSalt = useSalt; + } + + public SourceMode getSourceMode() { + return sourceMode; + } + + public void setSourceMode(SourceMode sourceMode) { + this.sourceMode = sourceMode; + } + + public byte[] getEndRow() { + return endRow; + } + + public void setEndRowInclusive(boolean isInclusive) { + endRowInclusive = isInclusive; + } + + public boolean getEndRowInclusive() { + return endRowInclusive; + } + + private byte[] nextKey = null; + private Vector<List<KeyValue>> resultVector = null; + Map<Long, List<KeyValue>> keyValueMap = null; + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + restartRangeScan(startRow); + break; + + case GET_LIST: + nextKey = Bytes.toBytes(keyList.pollFirst()); + break; + + default: + throw new IOException(" Unknown source mode : " + sourceMode); + } + } + + byte[] getStartRow() { + return this.startRow; + } + + /** + * @param htable + * the {@link HTable} to scan. + */ + public void setHTable(HTable htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, + false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); + this.htable = htable; + } + + /** + * @param inputColumns + * the columns to be placed in {@link Result}. + */ + public void setInputColumns(final byte[][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow + * the first row in the split + */ + public void setStartRow(final byte[] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow + * the last row in the split + */ + public void setEndRow(final byte[] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter + * the {@link Filter} to be used. + */ + public void setRowFilter(Filter rowFilter) { + this.trrRowFilter = rowFilter; + } + + @Override + public void close() { + if (this.scanner != null) + this.scanner.close(); + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + /** + * @return RowResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + @Override + public Result createValue() { + return new Result(); + } + + @Override + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + @Override + public float getProgress() { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * @param key + * HStoreKey as input key. + * @param value + * MapWritable as input value + * @return true if there was more data + * @throws IOException + */ + @Override + public boolean next(ImmutableBytesWritable key, Result value) + throws IOException { + + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: { + + Result result; + try { + try { + result = this.scanner.next(); + if (logScannerActivity) { + rowcount++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + timestamp = now; + rowcount = 0; + } + } + } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restartRangeScan(startRow); + } else { + restartRangeScan(lastSuccessfulRow); + this.scanner.next(); // skip presumed already mapped row + } + result = this.scanner.next(); + } + + if (result != null && result.size() > 0) { + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + LOG.debug(ioe); + String lastRow = lastSuccessfulRow == null ? "null" : Bytes + .toStringBinary(lastSuccessfulRow); + LOG.debug("lastSuccessfulRow=" + lastRow); + } + throw ioe; + } + } + + case GET_LIST: { + LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); + + if (versions == 1) { + if (nextKey != null) { + LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); + + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result result = this.htable.get(theGet); + + if (result != null && (! result.isEmpty()) ) { + LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); + + if (keyList != null || !keyList.isEmpty()) { + String newKey = keyList.pollFirst(); + LOG.debug("New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + } else { + nextKey = null; + } + + LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); + + // Write the result + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + + return true; + } else { + LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 + + String newKey; + while((newKey = keyList.pollFirst()) != null) { + LOG.debug("WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); + } + } else { + // Nothig left. return false + return false; + } + } else { + if (resultVector != null && resultVector.size() != 0) { + LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); + + List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + + return true; + } else { + if (nextKey != null) { + LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); + + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result resultAll = this.htable.get(theGet); + + if( resultAll != null && (! resultAll.isEmpty())) { + List<KeyValue> keyValeList = resultAll.list(); + + keyValueMap = new HashMap<Long, List<KeyValue>>(); + + LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); + + for (KeyValue keyValue : keyValeList) { + long version = keyValue.getTimestamp(); + + if (keyValueMap.containsKey(new Long(version))) { + List<KeyValue> keyValueTempList = keyValueMap.get(new Long( + version)); + if (keyValueTempList == null) { + keyValueTempList = new ArrayList<KeyValue>(); + } + keyValueTempList.add(keyValue); + } else { + List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); + keyValueMap.put(new Long(version), keyValueTempList); + keyValueTempList.add(keyValue); + } + } + + resultVector = new Vector<List<KeyValue>>(); + resultVector.addAll(keyValueMap.values()); + + List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); + + String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + + System.out.println("+ New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } else { + LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 + + String newKey; + + while( (newKey = keyList.pollFirst()) != null ) { + LOG.debug("+ WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("+ BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); + } + + } else { + return false; + } + } + } + } + default: + throw new IOException("Unknown source mode : " + sourceMode); + } + } +} |