diff options
author | Chandan Rajah <crajah@parallelai.com> | 2013-09-30 15:24:09 +0100 |
---|---|---|
committer | Chandan Rajah <crajah@parallelai.com> | 2013-09-30 15:24:09 +0100 |
commit | f6954fc81a6eaf60ca6088c11b51f86e48733be7 (patch) | |
tree | 79aac956c03c80a0488d93a2fd7aca5241b1c465 /src/main/java/parallelai/spyglass | |
parent | c550ab1c80b384e164979fcbe01f34a8308a8b95 (diff) | |
download | SpyGlass-4.1.1.tar.gz SpyGlass-4.1.1.zip |
1. Created a mapper per region as ooposed to mapper per region server4.1.1
2. Added progress indicators
3. Better logging
Diffstat (limited to 'src/main/java/parallelai/spyglass')
8 files changed, 110 insertions, 58 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java index 929e9d8..64effc9 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -75,7 +75,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc .getHostnamePort().split( - Addressing.HOSTNAME_PORT_SEPARATOR)[0], + Addressing.HOSTNAME_PORT_SEPARATOR)[0], regLoc.getRegionInfo().getRegionNameAsString(), SourceMode.EMPTY, false); splits.add(split); @@ -100,6 +100,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { byte[][] regStartKeys = keys.getFirst(); byte[][] regStopKeys = keys.getSecond(); String[] regions = new String[regStartKeys.length]; + String[] regionNames = new String[regStartKeys.length]; for (int i = 0; i < regStartKeys.length; i++) { minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) @@ -109,10 +110,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] : maxKey; - HServerAddress regionServerAddress = table.getRegionLocation( - keys.getFirst()[i]).getServerAddress(); - InetAddress regionAddress = regionServerAddress.getInetSocketAddress() - .getAddress(); + HRegionLocation regionLoc = table.getRegionLocation(keys.getFirst()[i]); + HServerAddress regionServerAddress = regionLoc.getServerAddress(); + InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); String regionLocation; try { regionLocation = reverseDNS(regionAddress); @@ -122,23 +122,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { regionLocation = regionServerAddress.getHostname(); } - // HServerAddress regionServerAddress = - // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); - // InetAddress regionAddress = - // regionServerAddress.getInetSocketAddress().getAddress(); - // - // String regionLocation; - // - // try { - // regionLocation = reverseDNS(regionAddress); - // } catch (NamingException e) { - // LOG.error("Cannot resolve the host name for " + regionAddress + - // " because of " + e); - // regionLocation = regionServerAddress.getHostname(); - // } - - // String regionLocation = - // table.getRegionLocation(keys.getFirst()[i]).getHostname(); + regionNames[i] = regionLoc.getRegionInfo().getRegionNameAsString(); LOG.debug("***** " + regionLocation); @@ -219,7 +203,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { regionLocation = regionServerAddress.getHostname(); } - byte[] sStart = (startRow == HConstants.EMPTY_START_ROW + String regionName = cRegion.getRegionInfo().getRegionNameAsString(); + + byte[] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart : startRow); byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW @@ -234,7 +220,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { .compareTo(stopRow, rStop) >= 0)), rStop.length)); HBaseTableSplitGranular split = new HBaseTableSplitGranular( - table.getTableName(), sStart, sStop, regionLocation, + table.getTableName(), sStart, sStop, regionLocation, regionName, SourceMode.SCAN_RANGE, useSalt); split.setEndRowInclusive(currentRegion == maxRegions); @@ -270,7 +256,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { HBaseTableSplitGranular split = new HBaseTableSplitGranular( table.getTableName(), pair.getFirst(), - pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, + pair.getSecond(), regions[i], regionNames[i], SourceMode.SCAN_RANGE, useSalt); split.setEndRowInclusive(true); @@ -361,7 +347,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { regions[i], regionKeyList)); HBaseTableSplitGranular split = new HBaseTableSplitGranular( - table.getTableName(), regionKeyList, versions, regions[i], + table.getTableName(), regionKeyList, versions, regions[i], regionNames[i], SourceMode.GET_LIST, useSalt); splits.add(split); } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java index eadb57e..8185b22 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -9,7 +9,6 @@ import org.apache.hadoop.mapred.*; import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.List; /** * Created with IntelliJ IDEA. @@ -28,12 +27,17 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { granular.configure(job); HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits); - HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits ); + HBaseTableSplitRegional[] splits = convertToRegionalSplitArray(gSplits); if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL"); - LOG.info("GRANULAR => " + gSplits); - LOG.info("REGIONAL => " + splits); + for(HBaseTableSplitGranular g : gSplits) { + LOG.info("GRANULAR => " + g); + } + + for(HBaseTableSplitRegional r : splits ) { + LOG.info("REGIONAL => " + r); + } return splits; } @@ -43,10 +47,10 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { if (!(inputSplit instanceof HBaseTableSplitRegional)) throw new IOException("Table Split is not type HBaseTableSplitRegional"); - LOG.info("REGIONAL SPLIT -> " + inputSplit); - HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit; + LOG.info("REGIONAL SPLIT -> " + tSplit); + HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional(); HBaseConfigUtils.setRecordReaderParms(trr, tSplit); @@ -60,7 +64,7 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { return trr; } - private HBaseTableSplitRegional[] convertToMultiSplitArray( + private HBaseTableSplitRegional[] convertToRegionalSplitArray( HBaseTableSplitGranular[] splits) throws IOException { if (splits == null) @@ -70,16 +74,16 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { for (HBaseTableSplitGranular hbt : splits) { HBaseTableSplitRegional mis = null; - if (regionSplits.containsKey(hbt.getRegionLocation())) { - mis = regionSplits.get(hbt.getRegionLocation()); + if (regionSplits.containsKey(hbt.getRegionName())) { + mis = regionSplits.get(hbt.getRegionName()); } else { - regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional( - hbt.getRegionLocation())); - mis = regionSplits.get(hbt.getRegionLocation()); + regionSplits.put(hbt.getRegionName(), new HBaseTableSplitRegional( + hbt.getRegionLocation(), hbt.getRegionName())); + mis = regionSplits.get(hbt.getRegionName()); } mis.addSplit(hbt); - regionSplits.put(hbt.getRegionLocation(), mis); + regionSplits.put(hbt.getRegionName(), mis); } // for(String region : regionSplits.keySet() ) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java index 37858ad..e0d0cbe 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -24,6 +24,7 @@ public abstract class HBaseRecordReaderBase implements RecordReader<ImmutableBytesWritable, Result> { protected TreeSet<String> keyList; + protected long initialNoOfKeys = 0; protected HBaseConstants.SourceMode sourceMode; protected boolean endRowInclusive = true; protected int versions = 1; @@ -39,6 +40,7 @@ public abstract class HBaseRecordReaderBase implements protected boolean logScannerActivity = false; protected int logPerRowCount = 100; + protected int noOfLogCount = 0; @Override public String toString() { @@ -107,6 +109,7 @@ public abstract class HBaseRecordReaderBase implements public void setKeyList(TreeSet<String> keyList) { this.keyList = keyList; + initialNoOfKeys = (this.keyList == null) ? 0 : this.keyList.size(); } public void setVersions(int versions) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index 6c28d9f..2155d99 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.StringUtils; +import org.jruby.javasupport.util.RuntimeHelpers; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { @@ -37,7 +38,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { private byte[] lastSuccessfulRow; private ResultScanner scanner; private long timestamp; - private int rowcount; + private int rowcount = 0; @Override public String toString() { @@ -51,6 +52,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { return sbuf.toString(); } + private final int scanCaching = 1000; + /** * Restart from survivable exceptions by creating a new scanner. @@ -67,7 +70,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } else { @@ -76,6 +80,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] { 0 }) : endRow)); TableInputFormat.addColumns(scan, trrInputColumns); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } @@ -86,6 +92,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { Scan scan = new Scan(firstRow); TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } @@ -150,15 +158,37 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { @Override public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; + switch(sourceMode) { + case GET_LIST: + long posGet = (keyList != null ) ? 0 : initialNoOfKeys - keyList.size() ; + return posGet; + + case SCAN_ALL: + case SCAN_RANGE: + long posScan = (noOfLogCount * logPerRowCount) + rowcount; + return posScan; + + default: + return 0; + } } @Override public float getProgress() { // Depends on the total number of tuples and getPos - return 0; + switch(sourceMode) { + case GET_LIST: + float progGet = ((initialNoOfKeys == 0) ? 0 : (getPos() / initialNoOfKeys)); + return progGet; + + case SCAN_ALL: + case SCAN_RANGE: + float progScan = (getPos() / (getPos() + 10000)); + return progScan; + + default: + return 0; + } } /** @@ -181,15 +211,18 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { try { try { result = this.scanner.next(); - if (logScannerActivity) { rowcount++; if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + timestamp = now; + noOfLogCount ++; + rowcount = 0; + } + + if (logScannerActivity) { long now = System.currentTimeMillis(); LOG.debug("Mapper took " + (now - timestamp) + "ms to process " - + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } + + rowcount + " rows"); } } catch (IOException e) { // try to handle all IOExceptions by restarting diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java index e2b1ec8..5d2b613 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -90,6 +90,7 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { boolean nextFlag = currentRecordReader.next(ibw, result); while(nextFlag == false && multiSplit.hasMoreSplits() ) { + totalPos += currentRecordReader.getPos(); setNextSplit(); nextFlag = currentRecordReader.next(ibw, result); } @@ -97,6 +98,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { return nextFlag; } + long totalPos = 0; + @Override public ImmutableBytesWritable createKey() { return currentRecordReader.createKey(); @@ -109,7 +112,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { @Override public long getPos() throws IOException { - return currentRecordReader.getPos(); + long pos = totalPos + currentRecordReader.getPos(); + return pos; } @Override @@ -119,6 +123,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { @Override public float getProgress() throws IOException { - return currentRecordReader.getProgress(); + // ( current count + percent of next count ) / max count + float prog = ((multiSplit.getCurrSplitCount() + currentRecordReader.getProgress()) / multiSplit.getLength()); + return prog; } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java index 2f6e7b5..e24771f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -29,6 +29,7 @@ public abstract class HBaseTableSplitBase implements InputSplit, protected byte[] m_startRow = null; protected byte[] m_endRow = null; protected String m_regionLocation = null; + protected String m_regionName = null; protected TreeSet<String> m_keyList = null; protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY; protected boolean m_endRowInclusive = true; @@ -90,6 +91,10 @@ public abstract class HBaseTableSplitBase implements InputSplit, return new String[] { this.m_regionLocation }; } + public String getRegionName() { + return this.m_regionName; + } + public void copy(HBaseTableSplitBase that) { this.m_endRow = that.m_endRow; @@ -100,6 +105,8 @@ public abstract class HBaseTableSplitBase implements InputSplit, this.m_tableName = that.m_tableName; this.m_useSalt = that.m_useSalt; this.m_versions = that.m_versions; + this.m_regionLocation = that.m_regionLocation; + this.m_regionName = that.m_regionName; } @Override @@ -108,6 +115,7 @@ public abstract class HBaseTableSplitBase implements InputSplit, this.m_tableName = Bytes.readByteArray(in); this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + this.m_regionName = Bytes.toString(Bytes.readByteArray(in)); this.m_sourceMode = HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes .readByteArray(in))); this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); @@ -140,6 +148,7 @@ public abstract class HBaseTableSplitBase implements InputSplit, Bytes.writeByteArray(out, this.m_tableName); Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionName)); Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java index 4de7153..a266411 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -21,7 +21,7 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase { /** default constructor */ public HBaseTableSplitGranular() { this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false); + HConstants.EMPTY_BYTE_ARRAY, "", "", HBaseConstants.SourceMode.EMPTY, false); } /** @@ -33,24 +33,26 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase { * @param location */ public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow, - final byte[] endRow, final String location, + final byte[] endRow, final String location, final String regionName, final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { this.m_tableName = tableName; this.m_startRow = startRow; this.m_endRow = endRow; this.m_regionLocation = location; + this.m_regionName = regionName; this.m_sourceMode = sourceMode; this.m_useSalt = useSalt; } public HBaseTableSplitGranular(final byte[] tableName, - final TreeSet<String> keyList, int versions, final String location, + final TreeSet<String> keyList, int versions, final String location, final String regionName, final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { this.m_tableName = tableName; this.m_keyList = keyList; this.m_versions = versions; this.m_sourceMode = sourceMode; this.m_regionLocation = location; + this.m_regionName = regionName; this.m_useSalt = useSalt; } @@ -67,8 +69,8 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase { public String toString() { return String .format( - "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", - Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, + "Table Name (%s) Region Location (%s) Name (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_regionName, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java index 1ebfa3d..ad5f78b 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -26,8 +26,9 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase { } - public HBaseTableSplitRegional(String regionLocation) { + public HBaseTableSplitRegional(String regionLocation, String regionName) { this.m_regionLocation = regionLocation; + this.m_regionName = regionName; } @Override @@ -70,6 +71,8 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase { str.append(super.toString()); + str.append(" REGIONAL => Region Location (" + m_regionLocation + ") Name (" + m_regionName + ")" ); + str.append(" GRANULAR = > "); for (HBaseTableSplitGranular hbt : splits) { @@ -114,14 +117,20 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase { } private Iterator<HBaseTableSplitGranular> splitIterator = null; + private int currSplitCount = 0; public HBaseTableSplitGranular getNextSplit() { splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator; if( splitIterator.hasNext() ) { + currSplitCount ++; return splitIterator.next(); } else { return null; } } + + public int getCurrSplitCount() { + return currSplitCount; + } }
\ No newline at end of file |