aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java38
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java30
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java3
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java55
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java10
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java9
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java12
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java11
8 files changed, 110 insertions, 58 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 929e9d8..64effc9 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -75,7 +75,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
.getHostnamePort().split(
- Addressing.HOSTNAME_PORT_SEPARATOR)[0],
+ Addressing.HOSTNAME_PORT_SEPARATOR)[0], regLoc.getRegionInfo().getRegionNameAsString(),
SourceMode.EMPTY, false);
splits.add(split);
@@ -100,6 +100,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
byte[][] regStartKeys = keys.getFirst();
byte[][] regStopKeys = keys.getSecond();
String[] regions = new String[regStartKeys.length];
+ String[] regionNames = new String[regStartKeys.length];
for (int i = 0; i < regStartKeys.length; i++) {
minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0)
@@ -109,10 +110,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
&& (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i]
: maxKey;
- HServerAddress regionServerAddress = table.getRegionLocation(
- keys.getFirst()[i]).getServerAddress();
- InetAddress regionAddress = regionServerAddress.getInetSocketAddress()
- .getAddress();
+ HRegionLocation regionLoc = table.getRegionLocation(keys.getFirst()[i]);
+ HServerAddress regionServerAddress = regionLoc.getServerAddress();
+ InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
@@ -122,23 +122,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
regionLocation = regionServerAddress.getHostname();
}
- // HServerAddress regionServerAddress =
- // table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
- // InetAddress regionAddress =
- // regionServerAddress.getInetSocketAddress().getAddress();
- //
- // String regionLocation;
- //
- // try {
- // regionLocation = reverseDNS(regionAddress);
- // } catch (NamingException e) {
- // LOG.error("Cannot resolve the host name for " + regionAddress +
- // " because of " + e);
- // regionLocation = regionServerAddress.getHostname();
- // }
-
- // String regionLocation =
- // table.getRegionLocation(keys.getFirst()[i]).getHostname();
+ regionNames[i] = regionLoc.getRegionInfo().getRegionNameAsString();
LOG.debug("***** " + regionLocation);
@@ -219,7 +203,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
regionLocation = regionServerAddress.getHostname();
}
- byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
+ String regionName = cRegion.getRegionInfo().getRegionNameAsString();
+
+ byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
|| (Bytes.compareTo(startRow, rStart) <= 0) ? rStart
: startRow);
byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW
@@ -234,7 +220,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
.compareTo(stopRow, rStop) >= 0)), rStop.length));
HBaseTableSplitGranular split = new HBaseTableSplitGranular(
- table.getTableName(), sStart, sStop, regionLocation,
+ table.getTableName(), sStart, sStop, regionLocation, regionName,
SourceMode.SCAN_RANGE, useSalt);
split.setEndRowInclusive(currentRegion == maxRegions);
@@ -270,7 +256,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
HBaseTableSplitGranular split = new HBaseTableSplitGranular(
table.getTableName(), pair.getFirst(),
- pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
+ pair.getSecond(), regions[i], regionNames[i], SourceMode.SCAN_RANGE,
useSalt);
split.setEndRowInclusive(true);
@@ -361,7 +347,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
regions[i], regionKeyList));
HBaseTableSplitGranular split = new HBaseTableSplitGranular(
- table.getTableName(), regionKeyList, versions, regions[i],
+ table.getTableName(), regionKeyList, versions, regions[i], regionNames[i],
SourceMode.GET_LIST, useSalt);
splits.add(split);
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
index eadb57e..8185b22 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
@@ -9,7 +9,6 @@ import org.apache.hadoop.mapred.*;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
/**
* Created with IntelliJ IDEA.
@@ -28,12 +27,17 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
granular.configure(job);
HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits);
- HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits );
+ HBaseTableSplitRegional[] splits = convertToRegionalSplitArray(gSplits);
if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL");
- LOG.info("GRANULAR => " + gSplits);
- LOG.info("REGIONAL => " + splits);
+ for(HBaseTableSplitGranular g : gSplits) {
+ LOG.info("GRANULAR => " + g);
+ }
+
+ for(HBaseTableSplitRegional r : splits ) {
+ LOG.info("REGIONAL => " + r);
+ }
return splits;
}
@@ -43,10 +47,10 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
if (!(inputSplit instanceof HBaseTableSplitRegional))
throw new IOException("Table Split is not type HBaseTableSplitRegional");
- LOG.info("REGIONAL SPLIT -> " + inputSplit);
-
HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit;
+ LOG.info("REGIONAL SPLIT -> " + tSplit);
+
HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional();
HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
@@ -60,7 +64,7 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
return trr;
}
- private HBaseTableSplitRegional[] convertToMultiSplitArray(
+ private HBaseTableSplitRegional[] convertToRegionalSplitArray(
HBaseTableSplitGranular[] splits) throws IOException {
if (splits == null)
@@ -70,16 +74,16 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
for (HBaseTableSplitGranular hbt : splits) {
HBaseTableSplitRegional mis = null;
- if (regionSplits.containsKey(hbt.getRegionLocation())) {
- mis = regionSplits.get(hbt.getRegionLocation());
+ if (regionSplits.containsKey(hbt.getRegionName())) {
+ mis = regionSplits.get(hbt.getRegionName());
} else {
- regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional(
- hbt.getRegionLocation()));
- mis = regionSplits.get(hbt.getRegionLocation());
+ regionSplits.put(hbt.getRegionName(), new HBaseTableSplitRegional(
+ hbt.getRegionLocation(), hbt.getRegionName()));
+ mis = regionSplits.get(hbt.getRegionName());
}
mis.addSplit(hbt);
- regionSplits.put(hbt.getRegionLocation(), mis);
+ regionSplits.put(hbt.getRegionName(), mis);
}
// for(String region : regionSplits.keySet() ) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
index 37858ad..e0d0cbe 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -24,6 +24,7 @@ public abstract class HBaseRecordReaderBase implements
RecordReader<ImmutableBytesWritable, Result> {
protected TreeSet<String> keyList;
+ protected long initialNoOfKeys = 0;
protected HBaseConstants.SourceMode sourceMode;
protected boolean endRowInclusive = true;
protected int versions = 1;
@@ -39,6 +40,7 @@ public abstract class HBaseRecordReaderBase implements
protected boolean logScannerActivity = false;
protected int logPerRowCount = 100;
+ protected int noOfLogCount = 0;
@Override
public String toString() {
@@ -107,6 +109,7 @@ public abstract class HBaseRecordReaderBase implements
public void setKeyList(TreeSet<String> keyList) {
this.keyList = keyList;
+ initialNoOfKeys = (this.keyList == null) ? 0 : this.keyList.size();
}
public void setVersions(int versions) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index 6c28d9f..2155d99 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;
+import org.jruby.javasupport.util.RuntimeHelpers;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
@@ -37,7 +38,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
private byte[] lastSuccessfulRow;
private ResultScanner scanner;
private long timestamp;
- private int rowcount;
+ private int rowcount = 0;
@Override
public String toString() {
@@ -51,6 +52,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
return sbuf.toString();
}
+ private final int scanCaching = 1000;
+
/**
* Restart from survivable exceptions by creating a new scanner.
@@ -67,7 +70,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
- scan.setCacheBlocks(false);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
} else {
@@ -76,6 +80,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
new byte[] { 0 }) : endRow));
TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
@@ -86,6 +92,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
Scan scan = new Scan(firstRow);
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
@@ -150,15 +158,37 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
@Override
public long getPos() {
- // This should be the ordinal tuple in the range;
- // not clear how to calculate...
- return 0;
+ switch(sourceMode) {
+ case GET_LIST:
+ long posGet = (keyList != null ) ? 0 : initialNoOfKeys - keyList.size() ;
+ return posGet;
+
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ long posScan = (noOfLogCount * logPerRowCount) + rowcount;
+ return posScan;
+
+ default:
+ return 0;
+ }
}
@Override
public float getProgress() {
// Depends on the total number of tuples and getPos
- return 0;
+ switch(sourceMode) {
+ case GET_LIST:
+ float progGet = ((initialNoOfKeys == 0) ? 0 : (getPos() / initialNoOfKeys));
+ return progGet;
+
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ float progScan = (getPos() / (getPos() + 10000));
+ return progScan;
+
+ default:
+ return 0;
+ }
}
/**
@@ -181,15 +211,18 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
try {
try {
result = this.scanner.next();
- if (logScannerActivity) {
rowcount++;
if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ timestamp = now;
+ noOfLogCount ++;
+ rowcount = 0;
+ }
+
+ if (logScannerActivity) {
long now = System.currentTimeMillis();
LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
- + rowcount + " rows");
- timestamp = now;
- rowcount = 0;
- }
+ + rowcount + " rows");
}
} catch (IOException e) {
// try to handle all IOExceptions by restarting
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
index e2b1ec8..5d2b613 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
@@ -90,6 +90,7 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
boolean nextFlag = currentRecordReader.next(ibw, result);
while(nextFlag == false && multiSplit.hasMoreSplits() ) {
+ totalPos += currentRecordReader.getPos();
setNextSplit();
nextFlag = currentRecordReader.next(ibw, result);
}
@@ -97,6 +98,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
return nextFlag;
}
+ long totalPos = 0;
+
@Override
public ImmutableBytesWritable createKey() {
return currentRecordReader.createKey();
@@ -109,7 +112,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
@Override
public long getPos() throws IOException {
- return currentRecordReader.getPos();
+ long pos = totalPos + currentRecordReader.getPos();
+ return pos;
}
@Override
@@ -119,6 +123,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
@Override
public float getProgress() throws IOException {
- return currentRecordReader.getProgress();
+ // ( current count + percent of next count ) / max count
+ float prog = ((multiSplit.getCurrSplitCount() + currentRecordReader.getProgress()) / multiSplit.getLength());
+ return prog;
}
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
index 2f6e7b5..e24771f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -29,6 +29,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
protected byte[] m_startRow = null;
protected byte[] m_endRow = null;
protected String m_regionLocation = null;
+ protected String m_regionName = null;
protected TreeSet<String> m_keyList = null;
protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY;
protected boolean m_endRowInclusive = true;
@@ -90,6 +91,10 @@ public abstract class HBaseTableSplitBase implements InputSplit,
return new String[] { this.m_regionLocation };
}
+ public String getRegionName() {
+ return this.m_regionName;
+ }
+
public void copy(HBaseTableSplitBase that) {
this.m_endRow = that.m_endRow;
@@ -100,6 +105,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,
this.m_tableName = that.m_tableName;
this.m_useSalt = that.m_useSalt;
this.m_versions = that.m_versions;
+ this.m_regionLocation = that.m_regionLocation;
+ this.m_regionName = that.m_regionName;
}
@Override
@@ -108,6 +115,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
this.m_tableName = Bytes.readByteArray(in);
this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ this.m_regionName = Bytes.toString(Bytes.readByteArray(in));
this.m_sourceMode = HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes
.readByteArray(in)));
this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
@@ -140,6 +148,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
Bytes.writeByteArray(out, this.m_tableName);
Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionName));
Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
index 4de7153..a266411 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
@@ -21,7 +21,7 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase {
/** default constructor */
public HBaseTableSplitGranular() {
this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false);
+ HConstants.EMPTY_BYTE_ARRAY, "", "", HBaseConstants.SourceMode.EMPTY, false);
}
/**
@@ -33,24 +33,26 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase {
* @param location
*/
public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow,
- final byte[] endRow, final String location,
+ final byte[] endRow, final String location, final String regionName,
final HBaseConstants.SourceMode sourceMode, final boolean useSalt) {
this.m_tableName = tableName;
this.m_startRow = startRow;
this.m_endRow = endRow;
this.m_regionLocation = location;
+ this.m_regionName = regionName;
this.m_sourceMode = sourceMode;
this.m_useSalt = useSalt;
}
public HBaseTableSplitGranular(final byte[] tableName,
- final TreeSet<String> keyList, int versions, final String location,
+ final TreeSet<String> keyList, int versions, final String location, final String regionName,
final HBaseConstants.SourceMode sourceMode, final boolean useSalt) {
this.m_tableName = tableName;
this.m_keyList = keyList;
this.m_versions = versions;
this.m_sourceMode = sourceMode;
this.m_regionLocation = location;
+ this.m_regionName = regionName;
this.m_useSalt = useSalt;
}
@@ -67,8 +69,8 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase {
public String toString() {
return String
.format(
- "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
- Bytes.toString(m_tableName), m_regionLocation, m_sourceMode,
+ "Table Name (%s) Region Location (%s) Name (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
+ Bytes.toString(m_tableName), m_regionLocation, m_regionName, m_sourceMode,
Bytes.toString(m_startRow), Bytes.toString(m_endRow),
(m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions,
m_useSalt);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
index 1ebfa3d..ad5f78b 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
@@ -26,8 +26,9 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase {
}
- public HBaseTableSplitRegional(String regionLocation) {
+ public HBaseTableSplitRegional(String regionLocation, String regionName) {
this.m_regionLocation = regionLocation;
+ this.m_regionName = regionName;
}
@Override
@@ -70,6 +71,8 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase {
str.append(super.toString());
+ str.append(" REGIONAL => Region Location (" + m_regionLocation + ") Name (" + m_regionName + ")" );
+
str.append(" GRANULAR = > ");
for (HBaseTableSplitGranular hbt : splits) {
@@ -114,14 +117,20 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase {
}
private Iterator<HBaseTableSplitGranular> splitIterator = null;
+ private int currSplitCount = 0;
public HBaseTableSplitGranular getNextSplit() {
splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator;
if( splitIterator.hasNext() ) {
+ currSplitCount ++;
return splitIterator.next();
} else {
return null;
}
}
+
+ public int getCurrSplitCount() {
+ return currSplitCount;
+ }
} \ No newline at end of file