aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorChandan Rajah <crajah@parallelai.com>2013-09-30 15:24:09 +0100
committerChandan Rajah <crajah@parallelai.com>2013-09-30 15:24:09 +0100
commitf6954fc81a6eaf60ca6088c11b51f86e48733be7 (patch)
tree79aac956c03c80a0488d93a2fd7aca5241b1c465 /src
parentc550ab1c80b384e164979fcbe01f34a8308a8b95 (diff)
downloadSpyGlass-4.1.1.tar.gz
SpyGlass-4.1.1.zip
1. Created a mapper per region as ooposed to mapper per region server4.1.1
2. Added progress indicators 3. Better logging
Diffstat (limited to 'src')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java38
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java30
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java3
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java55
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java10
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java9
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java12
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java11
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala109
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala12
10 files changed, 181 insertions, 108 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 929e9d8..64effc9 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -75,7 +75,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
.getHostnamePort().split(
- Addressing.HOSTNAME_PORT_SEPARATOR)[0],
+ Addressing.HOSTNAME_PORT_SEPARATOR)[0], regLoc.getRegionInfo().getRegionNameAsString(),
SourceMode.EMPTY, false);
splits.add(split);
@@ -100,6 +100,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
byte[][] regStartKeys = keys.getFirst();
byte[][] regStopKeys = keys.getSecond();
String[] regions = new String[regStartKeys.length];
+ String[] regionNames = new String[regStartKeys.length];
for (int i = 0; i < regStartKeys.length; i++) {
minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0)
@@ -109,10 +110,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
&& (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i]
: maxKey;
- HServerAddress regionServerAddress = table.getRegionLocation(
- keys.getFirst()[i]).getServerAddress();
- InetAddress regionAddress = regionServerAddress.getInetSocketAddress()
- .getAddress();
+ HRegionLocation regionLoc = table.getRegionLocation(keys.getFirst()[i]);
+ HServerAddress regionServerAddress = regionLoc.getServerAddress();
+ InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
@@ -122,23 +122,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
regionLocation = regionServerAddress.getHostname();
}
- // HServerAddress regionServerAddress =
- // table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
- // InetAddress regionAddress =
- // regionServerAddress.getInetSocketAddress().getAddress();
- //
- // String regionLocation;
- //
- // try {
- // regionLocation = reverseDNS(regionAddress);
- // } catch (NamingException e) {
- // LOG.error("Cannot resolve the host name for " + regionAddress +
- // " because of " + e);
- // regionLocation = regionServerAddress.getHostname();
- // }
-
- // String regionLocation =
- // table.getRegionLocation(keys.getFirst()[i]).getHostname();
+ regionNames[i] = regionLoc.getRegionInfo().getRegionNameAsString();
LOG.debug("***** " + regionLocation);
@@ -219,7 +203,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
regionLocation = regionServerAddress.getHostname();
}
- byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
+ String regionName = cRegion.getRegionInfo().getRegionNameAsString();
+
+ byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
|| (Bytes.compareTo(startRow, rStart) <= 0) ? rStart
: startRow);
byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW
@@ -234,7 +220,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
.compareTo(stopRow, rStop) >= 0)), rStop.length));
HBaseTableSplitGranular split = new HBaseTableSplitGranular(
- table.getTableName(), sStart, sStop, regionLocation,
+ table.getTableName(), sStart, sStop, regionLocation, regionName,
SourceMode.SCAN_RANGE, useSalt);
split.setEndRowInclusive(currentRegion == maxRegions);
@@ -270,7 +256,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
HBaseTableSplitGranular split = new HBaseTableSplitGranular(
table.getTableName(), pair.getFirst(),
- pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
+ pair.getSecond(), regions[i], regionNames[i], SourceMode.SCAN_RANGE,
useSalt);
split.setEndRowInclusive(true);
@@ -361,7 +347,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
regions[i], regionKeyList));
HBaseTableSplitGranular split = new HBaseTableSplitGranular(
- table.getTableName(), regionKeyList, versions, regions[i],
+ table.getTableName(), regionKeyList, versions, regions[i], regionNames[i],
SourceMode.GET_LIST, useSalt);
splits.add(split);
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
index eadb57e..8185b22 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
@@ -9,7 +9,6 @@ import org.apache.hadoop.mapred.*;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
/**
* Created with IntelliJ IDEA.
@@ -28,12 +27,17 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
granular.configure(job);
HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits);
- HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits );
+ HBaseTableSplitRegional[] splits = convertToRegionalSplitArray(gSplits);
if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL");
- LOG.info("GRANULAR => " + gSplits);
- LOG.info("REGIONAL => " + splits);
+ for(HBaseTableSplitGranular g : gSplits) {
+ LOG.info("GRANULAR => " + g);
+ }
+
+ for(HBaseTableSplitRegional r : splits ) {
+ LOG.info("REGIONAL => " + r);
+ }
return splits;
}
@@ -43,10 +47,10 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
if (!(inputSplit instanceof HBaseTableSplitRegional))
throw new IOException("Table Split is not type HBaseTableSplitRegional");
- LOG.info("REGIONAL SPLIT -> " + inputSplit);
-
HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit;
+ LOG.info("REGIONAL SPLIT -> " + tSplit);
+
HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional();
HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
@@ -60,7 +64,7 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
return trr;
}
- private HBaseTableSplitRegional[] convertToMultiSplitArray(
+ private HBaseTableSplitRegional[] convertToRegionalSplitArray(
HBaseTableSplitGranular[] splits) throws IOException {
if (splits == null)
@@ -70,16 +74,16 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase {
for (HBaseTableSplitGranular hbt : splits) {
HBaseTableSplitRegional mis = null;
- if (regionSplits.containsKey(hbt.getRegionLocation())) {
- mis = regionSplits.get(hbt.getRegionLocation());
+ if (regionSplits.containsKey(hbt.getRegionName())) {
+ mis = regionSplits.get(hbt.getRegionName());
} else {
- regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional(
- hbt.getRegionLocation()));
- mis = regionSplits.get(hbt.getRegionLocation());
+ regionSplits.put(hbt.getRegionName(), new HBaseTableSplitRegional(
+ hbt.getRegionLocation(), hbt.getRegionName()));
+ mis = regionSplits.get(hbt.getRegionName());
}
mis.addSplit(hbt);
- regionSplits.put(hbt.getRegionLocation(), mis);
+ regionSplits.put(hbt.getRegionName(), mis);
}
// for(String region : regionSplits.keySet() ) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
index 37858ad..e0d0cbe 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -24,6 +24,7 @@ public abstract class HBaseRecordReaderBase implements
RecordReader<ImmutableBytesWritable, Result> {
protected TreeSet<String> keyList;
+ protected long initialNoOfKeys = 0;
protected HBaseConstants.SourceMode sourceMode;
protected boolean endRowInclusive = true;
protected int versions = 1;
@@ -39,6 +40,7 @@ public abstract class HBaseRecordReaderBase implements
protected boolean logScannerActivity = false;
protected int logPerRowCount = 100;
+ protected int noOfLogCount = 0;
@Override
public String toString() {
@@ -107,6 +109,7 @@ public abstract class HBaseRecordReaderBase implements
public void setKeyList(TreeSet<String> keyList) {
this.keyList = keyList;
+ initialNoOfKeys = (this.keyList == null) ? 0 : this.keyList.size();
}
public void setVersions(int versions) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index 6c28d9f..2155d99 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;
+import org.jruby.javasupport.util.RuntimeHelpers;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
@@ -37,7 +38,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
private byte[] lastSuccessfulRow;
private ResultScanner scanner;
private long timestamp;
- private int rowcount;
+ private int rowcount = 0;
@Override
public String toString() {
@@ -51,6 +52,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
return sbuf.toString();
}
+ private final int scanCaching = 1000;
+
/**
* Restart from survivable exceptions by creating a new scanner.
@@ -67,7 +70,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
- scan.setCacheBlocks(false);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
} else {
@@ -76,6 +80,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
new byte[] { 0 }) : endRow));
TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
@@ -86,6 +92,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
Scan scan = new Scan(firstRow);
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
+ scan.setCacheBlocks(true);
+ scan.setCaching(scanCaching);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
@@ -150,15 +158,37 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
@Override
public long getPos() {
- // This should be the ordinal tuple in the range;
- // not clear how to calculate...
- return 0;
+ switch(sourceMode) {
+ case GET_LIST:
+ long posGet = (keyList != null ) ? 0 : initialNoOfKeys - keyList.size() ;
+ return posGet;
+
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ long posScan = (noOfLogCount * logPerRowCount) + rowcount;
+ return posScan;
+
+ default:
+ return 0;
+ }
}
@Override
public float getProgress() {
// Depends on the total number of tuples and getPos
- return 0;
+ switch(sourceMode) {
+ case GET_LIST:
+ float progGet = ((initialNoOfKeys == 0) ? 0 : (getPos() / initialNoOfKeys));
+ return progGet;
+
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ float progScan = (getPos() / (getPos() + 10000));
+ return progScan;
+
+ default:
+ return 0;
+ }
}
/**
@@ -181,15 +211,18 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
try {
try {
result = this.scanner.next();
- if (logScannerActivity) {
rowcount++;
if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ timestamp = now;
+ noOfLogCount ++;
+ rowcount = 0;
+ }
+
+ if (logScannerActivity) {
long now = System.currentTimeMillis();
LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
- + rowcount + " rows");
- timestamp = now;
- rowcount = 0;
- }
+ + rowcount + " rows");
}
} catch (IOException e) {
// try to handle all IOExceptions by restarting
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
index e2b1ec8..5d2b613 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
@@ -90,6 +90,7 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
boolean nextFlag = currentRecordReader.next(ibw, result);
while(nextFlag == false && multiSplit.hasMoreSplits() ) {
+ totalPos += currentRecordReader.getPos();
setNextSplit();
nextFlag = currentRecordReader.next(ibw, result);
}
@@ -97,6 +98,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
return nextFlag;
}
+ long totalPos = 0;
+
@Override
public ImmutableBytesWritable createKey() {
return currentRecordReader.createKey();
@@ -109,7 +112,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
@Override
public long getPos() throws IOException {
- return currentRecordReader.getPos();
+ long pos = totalPos + currentRecordReader.getPos();
+ return pos;
}
@Override
@@ -119,6 +123,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
@Override
public float getProgress() throws IOException {
- return currentRecordReader.getProgress();
+ // ( current count + percent of next count ) / max count
+ float prog = ((multiSplit.getCurrSplitCount() + currentRecordReader.getProgress()) / multiSplit.getLength());
+ return prog;
}
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
index 2f6e7b5..e24771f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -29,6 +29,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
protected byte[] m_startRow = null;
protected byte[] m_endRow = null;
protected String m_regionLocation = null;
+ protected String m_regionName = null;
protected TreeSet<String> m_keyList = null;
protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY;
protected boolean m_endRowInclusive = true;
@@ -90,6 +91,10 @@ public abstract class HBaseTableSplitBase implements InputSplit,
return new String[] { this.m_regionLocation };
}
+ public String getRegionName() {
+ return this.m_regionName;
+ }
+
public void copy(HBaseTableSplitBase that) {
this.m_endRow = that.m_endRow;
@@ -100,6 +105,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,
this.m_tableName = that.m_tableName;
this.m_useSalt = that.m_useSalt;
this.m_versions = that.m_versions;
+ this.m_regionLocation = that.m_regionLocation;
+ this.m_regionName = that.m_regionName;
}
@Override
@@ -108,6 +115,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
this.m_tableName = Bytes.readByteArray(in);
this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ this.m_regionName = Bytes.toString(Bytes.readByteArray(in));
this.m_sourceMode = HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes
.readByteArray(in)));
this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
@@ -140,6 +148,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
Bytes.writeByteArray(out, this.m_tableName);
Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionName));
Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
index 4de7153..a266411 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
@@ -21,7 +21,7 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase {
/** default constructor */
public HBaseTableSplitGranular() {
this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false);
+ HConstants.EMPTY_BYTE_ARRAY, "", "", HBaseConstants.SourceMode.EMPTY, false);
}
/**
@@ -33,24 +33,26 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase {
* @param location
*/
public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow,
- final byte[] endRow, final String location,
+ final byte[] endRow, final String location, final String regionName,
final HBaseConstants.SourceMode sourceMode, final boolean useSalt) {
this.m_tableName = tableName;
this.m_startRow = startRow;
this.m_endRow = endRow;
this.m_regionLocation = location;
+ this.m_regionName = regionName;
this.m_sourceMode = sourceMode;
this.m_useSalt = useSalt;
}
public HBaseTableSplitGranular(final byte[] tableName,
- final TreeSet<String> keyList, int versions, final String location,
+ final TreeSet<String> keyList, int versions, final String location, final String regionName,
final HBaseConstants.SourceMode sourceMode, final boolean useSalt) {
this.m_tableName = tableName;
this.m_keyList = keyList;
this.m_versions = versions;
this.m_sourceMode = sourceMode;
this.m_regionLocation = location;
+ this.m_regionName = regionName;
this.m_useSalt = useSalt;
}
@@ -67,8 +69,8 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase {
public String toString() {
return String
.format(
- "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
- Bytes.toString(m_tableName), m_regionLocation, m_sourceMode,
+ "Table Name (%s) Region Location (%s) Name (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
+ Bytes.toString(m_tableName), m_regionLocation, m_regionName, m_sourceMode,
Bytes.toString(m_startRow), Bytes.toString(m_endRow),
(m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions,
m_useSalt);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
index 1ebfa3d..ad5f78b 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
@@ -26,8 +26,9 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase {
}
- public HBaseTableSplitRegional(String regionLocation) {
+ public HBaseTableSplitRegional(String regionLocation, String regionName) {
this.m_regionLocation = regionLocation;
+ this.m_regionName = regionName;
}
@Override
@@ -70,6 +71,8 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase {
str.append(super.toString());
+ str.append(" REGIONAL => Region Location (" + m_regionLocation + ") Name (" + m_regionName + ")" );
+
str.append(" GRANULAR = > ");
for (HBaseTableSplitGranular hbt : splits) {
@@ -114,14 +117,20 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase {
}
private Iterator<HBaseTableSplitGranular> splitIterator = null;
+ private int currSplitCount = 0;
public HBaseTableSplitGranular getNextSplit() {
splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator;
if( splitIterator.hasNext() ) {
+ currSplitCount ++;
return splitIterator.next();
} else {
return null;
}
}
+
+ public int getCurrSplitCount() {
+ return currSplitCount;
+ }
} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
index d75ff7b..fb91f65 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
@@ -3,7 +3,7 @@ package parallelai.spyglass.hbase.testing
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
-import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine}
+import com.twitter.scalding._
import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
import cascading.tuple.Fields
import org.apache.log4j.{Logger, Level}
@@ -11,6 +11,9 @@ import cascading.tap.SinkMode
import cascading.pipe.Pipe
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.hbase.HBaseSource
+import com.twitter.scalding.IterableSource
+import com.twitter.scalding.TextLine
class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions {
@@ -24,26 +27,37 @@ class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConver
val quorum = args("quorum")
- val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val stt = args("start.value").toInt
+ val stp = args("stop.value").toInt
- def toIBW(pipe: Pipe, f: Fields): Pipe = {
- asList(f)
- .foldLeft(pipe){ (p, f) => {
- p.map(f.toString -> f.toString){ from: String =>
- Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
- }}
+ loadRanges(stt, stp)
+
+ def loadRanges(stt: Int, stp: Int) {
+ def toIBW(pipe: Pipe, f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
}
- }
+ val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
- val input = IterableSource(inVals, TABLE_SCHEMA)
- .read
- .write(TextLine("saltTesting/Inputs"))
+// val input = IterableSource(inVals, TABLE_SCHEMA)
+// .read
+// .write(TextLine("saltTesting/Inputs"))
+//
+ val fromSource = new IterableSource(inVals, TABLE_SCHEMA).read.name("source_%s_%s".format(stt, stp))
- val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA)
- .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE ))
+ val toSource = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE )
+
+ toIBW(fromSource, TABLE_SCHEMA)
+ .write(toSource)
+
+ }
}
class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
@@ -58,23 +72,23 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val quorum = args("quorum")
- val sttKey = "01728"
- val stpKey = "03725"
- val sttKeyP = "8_01728"
- val stpKeyP = "5_03725"
- val listKey = List("01681", "01456")
- val listKeyP = List("0_01681", "6_01456")
- val noSttKey = "9999990"
- val noStpKey = "9999999"
- val noSttKeyP = "9_9999990"
- val noStpKeyP = "9_9999999"
- val noListKey = List("0123456", "6543210")
- val noListKeyP = List("6_0123456", "0_6543210")
+ val sttKey = "0000001728"
+ val stpKey = "0000003725"
+ val sttKeyP = "8_0000001728"
+ val stpKeyP = "5_0000003725"
+ val listKey = List("0000001681", "0000001456")
+ val listKeyP = List("0_0000001681", "6_0000001456")
+ val noSttKey = "999999999990"
+ val noStpKey = "999999999999"
+ val noSttKeyP = "9_999999999990"
+ val noStpKeyP = "9_999999999999"
+ val noListKey = List("000000123456", "000006543210")
+ val noListKeyP = List("6_000000123456", "0_000006543210")
val splitType = if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR
val testName01 = "Scan All with NO useSalt"
- val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
@@ -99,7 +113,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
.groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
val testName03 = "Scan Range with NO useSalt"
- val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
@@ -125,7 +139,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val testName05 = "Get List with NO useSalt"
- val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
@@ -288,21 +302,28 @@ class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeCon
val quorum = args("quorum")
- val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+ val stt = args("start.value").toInt
+ val stp = args("stop.value").toInt
+
+ delRanges(stt, stp)
- def toIBW(pipe: Pipe, f: Fields): Pipe = {
- asList(f)
- .foldLeft(pipe){ (p, f) => {
- p.map(f.toString -> f.toString){ from: String =>
- Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
- }}
+ def delRanges(stt: Int, stp: Int) {
+ val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x)))
+
+ def toIBW(pipe: Pipe, f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
}
- }
- val input = IterableSource(inVals, TABLE_SCHEMA).read
+ val input = IterableSource(inVals, TABLE_SCHEMA).read
- val eraser = toIBW(input, TABLE_SCHEMA)
- .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
+ val eraser = toIBW(input, TABLE_SCHEMA)
+ .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
+ }
} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
index 17bc873..8d5c4ec 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
@@ -32,33 +32,33 @@ object HBaseSaltTesterRunner extends App {
if( make ) {
- JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName,
+ JobRunner.main((List(classOf[HBaseSaltTestSetup].getName,
"--hdfs",
"--app.conf.path", appPath,
"--job.lib.path", jobLibPath,
"--quorum", quorum,
"--debug", isDebug.toString
- ))
+ ) ::: mArgs.toList).toArray)
}
if( test ) {
- JobRunner.main(Array(classOf[HBaseSaltTester].getName,
+ JobRunner.main((List(classOf[HBaseSaltTester].getName,
"--hdfs",
"--app.conf.path", appPath,
"--job.lib.path", jobLibPath,
"--quorum", quorum,
"--debug", isDebug.toString,
"--regional", mArgs.getOrElse("regional", "false")
- ))
+ )::: mArgs.toList).toArray)
}
if( delete ) {
- JobRunner.main(Array(classOf[HBaseSaltTestShutdown].getName,
+ JobRunner.main((List(classOf[HBaseSaltTestShutdown].getName,
"--hdfs",
"--app.conf.path", appPath,
"--job.lib.path", jobLibPath,
"--quorum", quorum,
"--debug", isDebug.toString
- ))
+ )::: mArgs.toList).toArray)
}
} \ No newline at end of file