From 6e21e0c68248a33875898b86a2be7a9cec7df3d4 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Thu, 6 Jun 2013 12:27:15 +0100 Subject: Added extensions to Read and Write mode. Added support for key prefixes --- .../spyglass/hbase/HBaseInputFormat.java | 245 ++++++++++++--------- 1 file changed, 136 insertions(+), 109 deletions(-) (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index f1f4fb7..aabdc5e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -5,8 +5,8 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; +import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; import java.util.UUID; @@ -17,8 +17,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.filter.Filter; @@ -38,7 +40,6 @@ import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - public class HBaseInputFormat implements InputFormat, JobConfigurable { @@ -48,9 +49,9 @@ public class HBaseInputFormat private byte [][] inputColumns; private HTable table; - private HBaseRecordReader tableRecordReader; +// private HBaseRecordReader tableRecordReader; private Filter rowFilter; - private String tableName = ""; +// private String tableName = ""; private HashMap reverseDNSCacheMap = new HashMap(); @@ -83,7 +84,8 @@ public class HBaseInputFormat List splits = new ArrayList(1); HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); + .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); + splits.add(split); return splits.toArray(new HBaseTableSplit[splits.size()]); @@ -100,7 +102,7 @@ public class HBaseInputFormat byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; byte[] maxKey = keys.getSecond()[0]; - LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); byte [][] regStartKeys = keys.getFirst(); byte [][] regStopKeys = keys.getSecond(); @@ -144,29 +146,29 @@ public class HBaseInputFormat regions[i] = regionLocation; - LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); + LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); } byte[] startRow = HConstants.EMPTY_START_ROW; byte[] stopRow = HConstants.EMPTY_END_ROW; - LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); - LOG.info("SOURCE MODE is : " + sourceMode); + LOG.debug("SOURCE MODE is : " + sourceMode); switch( sourceMode ) { case SCAN_ALL: startRow = HConstants.EMPTY_START_ROW; stopRow = HConstants.EMPTY_END_ROW; - LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); + LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; case SCAN_RANGE: startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ; stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; - LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); + LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; } @@ -180,98 +182,110 @@ public class HBaseInputFormat List splits = new ArrayList(); - List validRegions = table.getRegionsInRange(startRow, stopRow); - - int maxRegions = validRegions.size(); - int currentRegion = 1; - - for( HRegionLocation cRegion : validRegions ) { - byte [] rStart = cRegion.getRegionInfo().getStartKey(); - byte [] rStop = cRegion.getRegionInfo().getEndKey(); + if( ! useSalt ) { - HServerAddress regionServerAddress = cRegion.getServerAddress(); - InetAddress regionAddress = - regionServerAddress.getInetSocketAddress().getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " + regionAddress + - " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); - byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow); - - LOG.info("".format("BOOL start (%s) stop (%s) length (%d)", - (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), - (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), - rStop.length - )); + List validRegions = table.getRegionsInRange(startRow, stopRow); - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), - sStart, - sStop, - regionLocation, - SourceMode.SCAN_RANGE - ); + int maxRegions = validRegions.size(); + int currentRegion = 1; - split.setEndRowInclusive( currentRegion == maxRegions ); - - currentRegion ++; - - LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", - Bytes.toString(startRow), Bytes.toString(stopRow), - Bytes.toString(rStart), Bytes.toString(rStop), - Bytes.toString(sStart), - Bytes.toString(sStop), - cRegion.getHostnamePort(), split) ); + for( HRegionLocation cRegion : validRegions ) { + byte [] rStart = cRegion.getRegionInfo().getStartKey(); + byte [] rStop = cRegion.getRegionInfo().getEndKey(); + + HServerAddress regionServerAddress = cRegion.getServerAddress(); + InetAddress regionAddress = + regionServerAddress.getInetSocketAddress().getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + regionAddress + + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); + byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow); + + LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)", + (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), + (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), + rStop.length + )); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), + sStart, + sStop, + regionLocation, + SourceMode.SCAN_RANGE, useSalt + ); + + split.setEndRowInclusive( currentRegion == maxRegions ); + + currentRegion ++; + + LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", + Bytes.toString(startRow), Bytes.toString(stopRow), + Bytes.toString(rStart), Bytes.toString(rStop), + Bytes.toString(sStart), + Bytes.toString(sStop), + cRegion.getHostnamePort(), split) ); + + splits.add(split); + } + } else { + LOG.debug("Using SALT : " + useSalt ); - splits.add(split); + // Will return the start and the stop key with all possible prefixes. + for( int i = 0; i < regions.length; i++ ) { + Pair[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList ); + + for( Pair pair : intervals ) { + LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), + pair.getFirst(), + pair.getSecond(), + regions[i], + SourceMode.SCAN_RANGE, useSalt + ); + + split.setEndRowInclusive(true); + splits.add(split); + } + } } -// -// for (int i = 0; i < keys.getFirst().length; i++) { -// -// if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -// LOG.info("NOT including regions : " + regions[i]); -// continue; -// } -// -// // determine if the given start an stop key fall into the region -// if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || -// Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && -// (stopRow.length == 0 || -// Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { -// -// byte[] splitStart = startRow.length == 0 || -// Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? -// keys.getFirst()[i] : startRow; -// byte[] splitStop = (stopRow.length == 0 || -// Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && -// keys.getSecond()[i].length > 0 ? -// keys.getSecond()[i] : stopRow; -// HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), -// splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); -// splits.add(split); -// -// LOG.info("getSplits: split -> " + i + " -> " + split); -// } -// } - - LOG.info("RETURNED SPLITS: split -> " + splits); + LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); + for( HBaseTableSplit s: splits) { + LOG.info("RETURNED SPLITS: split -> " + s); + } return splits.toArray(new HBaseTableSplit[splits.size()]); } case GET_LIST: { - if( keyList == null || keyList.size() == 0 ) { +// if( keyList == null || keyList.size() == 0 ) { + if( keyList == null ) { throw new IOException("Source Mode is GET_LIST but key list is EMPTY"); } + if( useSalt ) { + TreeSet tempKeyList = new TreeSet(); + + for(String key: keyList) { + tempKeyList.add(HBaseSalter.addSaltPrefix(key)); + } + + keyList = tempKeyList; + } + + LOG.debug("".format("Splitting Key List (%s)", keyList)); + List splits = new ArrayList(); for (int i = 0; i < keys.getFirst().length; i++) { @@ -280,44 +294,46 @@ public class HBaseInputFormat continue; } - LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); + LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); Set regionsSubSet = null; if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { - LOG.info("REGION start is empty"); - LOG.info("REGION stop is empty"); + LOG.debug("REGION start is empty"); + LOG.debug("REGION stop is empty"); regionsSubSet = keyList; } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { - LOG.info("REGION start is empty"); + LOG.debug("REGION start is empty"); regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true); } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { - LOG.info("REGION stop is empty"); + LOG.debug("REGION stop is empty"); regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true); } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) { regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); } else { - throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)", + throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); } if( regionsSubSet == null || regionsSubSet.size() == 0) { - LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); + LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null"); continue; } TreeSet regionKeyList = new TreeSet(regionsSubSet); - LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); + LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), regionKeyList, + table.getTableName(), regionKeyList, versions, regions[i], - SourceMode.GET_LIST); + SourceMode.GET_LIST, useSalt); splits.add(split); } + LOG.debug("RETURNED SPLITS: split -> " + splits); + return splits.toArray(new HBaseTableSplit[splits.size()]); } @@ -351,20 +367,23 @@ public class HBaseInputFormat case SCAN_ALL: case SCAN_RANGE: { - LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); + LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); trr.setStartRow(tSplit.getStartRow()); trr.setEndRow(tSplit.getEndRow()); trr.setEndRowInclusive(tSplit.getEndRowInclusive()); + trr.setUseSalt(useSalt); } break; case GET_LIST: { - LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); + LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); trr.setKeyList(tSplit.getKeyList()); + trr.setVersions(tSplit.getVersions()); + trr.setUseSalt(useSalt); } break; @@ -402,6 +421,9 @@ public class HBaseInputFormat private SourceMode sourceMode = SourceMode.EMPTY; private TreeSet keyList = null; + private int versions = 1; + private boolean useSalt = false; + private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; public void configure(JobConf job) { String tableName = getTableName(job); @@ -422,9 +444,12 @@ public class HBaseInputFormat LOG.debug("Entered : " + this.getClass() + " : configure()" ); + useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false); + prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST); + sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; - LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally", + LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally", String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode )); switch( sourceMode ) { @@ -442,16 +467,18 @@ public class HBaseInputFormat Collection keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job))); keyList = new TreeSet (keys); + + versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - LOG.info( "GOT KEY LIST : " + keys ); - LOG.info(String.format("SETTING key list (%s)", keyList) ); + LOG.debug( "GOT KEY LIST : " + keys ); + LOG.debug(String.format("SETTING key list (%s)", keyList) ); break; case EMPTY: LOG.info("HIT EMPTY"); - sourceMode = sourceMode.SCAN_ALL; + sourceMode = SourceMode.SCAN_ALL; break; default: @@ -468,7 +495,7 @@ public class HBaseInputFormat if (tableName == null) { throw new IOException("expecting one table name"); } - LOG.debug("".format("Found Table name [%s]", tableName)); + LOG.debug(String.format("Found Table name [%s]", tableName)); // connected to table? @@ -476,16 +503,16 @@ public class HBaseInputFormat throw new IOException("could not connect to table '" + tableName + "'"); } - LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); + LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); // expecting at least one column String colArg = job.get(COLUMN_LIST); if (colArg == null || colArg.length() == 0) { throw new IOException("expecting at least one column"); } - LOG.debug("".format("Found Columns [%s]", colArg)); + LOG.debug(String.format("Found Columns [%s]", colArg)); - LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); + LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey)); if( sourceMode == SourceMode.EMPTY ) { throw new IOException("SourceMode should not be EMPTY"); @@ -504,7 +531,7 @@ public class HBaseInputFormat private void setJobProp( JobConf job, String key, String value) { - if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); + if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); job.set(key, value); } -- cgit v1.2.3