diff options
author | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 |
---|---|---|
committer | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 |
commit | 6e21e0c68248a33875898b86a2be7a9cec7df3d4 (patch) | |
tree | 5254682e3c3440f7c6954b23519459107b8a445e /src/main/java | |
parent | ea9c80374da846edf2a1634a42ccb932838ebd5b (diff) | |
download | SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.tar.gz SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.zip |
Added extensions to Read and Write mode.
Added support for key prefixes
Diffstat (limited to 'src/main/java')
9 files changed, 865 insertions, 269 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index b546107..5baf20e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -15,5 +15,8 @@ public class HBaseConstants { public static final String STOP_KEY = "hbase.%s.stopkey"; public static final String SOURCE_MODE = "hbase.%s.source.mode"; public static final String KEY_LIST = "hbase.%s.key.list"; + public static final String VERSIONS = "hbase.%s.versions"; + public static final String USE_SALT = "hbase.%s.use.salt"; + public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index f1f4fb7..aabdc5e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -5,8 +5,8 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; +import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; import java.util.UUID; @@ -17,8 +17,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.filter.Filter; @@ -38,7 +40,6 @@ import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - public class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { @@ -48,9 +49,9 @@ public class HBaseInputFormat private byte [][] inputColumns; private HTable table; - private HBaseRecordReader tableRecordReader; +// private HBaseRecordReader tableRecordReader; private Filter rowFilter; - private String tableName = ""; +// private String tableName = ""; private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); @@ -83,7 +84,8 @@ public class HBaseInputFormat List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); + .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); + splits.add(split); return splits.toArray(new HBaseTableSplit[splits.size()]); @@ -100,7 +102,7 @@ public class HBaseInputFormat byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; byte[] maxKey = keys.getSecond()[0]; - LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); byte [][] regStartKeys = keys.getFirst(); byte [][] regStopKeys = keys.getSecond(); @@ -144,29 +146,29 @@ public class HBaseInputFormat regions[i] = regionLocation; - LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); + LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); } byte[] startRow = HConstants.EMPTY_START_ROW; byte[] stopRow = HConstants.EMPTY_END_ROW; - LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); - LOG.info("SOURCE MODE is : " + sourceMode); + LOG.debug("SOURCE MODE is : " + sourceMode); switch( sourceMode ) { case SCAN_ALL: startRow = HConstants.EMPTY_START_ROW; stopRow = HConstants.EMPTY_END_ROW; - LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); + LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; case SCAN_RANGE: startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ; stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; - LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); + LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; } @@ -180,98 +182,110 @@ public class HBaseInputFormat List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); - - int maxRegions = validRegions.size(); - int currentRegion = 1; - - for( HRegionLocation cRegion : validRegions ) { - byte [] rStart = cRegion.getRegionInfo().getStartKey(); - byte [] rStop = cRegion.getRegionInfo().getEndKey(); + if( ! useSalt ) { - HServerAddress regionServerAddress = cRegion.getServerAddress(); - InetAddress regionAddress = - regionServerAddress.getInetSocketAddress().getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " + regionAddress + - " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); - byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow); - - LOG.info("".format("BOOL start (%s) stop (%s) length (%d)", - (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), - (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), - rStop.length - )); + List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), - sStart, - sStop, - regionLocation, - SourceMode.SCAN_RANGE - ); + int maxRegions = validRegions.size(); + int currentRegion = 1; - split.setEndRowInclusive( currentRegion == maxRegions ); - - currentRegion ++; - - LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", - Bytes.toString(startRow), Bytes.toString(stopRow), - Bytes.toString(rStart), Bytes.toString(rStop), - Bytes.toString(sStart), - Bytes.toString(sStop), - cRegion.getHostnamePort(), split) ); + for( HRegionLocation cRegion : validRegions ) { + byte [] rStart = cRegion.getRegionInfo().getStartKey(); + byte [] rStop = cRegion.getRegionInfo().getEndKey(); + + HServerAddress regionServerAddress = cRegion.getServerAddress(); + InetAddress regionAddress = + regionServerAddress.getInetSocketAddress().getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + regionAddress + + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); + byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow); + + LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)", + (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), + (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), + rStop.length + )); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), + sStart, + sStop, + regionLocation, + SourceMode.SCAN_RANGE, useSalt + ); + + split.setEndRowInclusive( currentRegion == maxRegions ); + + currentRegion ++; + + LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", + Bytes.toString(startRow), Bytes.toString(stopRow), + Bytes.toString(rStart), Bytes.toString(rStop), + Bytes.toString(sStart), + Bytes.toString(sStop), + cRegion.getHostnamePort(), split) ); + + splits.add(split); + } + } else { + LOG.debug("Using SALT : " + useSalt ); - splits.add(split); + // Will return the start and the stop key with all possible prefixes. + for( int i = 0; i < regions.length; i++ ) { + Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList ); + + for( Pair<byte[], byte[]> pair : intervals ) { + LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), + pair.getFirst(), + pair.getSecond(), + regions[i], + SourceMode.SCAN_RANGE, useSalt + ); + + split.setEndRowInclusive(true); + splits.add(split); + } + } } -// -// for (int i = 0; i < keys.getFirst().length; i++) { -// -// if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -// LOG.info("NOT including regions : " + regions[i]); -// continue; -// } -// -// // determine if the given start an stop key fall into the region -// if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || -// Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && -// (stopRow.length == 0 || -// Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { -// -// byte[] splitStart = startRow.length == 0 || -// Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? -// keys.getFirst()[i] : startRow; -// byte[] splitStop = (stopRow.length == 0 || -// Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && -// keys.getSecond()[i].length > 0 ? -// keys.getSecond()[i] : stopRow; -// HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), -// splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); -// splits.add(split); -// -// LOG.info("getSplits: split -> " + i + " -> " + split); -// } -// } - - LOG.info("RETURNED SPLITS: split -> " + splits); + LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); + for( HBaseTableSplit s: splits) { + LOG.info("RETURNED SPLITS: split -> " + s); + } return splits.toArray(new HBaseTableSplit[splits.size()]); } case GET_LIST: { - if( keyList == null || keyList.size() == 0 ) { +// if( keyList == null || keyList.size() == 0 ) { + if( keyList == null ) { throw new IOException("Source Mode is GET_LIST but key list is EMPTY"); } + if( useSalt ) { + TreeSet<String> tempKeyList = new TreeSet<String>(); + + for(String key: keyList) { + tempKeyList.add(HBaseSalter.addSaltPrefix(key)); + } + + keyList = tempKeyList; + } + + LOG.debug("".format("Splitting Key List (%s)", keyList)); + List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); for (int i = 0; i < keys.getFirst().length; i++) { @@ -280,44 +294,46 @@ public class HBaseInputFormat continue; } - LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); + LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); Set<String> regionsSubSet = null; if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { - LOG.info("REGION start is empty"); - LOG.info("REGION stop is empty"); + LOG.debug("REGION start is empty"); + LOG.debug("REGION stop is empty"); regionsSubSet = keyList; } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { - LOG.info("REGION start is empty"); + LOG.debug("REGION start is empty"); regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true); } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { - LOG.info("REGION stop is empty"); + LOG.debug("REGION stop is empty"); regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true); } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) { regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); } else { - throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)", + throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); } if( regionsSubSet == null || regionsSubSet.size() == 0) { - LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); + LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null"); continue; } TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); - LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); + LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), regionKeyList, + table.getTableName(), regionKeyList, versions, regions[i], - SourceMode.GET_LIST); + SourceMode.GET_LIST, useSalt); splits.add(split); } + LOG.debug("RETURNED SPLITS: split -> " + splits); + return splits.toArray(new HBaseTableSplit[splits.size()]); } @@ -351,20 +367,23 @@ public class HBaseInputFormat case SCAN_ALL: case SCAN_RANGE: { - LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); + LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); trr.setStartRow(tSplit.getStartRow()); trr.setEndRow(tSplit.getEndRow()); trr.setEndRowInclusive(tSplit.getEndRowInclusive()); + trr.setUseSalt(useSalt); } break; case GET_LIST: { - LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); + LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); trr.setKeyList(tSplit.getKeyList()); + trr.setVersions(tSplit.getVersions()); + trr.setUseSalt(useSalt); } break; @@ -402,6 +421,9 @@ public class HBaseInputFormat private SourceMode sourceMode = SourceMode.EMPTY; private TreeSet<String> keyList = null; + private int versions = 1; + private boolean useSalt = false; + private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; public void configure(JobConf job) { String tableName = getTableName(job); @@ -422,9 +444,12 @@ public class HBaseInputFormat LOG.debug("Entered : " + this.getClass() + " : configure()" ); + useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false); + prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST); + sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; - LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally", + LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally", String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode )); switch( sourceMode ) { @@ -442,16 +467,18 @@ public class HBaseInputFormat Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job))); keyList = new TreeSet<String> (keys); + + versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - LOG.info( "GOT KEY LIST : " + keys ); - LOG.info(String.format("SETTING key list (%s)", keyList) ); + LOG.debug( "GOT KEY LIST : " + keys ); + LOG.debug(String.format("SETTING key list (%s)", keyList) ); break; case EMPTY: LOG.info("HIT EMPTY"); - sourceMode = sourceMode.SCAN_ALL; + sourceMode = SourceMode.SCAN_ALL; break; default: @@ -468,7 +495,7 @@ public class HBaseInputFormat if (tableName == null) { throw new IOException("expecting one table name"); } - LOG.debug("".format("Found Table name [%s]", tableName)); + LOG.debug(String.format("Found Table name [%s]", tableName)); // connected to table? @@ -476,16 +503,16 @@ public class HBaseInputFormat throw new IOException("could not connect to table '" + tableName + "'"); } - LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); + LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); // expecting at least one column String colArg = job.get(COLUMN_LIST); if (colArg == null || colArg.length() == 0) { throw new IOException("expecting at least one column"); } - LOG.debug("".format("Found Columns [%s]", colArg)); + LOG.debug(String.format("Found Columns [%s]", colArg)); - LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); + LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey)); if( sourceMode == SourceMode.EMPTY ) { throw new IOException("SourceMode should not be EMPTY"); @@ -504,7 +531,7 @@ public class HBaseInputFormat private void setJobProp( JobConf job, String key, String value) { - if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); + if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); job.set(key, value); } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java new file mode 100644 index 0000000..40f1faf --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -0,0 +1,59 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; + +/** + * Convert Map/Reduce output and write it to an HBase table + */ +public class HBaseOutputFormat extends +FileOutputFormat<ImmutableBytesWritable, Put> { + + /** JobConf parameter that specifies the output table */ + public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; + private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class); + + + @Override + @SuppressWarnings("unchecked") + public RecordWriter getRecordWriter(FileSystem ignored, + JobConf job, String name, Progressable progress) throws IOException { + + // expecting exactly one path + + String tableName = job.get(OUTPUT_TABLE); + HTable table = null; + try { + table = new HTable(HBaseConfiguration.create(job), tableName); + } catch(IOException e) { + LOG.error(e); + throw e; + } + table.setAutoFlush(false); + return new HBaseRecordWriter(table); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) + throws FileAlreadyExistsException, InvalidJobConfException, IOException { + + String tableName = job.get(OUTPUT_TABLE); + if(tableName == null) { + throw new IOException("Must specify table name"); + } + } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java index 97077c4..d22ed71 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -3,12 +3,17 @@ package parallelai.spyglass.hbase; import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; import java.io.IOException; -import java.util.Set; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.TreeSet; +import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -18,36 +23,38 @@ import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> { +public class HBaseRecordReader implements + RecordReader<ImmutableBytesWritable, Result> { static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); - private byte [] startRow; - private byte [] endRow; - private byte [] lastSuccessfulRow; + private byte[] startRow; + private byte[] endRow; + private byte[] lastSuccessfulRow; private TreeSet<String> keyList; private SourceMode sourceMode; private Filter trrRowFilter; private ResultScanner scanner; private HTable htable; - private byte [][] trrInputColumns; + private byte[][] trrInputColumns; private long timestamp; private int rowcount; private boolean logScannerActivity = false; private int logPerRowCount = 100; private boolean endRowInclusive = true; + private int versions = 1; + private boolean useSalt = false; /** * Restart from survivable exceptions by creating a new scanner. - * + * * @param firstRow * @throws IOException */ @@ -55,26 +62,26 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R Scan currentScan; if ((endRow != null) && (endRow.length > 0)) { if (trrRowFilter != null) { - Scan scan = new Scan(firstRow, (endRowInclusive ? - Bytes.add(endRow, new byte[] {0}) : endRow ) ); - + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); scan.setCacheBlocks(false); this.scanner = this.htable.getScanner(scan); currentScan = scan; } else { - LOG.debug("TIFB.restart, firstRow: " + - Bytes.toString(firstRow) + ", endRow: " + - Bytes.toString(endRow)); - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow )); + LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) + + ", endRow: " + Bytes.toString(endRow)); + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); TableInputFormat.addColumns(scan, trrInputColumns); this.scanner = this.htable.getScanner(scan); currentScan = scan; } } else { - LOG.debug("TIFB.restart, firstRow: " + - Bytes.toStringBinary(firstRow) + ", no endRow"); + LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + + ", no endRow"); Scan scan = new Scan(firstRow); TableInputFormat.addColumns(scan, trrInputColumns); @@ -83,7 +90,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R currentScan = scan; } if (logScannerActivity) { - LOG.info("Current scan=" + currentScan.toString()); + LOG.debug("Current scan=" + currentScan.toString()); timestamp = System.currentTimeMillis(); rowcount = 0; } @@ -97,6 +104,14 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R this.keyList = keyList; } + public void setVersions(int versions) { + this.versions = versions; + } + + public void setUseSalt(boolean useSalt) { + this.useSalt = useSalt; + } + public SourceMode getSourceMode() { return sourceMode; } @@ -108,76 +123,84 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R public byte[] getEndRow() { return endRow; } - + public void setEndRowInclusive(boolean isInclusive) { endRowInclusive = isInclusive; } - + public boolean getEndRowInclusive() { return endRowInclusive; } - - private byte [] nextKey = null; + + private byte[] nextKey = null; + private Vector<List<KeyValue>> resultVector = null; + Map<Long, List<KeyValue>> keyValueMap = null; /** * Build the scanner. Not done in constructor to allow for extension. - * + * * @throws IOException */ public void init() throws IOException { - switch( sourceMode ) { - case SCAN_ALL: - case SCAN_RANGE: - restartRangeScan(startRow); - break; - - case GET_LIST: - nextKey = Bytes.toBytes(keyList.pollFirst()); - break; - - default: - throw new IOException(" Unknown source mode : " + sourceMode ); + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + restartRangeScan(startRow); + break; + + case GET_LIST: + nextKey = Bytes.toBytes(keyList.pollFirst()); + break; + + default: + throw new IOException(" Unknown source mode : " + sourceMode); } } byte[] getStartRow() { return this.startRow; } + /** - * @param htable the {@link HTable} to scan. + * @param htable + * the {@link HTable} to scan. */ public void setHTable(HTable htable) { Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean( - ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, + false); logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); this.htable = htable; } /** - * @param inputColumns the columns to be placed in {@link Result}. + * @param inputColumns + * the columns to be placed in {@link Result}. */ - public void setInputColumns(final byte [][] inputColumns) { + public void setInputColumns(final byte[][] inputColumns) { this.trrInputColumns = inputColumns; } /** - * @param startRow the first row in the split + * @param startRow + * the first row in the split */ - public void setStartRow(final byte [] startRow) { + public void setStartRow(final byte[] startRow) { this.startRow = startRow; } /** - * - * @param endRow the last row in the split + * + * @param endRow + * the last row in the split */ - public void setEndRow(final byte [] endRow) { + public void setEndRow(final byte[] endRow) { this.endRow = endRow; } /** - * @param rowFilter the {@link Filter} to be used. + * @param rowFilter + * the {@link Filter} to be used. */ public void setRowFilter(Filter rowFilter) { this.trrRowFilter = rowFilter; @@ -185,12 +208,13 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R @Override public void close() { - if (this.scanner != null) this.scanner.close(); + if (this.scanner != null) + this.scanner.close(); } /** * @return ImmutableBytesWritable - * + * * @see org.apache.hadoop.mapred.RecordReader#createKey() */ @Override @@ -200,7 +224,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R /** * @return RowResult - * + * * @see org.apache.hadoop.mapred.RecordReader#createValue() */ @Override @@ -222,104 +246,260 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R } /** - * @param key HStoreKey as input key. - * @param value MapWritable as input value + * @param key + * HStoreKey as input key. + * @param value + * MapWritable as input value * @return true if there was more data * @throws IOException */ @Override public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - - switch(sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - { - - Result result; + throws IOException { + + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: { + + Result result; + try { try { - try { - result = this.scanner.next(); - if (logScannerActivity) { - rowcount ++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // try to handle all IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," + - " if your mapper has restarted a few other times like this" + - " then you should consider killing this job and investigate" + - " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - restartRangeScan(startRow); - } else { - restartRangeScan(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row + result = this.scanner.next(); + if (logScannerActivity) { + rowcount++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + timestamp = now; + rowcount = 0; } - result = this.scanner.next(); } + } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restartRangeScan(startRow); + } else { + restartRangeScan(lastSuccessfulRow); + this.scanner.next(); // skip presumed already mapped row + } + result = this.scanner.next(); + } - if (result != null && result.size() > 0) { + if (result != null && result.size() > 0) { + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { key.set(result.getRow()); - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } - return false; - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - LOG.info(ioe); - String lastRow = lastSuccessfulRow == null ? - "null" : Bytes.toStringBinary(lastSuccessfulRow); - LOG.info("lastSuccessfulRow=" + lastRow); } - throw ioe; + + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + LOG.debug(ioe); + String lastRow = lastSuccessfulRow == null ? "null" : Bytes + .toStringBinary(lastSuccessfulRow); + LOG.debug("lastSuccessfulRow=" + lastRow); } + throw ioe; } + } + + case GET_LIST: { + LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); - case GET_LIST: - { - Result result; - if( nextKey != null ) { - result = this.htable.get(new Get(nextKey)); + if (versions == 1) { + if (nextKey != null) { + LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); - if (result != null && result.size() > 0) { - System.out.println("KeyList => " + keyList); - System.out.println("Result => " + result); - if (keyList != null || !keyList.isEmpty()) { - - String newKey = keyList.pollFirst(); - System.out.println("New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey); - } else { - nextKey = null; - } - key.set(result.getRow()); + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result result = this.htable.get(theGet); + + if (result != null && (! result.isEmpty()) ) { + LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); + + if (keyList != null || !keyList.isEmpty()) { + String newKey = keyList.pollFirst(); + LOG.debug("New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + } else { + nextKey = null; + } + + LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); + + // Write the result + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } lastSuccessfulRow = key.get(); Writables.copyWritable(result, value); + return true; + } else { + LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 + + String newKey; + while((newKey = keyList.pollFirst()) != null) { + LOG.debug("WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); } - return false; } else { + // Nothig left. return false return false; } + } else { + if (resultVector != null && resultVector.size() != 0) { + LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); + + List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + + return true; + } else { + if (nextKey != null) { + LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); + + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result resultAll = this.htable.get(theGet); + + if( resultAll != null && (! resultAll.isEmpty())) { + List<KeyValue> keyValeList = resultAll.list(); + + keyValueMap = new HashMap<Long, List<KeyValue>>(); + + LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); + + for (KeyValue keyValue : keyValeList) { + long version = keyValue.getTimestamp(); + + if (keyValueMap.containsKey(new Long(version))) { + List<KeyValue> keyValueTempList = keyValueMap.get(new Long( + version)); + if (keyValueTempList == null) { + keyValueTempList = new ArrayList<KeyValue>(); + } + keyValueTempList.add(keyValue); + } else { + List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); + keyValueMap.put(new Long(version), keyValueTempList); + keyValueTempList.add(keyValue); + } + } + + resultVector = new Vector<List<KeyValue>>(); + resultVector.addAll(keyValueMap.values()); + + List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); + + String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + + System.out.println("+ New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } else { + LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 + + String newKey; + + while( (newKey = keyList.pollFirst()) != null ) { + LOG.debug("+ WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("+ BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); + } + + } else { + return false; + } + } } - - default: - throw new IOException("Unknown source mode : " + sourceMode ); - } + } + default: + throw new IOException("Unknown source mode : " + sourceMode); + } } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java new file mode 100644 index 0000000..1cca133 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -0,0 +1,37 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; + +/** + * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) + * and write to an HBase table + */ +public class HBaseRecordWriter + implements RecordWriter<ImmutableBytesWritable, Put> { + private HTable m_table; + + /** + * Instantiate a TableRecordWriter with the HBase HClient for writing. + * + * @param table + */ + public HBaseRecordWriter(HTable table) { + m_table = table; + } + + public void close(Reporter reporter) + throws IOException { + m_table.close(); + } + + public void write(ImmutableBytesWritable key, + Put value) throws IOException { + m_table.put(new Put(value)); + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java new file mode 100644 index 0000000..fc656a4 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -0,0 +1,206 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +public class HBaseSalter { + private static final Log LOG = LogFactory.getLog(HBaseSalter.class); + + + public static byte [] addSaltPrefix(byte [] key) throws IOException { + if( key == null || key.length < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long"); + + String keyStr = Bytes.toString(key); + + return Bytes.toBytes(keyStr.substring(keyStr.length() - 1) + "_" + keyStr); + } + + public static String addSaltPrefix(String key) throws IOException { + if( key == null || key.length() < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long"); + + return (key.substring(key.length() - 1) + "_" + key); + } + + public static ImmutableBytesWritable addSaltPrefix( ImmutableBytesWritable key ) throws IOException { + return new ImmutableBytesWritable( addSaltPrefix(key.get())); + } + + public static byte [] delSaltPrefix(byte [] key) throws IOException { + if( key == null || key.length < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long"); + + String keyStr = Bytes.toString(key); + + return Bytes.toBytes(keyStr.substring(2)); + } + + public static String delSaltPrefix(String key) throws IOException { + if( key == null || key.length() < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long"); + + return key.substring(2); + } + + public static ImmutableBytesWritable delSaltPrefix( ImmutableBytesWritable key ) throws IOException { + return new ImmutableBytesWritable( delSaltPrefix(key.get())); + } + + public static final String DEFAULT_PREFIX_LIST = " !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~"; + + public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey) throws IOException { + return getDistributedIntervals(originalStartKey, originalStopKey, DEFAULT_PREFIX_LIST); + } + + public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey, String prefixList) throws IOException { + return getDistributedIntervals(originalStartKey, originalStopKey, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, prefixList); + } + + + public static Pair<byte[], byte[]>[] getDistributedIntervals( + byte[] originalStartKey, byte[] originalStopKey, + byte[] regionStartKey, byte[] regionStopKey, + String prefixList) throws IOException { + LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", + Bytes.toString(originalStartKey), + Bytes.toString(originalStopKey), + Bytes.toString(regionStartKey), + Bytes.toString(regionStopKey), + prefixList + )); + + byte[][] startKeys; + byte[][] stopKeys; + + if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW) + && Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW) ) { + startKeys = getAllKeys(originalStartKey, prefixList); + stopKeys = getAllKeys(originalStopKey, prefixList); + } else if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW)) { + startKeys = getAllKeysWithStop(originalStartKey, prefixList, regionStopKey[0]); + stopKeys = getAllKeysWithStop(originalStopKey, prefixList, regionStopKey[0]); + } else if(Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW)) { + startKeys = getAllKeysWithStart(originalStartKey, prefixList, regionStartKey[0]); + stopKeys = getAllKeysWithStart(originalStopKey, prefixList, regionStartKey[0]); + } else { + startKeys = getAllKeysInRange(originalStartKey, prefixList, regionStartKey[0], regionStopKey[0]); + stopKeys = getAllKeysInRange(originalStopKey, prefixList, regionStartKey[0], regionStopKey[0]); + } + + if( startKeys.length != stopKeys.length) { + throw new IOException("LENGTH of START Keys and STOP Keys DO NOT match"); + } + + if( Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW) + && Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW) ) { + Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR); + // stop keys are the start key of the next interval + for (int i = startKeys.length - 1; i >= 1; i--) { + startKeys[i] = startKeys[i - 1]; + } + startKeys[0] = HConstants.EMPTY_START_ROW; + stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW; + } else if (Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW)) { + Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR); + // stop keys are the start key of the next interval + for (int i = startKeys.length - 1; i >= 1; i--) { + startKeys[i] = startKeys[i - 1]; + } + startKeys[0] = HConstants.EMPTY_START_ROW; + } else if (Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW)) { + Arrays.sort(startKeys, Bytes.BYTES_RAWCOMPARATOR); + // stop keys are the start key of the next interval + for (int i = 0; i < stopKeys.length - 1; i++) { + stopKeys[i] = stopKeys[i + 1]; + } + stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW; + } + + Pair<byte[], byte[]>[] intervals = new Pair[startKeys.length]; + for (int i = 0; i < startKeys.length; i++) { + intervals[i] = new Pair<byte[], byte[]>(startKeys[i], stopKeys[i]); + } + + return intervals; + } + + public static byte[][] getAllKeys(byte[] originalKey) throws IOException { + return getAllKeys(originalKey, DEFAULT_PREFIX_LIST); + } + + public static byte[][] getAllKeys(byte[] originalKey, String prefixList) throws IOException { + char[] prefixArray = prefixList.toCharArray(); + + return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)prefixArray[prefixArray.length - 1]); + } + + public static byte[][] getAllKeysWithStart(byte[] originalKey, String prefixList, byte startKey) throws IOException { + char[] prefixArray = prefixList.toCharArray(); + + return getAllKeysWithStartStop(originalKey, prefixList, startKey, (byte)prefixArray[prefixArray.length - 1]); + } + + public static byte[][] getAllKeysWithStop(byte[] originalKey, String prefixList, byte stopKey) throws IOException { + char[] prefixArray = prefixList.toCharArray(); + + return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1)); + } + + public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { + return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1)); + } + + private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { + char[] prefixArray = prefixList.toCharArray(); + TreeSet<Byte> prefixSet = new TreeSet<Byte>(); + + for( char c : prefixArray ) { + prefixSet.add((byte)c); + } + + SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true); + + return getAllKeys(originalKey, subSet.toArray(new Byte[]{})); + } + + public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) { + byte[][] keys = new byte[prefixArray.length][]; + + for (byte i = 0; i < prefixArray.length; i++) { + keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey)); + } + + return keys; + } + + public static void main(String [] args) throws IOException { + + String s = ""; + + for (byte i = 32; i < Byte.MAX_VALUE; i++) { + s += (char)i; + } + + System.out.println("".format("[%s]", s)); + System.out.println("".format("[%s]", DEFAULT_PREFIX_LIST)); + + + String start = Bytes.toString(HConstants.EMPTY_START_ROW); + String stop = Bytes.toString(HConstants.EMPTY_END_ROW); + + System.out.println("".format("START: (%s) STOP: (%s)", start, stop)); + + Pair<byte[], byte[]>[] intervals = getDistributedIntervals(Bytes.toBytes(start), Bytes.toBytes(stop), "1423"); + + for( Pair<byte[], byte[]> p : intervals ) { + System.out.println("".format("iSTART: (%s) iSTOP: (%s)", Bytes.toString(p.getFirst()), Bytes.toString(p.getSecond()))); + } + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index e5acc30..a7d36fd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -24,7 +24,6 @@ import cascading.util.Util; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -63,7 +62,9 @@ public class HBaseScheme /** String columns */ private transient String[] columns; /** Field fields */ - private transient byte[][] fields; +// private transient byte[][] fields; + + private boolean useSalt = false; /** @@ -237,8 +238,13 @@ public class HBaseScheme OutputCollector outputCollector = sinkCall.getOutput(); Tuple key = tupleEntry.selectTuple(keyField); ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0); + + if( useSalt ) { + keyBytes = HBaseSalter.addSaltPrefix(keyBytes); + } + Put put = new Put(keyBytes.get(), this.timeStamp); - + for (int i = 0; i < valueFields.length; i++) { Fields fieldSelector = valueFields[i]; TupleEntry values = tupleEntry.selectEntry(fieldSelector); @@ -258,10 +264,13 @@ public class HBaseScheme @Override public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { - conf.setOutputFormat(TableOutputFormat.class); + conf.setOutputFormat(HBaseOutputFormat.class); conf.setOutputKeyClass(ImmutableBytesWritable.class); conf.setOutputValueClass(Put.class); + + String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE); + useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false); } @Override diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java index 1d48e1d..a5c3bdd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -1,11 +1,9 @@ package parallelai.spyglass.hbase; -import java.awt.event.KeyListener; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; -import java.util.Set; import java.util.TreeSet; import org.apache.commons.logging.Log; @@ -14,8 +12,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.InputSplit; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import com.sun.tools.javac.resources.version; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { @@ -28,11 +27,13 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, private TreeSet<String> m_keyList = null; private SourceMode m_sourceMode = SourceMode.EMPTY; private boolean m_endRowInclusive = true; + private int m_versions = 1; + private boolean m_useSalt = false; /** default constructor */ public HBaseTableSplit() { this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY); + HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); } /** @@ -43,19 +44,22 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, * @param location */ public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, - final String location, final SourceMode sourceMode) { + final String location, final SourceMode sourceMode, final boolean useSalt) { this.m_tableName = tableName; this.m_startRow = startRow; this.m_endRow = endRow; this.m_regionLocation = location; this.m_sourceMode = sourceMode; + this.m_useSalt = useSalt; } - public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) { + public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) { this.m_tableName = tableName; this.m_keyList = keyList; + this.m_versions = versions; this.m_sourceMode = sourceMode; this.m_regionLocation = location; + this.m_useSalt = useSalt; } /** @return table name */ @@ -86,20 +90,28 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, return m_keyList; } + public int getVersions() { + return m_versions; + } + /** @return get the source mode */ public SourceMode getSourceMode() { return m_sourceMode; } + + public boolean getUseSalt() { + return m_useSalt; + } /** @return the region's hostname */ public String getRegionLocation() { - LOG.info("REGION GETTER : " + m_regionLocation); + LOG.debug("REGION GETTER : " + m_regionLocation); return this.m_regionLocation; } public String[] getLocations() { - LOG.info("REGION ARRAY : " + m_regionLocation); + LOG.debug("REGION ARRAY : " + m_regionLocation); return new String[] {this.m_regionLocation}; } @@ -112,11 +124,12 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, @Override public void readFields(DataInput in) throws IOException { - LOG.info("READ ME : " + in.toString()); + LOG.debug("READ ME : " + in.toString()); this.m_tableName = Bytes.readByteArray(in); this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); + this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); switch(this.m_sourceMode) { case SCAN_RANGE: @@ -126,6 +139,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, break; case GET_LIST: + this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); this.m_keyList = new TreeSet<String>(); int m = Bytes.toInt(Bytes.readByteArray(in)); @@ -136,16 +150,17 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, break; } - LOG.info("READ and CREATED : " + this); + LOG.debug("READ and CREATED : " + this); } @Override public void write(DataOutput out) throws IOException { - LOG.info("WRITE : " + this); + LOG.debug("WRITE : " + this); Bytes.writeByteArray(out, this.m_tableName); Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); switch( this.m_sourceMode ) { case SCAN_RANGE: @@ -155,6 +170,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, break; case GET_LIST: + Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); for( String k: this.m_keyList ) { @@ -163,13 +179,14 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, break; } - LOG.info("WROTE : " + out.toString()); + LOG.debug("WROTE : " + out.toString()); } @Override public String toString() { - return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)", - Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList); + return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), + (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt); } @Override diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 9a0ed0e..65873a0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,6 +12,7 @@ package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.flow.FlowProcess; import cascading.tap.SinkMode; import cascading.tap.Tap; @@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -35,13 +35,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Map.Entry; +import java.util.Arrays; import java.util.UUID; /** @@ -172,7 +169,12 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { } - conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); + conf.set(HBaseOutputFormat.OUTPUT_TABLE, tableName); + + for( SinkConfig sc : sinkConfigList) { + sc.configure(conf); + } + super.sinkConfInit(process, conf); } @@ -292,10 +294,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { public String startKey = null; public String stopKey = null; public String [] keyList = null; + public int versions = 1; + public boolean useSalt = false; + public String prefixList = null; public void configure(Configuration jobConf) { switch( sourceMode ) { case SCAN_RANGE: + if (stopKey != null && startKey != null && startKey.compareTo(stopKey) > 0) { + String t = stopKey; + stopKey = startKey; + startKey = t; + } + jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); if( startKey != null && startKey.length() > 0 ) @@ -304,57 +315,104 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { if( stopKey != null && stopKey.length() > 0 ) jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey); - LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); - LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); - LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); + // Added for Salting + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); + + LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + LOG.info(String.format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); + LOG.info(String.format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); break; case GET_LIST: jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList); + jobConf.setInt(String.format(HBaseConstants.VERSIONS, tableName), versions); - LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); - LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList)); + // Added for Salting + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); + + LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + LOG.info(String.format("Setting KEY LIST (%s) to key list length (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList.length)); break; default: jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); - LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + // Added for Salting + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); + + LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); break; } } } + private static class SinkConfig implements Serializable { + public String tableName = null; + public boolean useSalt = false; + + public void configure(Configuration jobConf) { + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + } + } + private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>(); - - public void setHBaseRangeParms(String startKey, String stopKey ) { + private ArrayList<SinkConfig> sinkConfigList = new ArrayList<SinkConfig>(); + + public void setHBaseRangeParms(String startKey, String stopKey, boolean useSalt, String prefixList ) { SourceConfig sc = new SourceConfig(); sc.sourceMode = SourceMode.SCAN_RANGE; sc.tableName = tableName; sc.startKey = startKey; sc.stopKey = stopKey; + sc.useSalt = useSalt; + setPrefixList(sc, prefixList); sourceConfigList.add(sc); } - public void setHBaseListParms(String [] keyList ) { + public void setHBaseListParms(String [] keyList, int versions, boolean useSalt, String prefixList ) { SourceConfig sc = new SourceConfig(); sc.sourceMode = SourceMode.GET_LIST; sc.tableName = tableName; sc.keyList = keyList; + sc.versions = (versions < 1) ? 1 : versions; + sc.useSalt = useSalt; + setPrefixList(sc, prefixList); sourceConfigList.add(sc); } - public void setHBaseScanAllParms() { + public void setHBaseScanAllParms(boolean useSalt, String prefixList) { SourceConfig sc = new SourceConfig(); sc.sourceMode = SourceMode.SCAN_ALL; sc.tableName = tableName; + sc.useSalt = useSalt; + + setPrefixList(sc, prefixList); sourceConfigList.add(sc); } + + public void setUseSaltInSink( boolean useSalt ) { + SinkConfig sc = new SinkConfig(); + + sc.tableName = tableName; + sc.useSalt = useSalt; + + sinkConfigList.add(sc); + } + + private void setPrefixList(SourceConfig sc, String prefixList ) { + prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList; + char[] prefixArray = prefixList.toCharArray(); + Arrays.sort(prefixArray); + sc.prefixList = new String( prefixArray ); + } } |