diff options
author | Chandan Rajah <crajah@parallelai.com> | 2013-09-30 15:24:09 +0100 |
---|---|---|
committer | Chandan Rajah <crajah@parallelai.com> | 2013-09-30 15:24:09 +0100 |
commit | f6954fc81a6eaf60ca6088c11b51f86e48733be7 (patch) | |
tree | 79aac956c03c80a0488d93a2fd7aca5241b1c465 /src/main | |
parent | c550ab1c80b384e164979fcbe01f34a8308a8b95 (diff) | |
download | SpyGlass-f6954fc81a6eaf60ca6088c11b51f86e48733be7.tar.gz SpyGlass-f6954fc81a6eaf60ca6088c11b51f86e48733be7.zip |
1. Created a mapper per region as ooposed to mapper per region server4.1.1
2. Added progress indicators
3. Better logging
Diffstat (limited to 'src/main')
10 files changed, 181 insertions, 108 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java index 929e9d8..64effc9 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -75,7 +75,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc .getHostnamePort().split( - Addressing.HOSTNAME_PORT_SEPARATOR)[0], + Addressing.HOSTNAME_PORT_SEPARATOR)[0], regLoc.getRegionInfo().getRegionNameAsString(), SourceMode.EMPTY, false); splits.add(split); @@ -100,6 +100,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { byte[][] regStartKeys = keys.getFirst(); byte[][] regStopKeys = keys.getSecond(); String[] regions = new String[regStartKeys.length]; + String[] regionNames = new String[regStartKeys.length]; for (int i = 0; i < regStartKeys.length; i++) { minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) @@ -109,10 +110,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] : maxKey; - HServerAddress regionServerAddress = table.getRegionLocation( - keys.getFirst()[i]).getServerAddress(); - InetAddress regionAddress = regionServerAddress.getInetSocketAddress() - .getAddress(); + HRegionLocation regionLoc = table.getRegionLocation(keys.getFirst()[i]); + HServerAddress regionServerAddress = regionLoc.getServerAddress(); + InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); String regionLocation; try { regionLocation = reverseDNS(regionAddress); @@ -122,23 +122,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { regionLocation = regionServerAddress.getHostname(); } - // HServerAddress regionServerAddress = - // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); - // InetAddress regionAddress = - // regionServerAddress.getInetSocketAddress().getAddress(); - // - // String regionLocation; - // - // try { - // regionLocation = reverseDNS(regionAddress); - // } catch (NamingException e) { - // LOG.error("Cannot resolve the host name for " + regionAddress + - // " because of " + e); - // regionLocation = regionServerAddress.getHostname(); - // } - - // String regionLocation = - // table.getRegionLocation(keys.getFirst()[i]).getHostname(); + regionNames[i] = regionLoc.getRegionInfo().getRegionNameAsString(); LOG.debug("***** " + regionLocation); @@ -219,7 +203,9 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { regionLocation = regionServerAddress.getHostname(); } - byte[] sStart = (startRow == HConstants.EMPTY_START_ROW + String regionName = cRegion.getRegionInfo().getRegionNameAsString(); + + byte[] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart : startRow); byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW @@ -234,7 +220,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { .compareTo(stopRow, rStop) >= 0)), rStop.length)); HBaseTableSplitGranular split = new HBaseTableSplitGranular( - table.getTableName(), sStart, sStop, regionLocation, + table.getTableName(), sStart, sStop, regionLocation, regionName, SourceMode.SCAN_RANGE, useSalt); split.setEndRowInclusive(currentRegion == maxRegions); @@ -270,7 +256,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { HBaseTableSplitGranular split = new HBaseTableSplitGranular( table.getTableName(), pair.getFirst(), - pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, + pair.getSecond(), regions[i], regionNames[i], SourceMode.SCAN_RANGE, useSalt); split.setEndRowInclusive(true); @@ -361,7 +347,7 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { regions[i], regionKeyList)); HBaseTableSplitGranular split = new HBaseTableSplitGranular( - table.getTableName(), regionKeyList, versions, regions[i], + table.getTableName(), regionKeyList, versions, regions[i], regionNames[i], SourceMode.GET_LIST, useSalt); splits.add(split); } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java index eadb57e..8185b22 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -9,7 +9,6 @@ import org.apache.hadoop.mapred.*; import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.List; /** * Created with IntelliJ IDEA. @@ -28,12 +27,17 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { granular.configure(job); HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits); - HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits ); + HBaseTableSplitRegional[] splits = convertToRegionalSplitArray(gSplits); if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL"); - LOG.info("GRANULAR => " + gSplits); - LOG.info("REGIONAL => " + splits); + for(HBaseTableSplitGranular g : gSplits) { + LOG.info("GRANULAR => " + g); + } + + for(HBaseTableSplitRegional r : splits ) { + LOG.info("REGIONAL => " + r); + } return splits; } @@ -43,10 +47,10 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { if (!(inputSplit instanceof HBaseTableSplitRegional)) throw new IOException("Table Split is not type HBaseTableSplitRegional"); - LOG.info("REGIONAL SPLIT -> " + inputSplit); - HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit; + LOG.info("REGIONAL SPLIT -> " + tSplit); + HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional(); HBaseConfigUtils.setRecordReaderParms(trr, tSplit); @@ -60,7 +64,7 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { return trr; } - private HBaseTableSplitRegional[] convertToMultiSplitArray( + private HBaseTableSplitRegional[] convertToRegionalSplitArray( HBaseTableSplitGranular[] splits) throws IOException { if (splits == null) @@ -70,16 +74,16 @@ public class HBaseInputFormatRegional extends HBaseInputFormatBase { for (HBaseTableSplitGranular hbt : splits) { HBaseTableSplitRegional mis = null; - if (regionSplits.containsKey(hbt.getRegionLocation())) { - mis = regionSplits.get(hbt.getRegionLocation()); + if (regionSplits.containsKey(hbt.getRegionName())) { + mis = regionSplits.get(hbt.getRegionName()); } else { - regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional( - hbt.getRegionLocation())); - mis = regionSplits.get(hbt.getRegionLocation()); + regionSplits.put(hbt.getRegionName(), new HBaseTableSplitRegional( + hbt.getRegionLocation(), hbt.getRegionName())); + mis = regionSplits.get(hbt.getRegionName()); } mis.addSplit(hbt); - regionSplits.put(hbt.getRegionLocation(), mis); + regionSplits.put(hbt.getRegionName(), mis); } // for(String region : regionSplits.keySet() ) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java index 37858ad..e0d0cbe 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -24,6 +24,7 @@ public abstract class HBaseRecordReaderBase implements RecordReader<ImmutableBytesWritable, Result> { protected TreeSet<String> keyList; + protected long initialNoOfKeys = 0; protected HBaseConstants.SourceMode sourceMode; protected boolean endRowInclusive = true; protected int versions = 1; @@ -39,6 +40,7 @@ public abstract class HBaseRecordReaderBase implements protected boolean logScannerActivity = false; protected int logPerRowCount = 100; + protected int noOfLogCount = 0; @Override public String toString() { @@ -107,6 +109,7 @@ public abstract class HBaseRecordReaderBase implements public void setKeyList(TreeSet<String> keyList) { this.keyList = keyList; + initialNoOfKeys = (this.keyList == null) ? 0 : this.keyList.size(); } public void setVersions(int versions) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index 6c28d9f..2155d99 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.StringUtils; +import org.jruby.javasupport.util.RuntimeHelpers; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { @@ -37,7 +38,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { private byte[] lastSuccessfulRow; private ResultScanner scanner; private long timestamp; - private int rowcount; + private int rowcount = 0; @Override public String toString() { @@ -51,6 +52,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { return sbuf.toString(); } + private final int scanCaching = 1000; + /** * Restart from survivable exceptions by creating a new scanner. @@ -67,7 +70,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } else { @@ -76,6 +80,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] { 0 }) : endRow)); TableInputFormat.addColumns(scan, trrInputColumns); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } @@ -86,6 +92,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { Scan scan = new Scan(firstRow); TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); + scan.setCacheBlocks(true); + scan.setCaching(scanCaching); this.scanner = this.htable.getScanner(scan); currentScan = scan; } @@ -150,15 +158,37 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { @Override public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; + switch(sourceMode) { + case GET_LIST: + long posGet = (keyList != null ) ? 0 : initialNoOfKeys - keyList.size() ; + return posGet; + + case SCAN_ALL: + case SCAN_RANGE: + long posScan = (noOfLogCount * logPerRowCount) + rowcount; + return posScan; + + default: + return 0; + } } @Override public float getProgress() { // Depends on the total number of tuples and getPos - return 0; + switch(sourceMode) { + case GET_LIST: + float progGet = ((initialNoOfKeys == 0) ? 0 : (getPos() / initialNoOfKeys)); + return progGet; + + case SCAN_ALL: + case SCAN_RANGE: + float progScan = (getPos() / (getPos() + 10000)); + return progScan; + + default: + return 0; + } } /** @@ -181,15 +211,18 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { try { try { result = this.scanner.next(); - if (logScannerActivity) { rowcount++; if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + timestamp = now; + noOfLogCount ++; + rowcount = 0; + } + + if (logScannerActivity) { long now = System.currentTimeMillis(); LOG.debug("Mapper took " + (now - timestamp) + "ms to process " - + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } + + rowcount + " rows"); } } catch (IOException e) { // try to handle all IOExceptions by restarting diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java index e2b1ec8..5d2b613 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -90,6 +90,7 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { boolean nextFlag = currentRecordReader.next(ibw, result); while(nextFlag == false && multiSplit.hasMoreSplits() ) { + totalPos += currentRecordReader.getPos(); setNextSplit(); nextFlag = currentRecordReader.next(ibw, result); } @@ -97,6 +98,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { return nextFlag; } + long totalPos = 0; + @Override public ImmutableBytesWritable createKey() { return currentRecordReader.createKey(); @@ -109,7 +112,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { @Override public long getPos() throws IOException { - return currentRecordReader.getPos(); + long pos = totalPos + currentRecordReader.getPos(); + return pos; } @Override @@ -119,6 +123,8 @@ public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { @Override public float getProgress() throws IOException { - return currentRecordReader.getProgress(); + // ( current count + percent of next count ) / max count + float prog = ((multiSplit.getCurrSplitCount() + currentRecordReader.getProgress()) / multiSplit.getLength()); + return prog; } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java index 2f6e7b5..e24771f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -29,6 +29,7 @@ public abstract class HBaseTableSplitBase implements InputSplit, protected byte[] m_startRow = null; protected byte[] m_endRow = null; protected String m_regionLocation = null; + protected String m_regionName = null; protected TreeSet<String> m_keyList = null; protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY; protected boolean m_endRowInclusive = true; @@ -90,6 +91,10 @@ public abstract class HBaseTableSplitBase implements InputSplit, return new String[] { this.m_regionLocation }; } + public String getRegionName() { + return this.m_regionName; + } + public void copy(HBaseTableSplitBase that) { this.m_endRow = that.m_endRow; @@ -100,6 +105,8 @@ public abstract class HBaseTableSplitBase implements InputSplit, this.m_tableName = that.m_tableName; this.m_useSalt = that.m_useSalt; this.m_versions = that.m_versions; + this.m_regionLocation = that.m_regionLocation; + this.m_regionName = that.m_regionName; } @Override @@ -108,6 +115,7 @@ public abstract class HBaseTableSplitBase implements InputSplit, this.m_tableName = Bytes.readByteArray(in); this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + this.m_regionName = Bytes.toString(Bytes.readByteArray(in)); this.m_sourceMode = HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes .readByteArray(in))); this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); @@ -140,6 +148,7 @@ public abstract class HBaseTableSplitBase implements InputSplit, Bytes.writeByteArray(out, this.m_tableName); Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionName)); Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java index 4de7153..a266411 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -21,7 +21,7 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase { /** default constructor */ public HBaseTableSplitGranular() { this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false); + HConstants.EMPTY_BYTE_ARRAY, "", "", HBaseConstants.SourceMode.EMPTY, false); } /** @@ -33,24 +33,26 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase { * @param location */ public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow, - final byte[] endRow, final String location, + final byte[] endRow, final String location, final String regionName, final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { this.m_tableName = tableName; this.m_startRow = startRow; this.m_endRow = endRow; this.m_regionLocation = location; + this.m_regionName = regionName; this.m_sourceMode = sourceMode; this.m_useSalt = useSalt; } public HBaseTableSplitGranular(final byte[] tableName, - final TreeSet<String> keyList, int versions, final String location, + final TreeSet<String> keyList, int versions, final String location, final String regionName, final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { this.m_tableName = tableName; this.m_keyList = keyList; this.m_versions = versions; this.m_sourceMode = sourceMode; this.m_regionLocation = location; + this.m_regionName = regionName; this.m_useSalt = useSalt; } @@ -67,8 +69,8 @@ public class HBaseTableSplitGranular extends HBaseTableSplitBase { public String toString() { return String .format( - "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", - Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, + "Table Name (%s) Region Location (%s) Name (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_regionName, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java index 1ebfa3d..ad5f78b 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -26,8 +26,9 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase { } - public HBaseTableSplitRegional(String regionLocation) { + public HBaseTableSplitRegional(String regionLocation, String regionName) { this.m_regionLocation = regionLocation; + this.m_regionName = regionName; } @Override @@ -70,6 +71,8 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase { str.append(super.toString()); + str.append(" REGIONAL => Region Location (" + m_regionLocation + ") Name (" + m_regionName + ")" ); + str.append(" GRANULAR = > "); for (HBaseTableSplitGranular hbt : splits) { @@ -114,14 +117,20 @@ public class HBaseTableSplitRegional extends HBaseTableSplitBase { } private Iterator<HBaseTableSplitGranular> splitIterator = null; + private int currSplitCount = 0; public HBaseTableSplitGranular getNextSplit() { splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator; if( splitIterator.hasNext() ) { + currSplitCount ++; return splitIterator.next(); } else { return null; } } + + public int getCurrSplitCount() { + return currSplitCount; + } }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index d75ff7b..fb91f65 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -3,7 +3,7 @@ package parallelai.spyglass.hbase.testing import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} -import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine} +import com.twitter.scalding._ import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource} import cascading.tuple.Fields import org.apache.log4j.{Logger, Level} @@ -11,6 +11,9 @@ import cascading.tap.SinkMode import cascading.pipe.Pipe import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.hbase.HBaseSource +import com.twitter.scalding.IterableSource +import com.twitter.scalding.TextLine class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions { @@ -24,26 +27,37 @@ class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConver val quorum = args("quorum") - val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val stt = args("start.value").toInt + val stp = args("stop.value").toInt - def toIBW(pipe: Pipe, f: Fields): Pipe = { - asList(f) - .foldLeft(pipe){ (p, f) => { - p.map(f.toString -> f.toString){ from: String => - Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) - }} + loadRanges(stt, stp) + + def loadRanges(stt: Int, stp: Int) { + def toIBW(pipe: Pipe, f: Fields): Pipe = { + asList(f) + .foldLeft(pipe){ (p, f) => { + p.map(f.toString -> f.toString){ from: String => + Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) + }} + } } - } + val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) - val input = IterableSource(inVals, TABLE_SCHEMA) - .read - .write(TextLine("saltTesting/Inputs")) +// val input = IterableSource(inVals, TABLE_SCHEMA) +// .read +// .write(TextLine("saltTesting/Inputs")) +// + val fromSource = new IterableSource(inVals, TABLE_SCHEMA).read.name("source_%s_%s".format(stt, stp)) - val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA) - .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data"), - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE )) + val toSource = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE ) + + toIBW(fromSource, TABLE_SCHEMA) + .write(toSource) + + } } class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { @@ -58,23 +72,23 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val quorum = args("quorum") - val sttKey = "01728" - val stpKey = "03725" - val sttKeyP = "8_01728" - val stpKeyP = "5_03725" - val listKey = List("01681", "01456") - val listKeyP = List("0_01681", "6_01456") - val noSttKey = "9999990" - val noStpKey = "9999999" - val noSttKeyP = "9_9999990" - val noStpKeyP = "9_9999999" - val noListKey = List("0123456", "6543210") - val noListKeyP = List("6_0123456", "0_6543210") + val sttKey = "0000001728" + val stpKey = "0000003725" + val sttKeyP = "8_0000001728" + val stpKeyP = "5_0000003725" + val listKey = List("0000001681", "0000001456") + val listKeyP = List("0_0000001681", "6_0000001456") + val noSttKey = "999999999990" + val noStpKey = "999999999999" + val noSttKeyP = "9_999999999990" + val noStpKeyP = "9_999999999999" + val noListKey = List("000000123456", "000006543210") + val noListKeyP = List("6_000000123456", "0_000006543210") val splitType = if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR val testName01 = "Scan All with NO useSalt" - val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), @@ -99,7 +113,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) val testName03 = "Scan Range with NO useSalt" - val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), @@ -125,7 +139,7 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val testName05 = "Get List with NO useSalt" - val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), @@ -288,21 +302,28 @@ class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeCon val quorum = args("quorum") - val inVals = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val stt = args("start.value").toInt + val stp = args("stop.value").toInt + + delRanges(stt, stp) - def toIBW(pipe: Pipe, f: Fields): Pipe = { - asList(f) - .foldLeft(pipe){ (p, f) => { - p.map(f.toString -> f.toString){ from: String => - Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) - }} + def delRanges(stt: Int, stp: Int) { + val inVals = (stt to stp).toList.map(x => ("" + (x%10) + "_" + "%010d".format(x), "" + (x%10) + "_" + "%010d".format(x), "%010d".format(x))) + + def toIBW(pipe: Pipe, f: Fields): Pipe = { + asList(f) + .foldLeft(pipe){ (p, f) => { + p.map(f.toString -> f.toString){ from: String => + Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) + }} + } } - } - val input = IterableSource(inVals, TABLE_SCHEMA).read + val input = IterableSource(inVals, TABLE_SCHEMA).read - val eraser = toIBW(input, TABLE_SCHEMA) - .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data"), - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE )) + val eraser = toIBW(input, TABLE_SCHEMA) + .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE )) + } }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index 17bc873..8d5c4ec 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -32,33 +32,33 @@ object HBaseSaltTesterRunner extends App { if( make ) { - JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName, + JobRunner.main((List(classOf[HBaseSaltTestSetup].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, "--quorum", quorum, "--debug", isDebug.toString - )) + ) ::: mArgs.toList).toArray) } if( test ) { - JobRunner.main(Array(classOf[HBaseSaltTester].getName, + JobRunner.main((List(classOf[HBaseSaltTester].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, "--quorum", quorum, "--debug", isDebug.toString, "--regional", mArgs.getOrElse("regional", "false") - )) + )::: mArgs.toList).toArray) } if( delete ) { - JobRunner.main(Array(classOf[HBaseSaltTestShutdown].getName, + JobRunner.main((List(classOf[HBaseSaltTestShutdown].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, "--quorum", quorum, "--debug", isDebug.toString - )) + )::: mArgs.toList).toArray) } }
\ No newline at end of file |