From f6954fc81a6eaf60ca6088c11b51f86e48733be7 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Mon, 30 Sep 2013 15:24:09 +0100 Subject: 1. Created a mapper per region as opposed to mapper per region server 2. Added progress indicators 3. Better logging --- .../spyglass/hbase/HBaseRecordReaderGranular.java | 55 +++++++++++++++++----- 1 file changed, 44 insertions(+), 11 deletions(-) (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index 6c28d9f..2155d99 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.StringUtils; +import org.jruby.javasupport.util.RuntimeHelpers; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { @@ -37,7 +38,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { private byte[] lastSuccessfulRow; private ResultScanner scanner; private long timestamp; - private int rowcount; + private int rowcount = 0; @Override public String toString() { @@ -51,6 +52,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { return sbuf.toString(); } + private final int scanCaching = 1000; + /** * Restart from survivable exceptions by creating a new scanner. 
@@ -67,7 +70,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } else { @@ -76,6 +80,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] { 0 }) : endRow)); TableInputFormat.addColumns(scan, trrInputColumns); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } @@ -86,6 +92,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { Scan scan = new Scan(firstRow); TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } @@ -150,15 +158,37 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { @Override public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; + switch(sourceMode) { + case GET_LIST: + long posGet = (keyList == null ) ? 0 : initialNoOfKeys - keyList.size() ; + return posGet; + + case SCAN_ALL: + case SCAN_RANGE: + long posScan = (noOfLogCount * logPerRowCount) + rowcount; + return posScan; + + default: + return 0; + } } @Override public float getProgress() { // Depends on the total number of tuples and getPos - return 0; + switch(sourceMode) { + case GET_LIST: + float progGet = ((initialNoOfKeys == 0) ? 
0 : ((float) getPos() / initialNoOfKeys)); + return progGet; + + case SCAN_ALL: + case SCAN_RANGE: + float progScan = ((float) getPos() / (getPos() + 10000)); + return progScan; + + default: + return 0; + } } /** @@ -181,15 +211,18 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { try { try { result = this.scanner.next(); - if (logScannerActivity) { rowcount++; if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + timestamp = now; + noOfLogCount ++; + rowcount = 0; + } + + if (logScannerActivity) { long now = System.currentTimeMillis(); LOG.debug("Mapper took " + (now - timestamp) + "ms to process " - + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } + + rowcount + " rows"); } } catch (IOException e) { // try to handle all IOExceptions by restarting -- cgit v1.2.3