aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java55
1 files changed, 44 insertions, 11 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index 6c28d9f..2155d99 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;
+import org.jruby.javasupport.util.RuntimeHelpers;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
@@ -37,7 +38,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
private byte[] lastSuccessfulRow;
private ResultScanner scanner;
private long timestamp;
- private int rowcount;
+ private int rowcount = 0;
@Override
public String toString() {
@@ -51,6 +52,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
return sbuf.toString();
}
+ private final int scanCaching = 1000;
+
/**
* Restart from survivable exceptions by creating a new scanner.
@@ -67,7 +70,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
- scan.setCacheBlocks(false);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
} else {
@@ -76,6 +80,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
new byte[] { 0 }) : endRow));
TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
@@ -86,6 +92,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
Scan scan = new Scan(firstRow);
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
@@ -150,15 +158,37 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
@Override
public long getPos() {
- // This should be the ordinal tuple in the range;
- // not clear how to calculate...
- return 0;
+ switch(sourceMode) {
+ case GET_LIST:
+ long posGet = (keyList != null ) ? 0 : initialNoOfKeys - keyList.size() ;
+ return posGet;
+
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ long posScan = (noOfLogCount * logPerRowCount) + rowcount;
+ return posScan;
+
+ default:
+ return 0;
+ }
}
@Override
public float getProgress() {
// Depends on the total number of tuples and getPos
- return 0;
+ switch(sourceMode) {
+ case GET_LIST:
+ float progGet = ((initialNoOfKeys == 0) ? 0 : (getPos() / initialNoOfKeys));
+ return progGet;
+
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ float progScan = (getPos() / (getPos() + 10000));
+ return progScan;
+
+ default:
+ return 0;
+ }
}
/**
@@ -181,15 +211,18 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
try {
try {
result = this.scanner.next();
- if (logScannerActivity) {
rowcount++;
if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ timestamp = now;
+ noOfLogCount ++;
+ rowcount = 0;
+ }
+
+ if (logScannerActivity) {
long now = System.currentTimeMillis();
LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
- + rowcount + " rows");
- timestamp = now;
- rowcount = 0;
- }
+ + rowcount + " rows");
}
} catch (IOException e) {
// try to handle all IOExceptions by restarting