diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java | 432 |
1 files changed, 306 insertions, 126 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java index 97077c4..d22ed71 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -3,12 +3,17 @@ package parallelai.spyglass.hbase; import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; import java.io.IOException; -import java.util.Set; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.TreeSet; +import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -18,36 +23,38 @@ import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> { +public class HBaseRecordReader implements + RecordReader<ImmutableBytesWritable, Result> { static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); - private byte [] startRow; - private byte [] endRow; - private byte [] lastSuccessfulRow; + private byte[] startRow; + private byte[] endRow; + private byte[] lastSuccessfulRow; private TreeSet<String> keyList; private SourceMode sourceMode; private Filter trrRowFilter; private ResultScanner scanner; private HTable htable; - private byte [][] trrInputColumns; + private byte[][] trrInputColumns; private long timestamp; private int rowcount; private boolean logScannerActivity = false; private int logPerRowCount = 100; private boolean endRowInclusive = true; + private int versions = 1; + private boolean useSalt = false; /** * Restart from survivable exceptions by creating a new scanner. - * + * * @param firstRow * @throws IOException */ @@ -55,26 +62,26 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R Scan currentScan; if ((endRow != null) && (endRow.length > 0)) { if (trrRowFilter != null) { - Scan scan = new Scan(firstRow, (endRowInclusive ? - Bytes.add(endRow, new byte[] {0}) : endRow ) ); - + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); scan.setCacheBlocks(false); this.scanner = this.htable.getScanner(scan); currentScan = scan; } else { - LOG.debug("TIFB.restart, firstRow: " + - Bytes.toString(firstRow) + ", endRow: " + - Bytes.toString(endRow)); - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow )); + LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) + + ", endRow: " + Bytes.toString(endRow)); + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); TableInputFormat.addColumns(scan, trrInputColumns); this.scanner = this.htable.getScanner(scan); currentScan = scan; } } else { - LOG.debug("TIFB.restart, firstRow: " + - Bytes.toStringBinary(firstRow) + ", no endRow"); + LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + + ", no endRow"); Scan scan = new Scan(firstRow); TableInputFormat.addColumns(scan, trrInputColumns); @@ -83,7 +90,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R currentScan = scan; } if (logScannerActivity) { - LOG.info("Current scan=" + currentScan.toString()); + LOG.debug("Current scan=" + currentScan.toString()); timestamp = System.currentTimeMillis(); rowcount = 0; } @@ -97,6 +104,14 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R this.keyList = keyList; } + public void setVersions(int versions) { + this.versions = versions; + } + + public void setUseSalt(boolean useSalt) { + this.useSalt = useSalt; + } + public SourceMode getSourceMode() { return sourceMode; } @@ -108,76 +123,84 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R public byte[] getEndRow() { return endRow; } - + public void setEndRowInclusive(boolean isInclusive) { endRowInclusive = isInclusive; } - + public boolean getEndRowInclusive() { return endRowInclusive; } - - private byte [] nextKey = null; + + private byte[] nextKey = null; + private Vector<List<KeyValue>> resultVector = null; + Map<Long, List<KeyValue>> keyValueMap = null; /** * Build the scanner. Not done in constructor to allow for extension. - * + * * @throws IOException */ public void init() throws IOException { - switch( sourceMode ) { - case SCAN_ALL: - case SCAN_RANGE: - restartRangeScan(startRow); - break; - - case GET_LIST: - nextKey = Bytes.toBytes(keyList.pollFirst()); - break; - - default: - throw new IOException(" Unknown source mode : " + sourceMode ); + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + restartRangeScan(startRow); + break; + + case GET_LIST: + nextKey = Bytes.toBytes(keyList.pollFirst()); + break; + + default: + throw new IOException(" Unknown source mode : " + sourceMode); } } byte[] getStartRow() { return this.startRow; } + /** - * @param htable the {@link HTable} to scan. + * @param htable + * the {@link HTable} to scan. */ public void setHTable(HTable htable) { Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean( - ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, + false); logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); this.htable = htable; } /** - * @param inputColumns the columns to be placed in {@link Result}. + * @param inputColumns + * the columns to be placed in {@link Result}. */ - public void setInputColumns(final byte [][] inputColumns) { + public void setInputColumns(final byte[][] inputColumns) { this.trrInputColumns = inputColumns; } /** - * @param startRow the first row in the split + * @param startRow + * the first row in the split */ - public void setStartRow(final byte [] startRow) { + public void setStartRow(final byte[] startRow) { this.startRow = startRow; } /** - * - * @param endRow the last row in the split + * + * @param endRow + * the last row in the split */ - public void setEndRow(final byte [] endRow) { + public void setEndRow(final byte[] endRow) { this.endRow = endRow; } /** - * @param rowFilter the {@link Filter} to be used. + * @param rowFilter + * the {@link Filter} to be used. */ public void setRowFilter(Filter rowFilter) { this.trrRowFilter = rowFilter; @@ -185,12 +208,13 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R @Override public void close() { - if (this.scanner != null) this.scanner.close(); + if (this.scanner != null) + this.scanner.close(); } /** * @return ImmutableBytesWritable - * + * * @see org.apache.hadoop.mapred.RecordReader#createKey() */ @Override @@ -200,7 +224,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R /** * @return RowResult - * + * * @see org.apache.hadoop.mapred.RecordReader#createValue() */ @Override @@ -222,104 +246,260 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R } /** - * @param key HStoreKey as input key. - * @param value MapWritable as input value + * @param key + * HStoreKey as input key. + * @param value + * MapWritable as input value * @return true if there was more data * @throws IOException */ @Override public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - - switch(sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - { - - Result result; + throws IOException { + + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: { + + Result result; + try { try { - try { - result = this.scanner.next(); - if (logScannerActivity) { - rowcount ++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // try to handle all IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," + - " if your mapper has restarted a few other times like this" + - " then you should consider killing this job and investigate" + - " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - restartRangeScan(startRow); - } else { - restartRangeScan(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row + result = this.scanner.next(); + if (logScannerActivity) { + rowcount++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + timestamp = now; + rowcount = 0; } - result = this.scanner.next(); } + } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restartRangeScan(startRow); + } else { + restartRangeScan(lastSuccessfulRow); + this.scanner.next(); // skip presumed already mapped row + } + result = this.scanner.next(); + } - if (result != null && result.size() > 0) { + if (result != null && result.size() > 0) { + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { key.set(result.getRow()); - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } - return false; - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - LOG.info(ioe); - String lastRow = lastSuccessfulRow == null ? - "null" : Bytes.toStringBinary(lastSuccessfulRow); - LOG.info("lastSuccessfulRow=" + lastRow); } - throw ioe; + + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + LOG.debug(ioe); + String lastRow = lastSuccessfulRow == null ? "null" : Bytes + .toStringBinary(lastSuccessfulRow); + LOG.debug("lastSuccessfulRow=" + lastRow); } + throw ioe; } + } + + case GET_LIST: { + LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); - case GET_LIST: - { - Result result; - if( nextKey != null ) { - result = this.htable.get(new Get(nextKey)); + if (versions == 1) { + if (nextKey != null) { + LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); - if (result != null && result.size() > 0) { - System.out.println("KeyList => " + keyList); - System.out.println("Result => " + result); - if (keyList != null || !keyList.isEmpty()) { - - String newKey = keyList.pollFirst(); - System.out.println("New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey); - } else { - nextKey = null; - } - key.set(result.getRow()); + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result result = this.htable.get(theGet); + + if (result != null && (! result.isEmpty()) ) { + LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); + + if (keyList != null || !keyList.isEmpty()) { + String newKey = keyList.pollFirst(); + LOG.debug("New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + } else { + nextKey = null; + } + + LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); + + // Write the result + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } lastSuccessfulRow = key.get(); Writables.copyWritable(result, value); + return true; + } else { + LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 + + String newKey; + while((newKey = keyList.pollFirst()) != null) { + LOG.debug("WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); } - return false; } else { + // Nothig left. return false return false; } + } else { + if (resultVector != null && resultVector.size() != 0) { + LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); + + List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + + return true; + } else { + if (nextKey != null) { + LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); + + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result resultAll = this.htable.get(theGet); + + if( resultAll != null && (! resultAll.isEmpty())) { + List<KeyValue> keyValeList = resultAll.list(); + + keyValueMap = new HashMap<Long, List<KeyValue>>(); + + LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); + + for (KeyValue keyValue : keyValeList) { + long version = keyValue.getTimestamp(); + + if (keyValueMap.containsKey(new Long(version))) { + List<KeyValue> keyValueTempList = keyValueMap.get(new Long( + version)); + if (keyValueTempList == null) { + keyValueTempList = new ArrayList<KeyValue>(); + } + keyValueTempList.add(keyValue); + } else { + List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); + keyValueMap.put(new Long(version), keyValueTempList); + keyValueTempList.add(keyValue); + } + } + + resultVector = new Vector<List<KeyValue>>(); + resultVector.addAll(keyValueMap.values()); + + List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); + + String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + + System.out.println("+ New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } else { + LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 + + String newKey; + + while( (newKey = keyList.pollFirst()) != null ) { + LOG.debug("+ WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("+ BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); + } + + } else { + return false; + } + } } - - default: - throw new IOException("Unknown source mode : " + sourceMode ); - } + } + default: + throw new IOException("Unknown source mode : " + sourceMode); + } } } |