diff options
| author | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 | 
|---|---|---|
| committer | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 | 
| commit | 6e21e0c68248a33875898b86a2be7a9cec7df3d4 (patch) | |
| tree | 5254682e3c3440f7c6954b23519459107b8a445e /src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java | |
| parent | ea9c80374da846edf2a1634a42ccb932838ebd5b (diff) | |
| download | SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.tar.gz SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.zip | |
Added extensions to Read and Write mode.
Added support for key prefixes
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java')
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java | 245 | 
1 files changed, 136 insertions, 109 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index f1f4fb7..aabdc5e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -5,8 +5,8 @@ import java.net.InetAddress;  import java.util.ArrayList;  import java.util.Collection;  import java.util.HashMap; -import java.util.HashSet;  import java.util.List; +import java.util.NavigableMap;  import java.util.Set;  import java.util.TreeSet;  import java.util.UUID; @@ -17,8 +17,10 @@ import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.hbase.HBaseConfiguration;  import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo;  import org.apache.hadoop.hbase.HRegionLocation;  import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName;  import org.apache.hadoop.hbase.client.HTable;  import org.apache.hadoop.hbase.client.Result;  import org.apache.hadoop.hbase.filter.Filter; @@ -38,7 +40,6 @@ import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -  public class HBaseInputFormat    implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { @@ -48,9 +49,9 @@ public class HBaseInputFormat    private byte [][] inputColumns;    private HTable table; -  private HBaseRecordReader tableRecordReader; +//  private HBaseRecordReader tableRecordReader;    private Filter rowFilter; -  private String tableName = ""; +//  private String tableName = "";    private HashMap<InetAddress, String> reverseDNSCacheMap =      new HashMap<InetAddress, String>(); @@ -83,7 +84,8 @@ public class HBaseInputFormat        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);        HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc -              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); +              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); +              splits.add(split);        return splits.toArray(new HBaseTableSplit[splits.size()]); @@ -100,7 +102,7 @@ public class HBaseInputFormat      byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];      byte[] maxKey = keys.getSecond()[0]; -    LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); +    LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));      byte [][] regStartKeys = keys.getFirst();      byte [][] regStopKeys = keys.getSecond(); @@ -144,29 +146,29 @@ public class HBaseInputFormat        regions[i] = regionLocation; -      LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); +      LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));      }      byte[] startRow = HConstants.EMPTY_START_ROW;      byte[] stopRow = HConstants.EMPTY_END_ROW; -    LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); +    LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); -    LOG.info("SOURCE MODE is : " + sourceMode);     +    LOG.debug("SOURCE MODE is : " + sourceMode);          switch( sourceMode ) {      case SCAN_ALL:        startRow = HConstants.EMPTY_START_ROW;        stopRow = HConstants.EMPTY_END_ROW; -      LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));        break;      case SCAN_RANGE:        startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ;        stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; -      LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));        break;      } @@ -180,98 +182,110 @@ public class HBaseInputFormat          List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); -        List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); -         -        int maxRegions = validRegions.size(); -        int currentRegion = 1; -         -        for( HRegionLocation cRegion : validRegions ) { -          byte [] rStart = cRegion.getRegionInfo().getStartKey(); -          byte [] rStop = cRegion.getRegionInfo().getEndKey(); +        if( ! useSalt ) { -          HServerAddress regionServerAddress = cRegion.getServerAddress(); -            InetAddress regionAddress = -              regionServerAddress.getInetSocketAddress().getAddress(); -            String regionLocation; -            try { -              regionLocation = reverseDNS(regionAddress); -            } catch (NamingException e) { -              LOG.error("Cannot resolve the host name for " + regionAddress + -                  " because of " + e); -              regionLocation = regionServerAddress.getHostname(); -            } -             -            byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); -            byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);  -             -            LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",  -                (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), -                    (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), -                    rStop.length -                    )); +          List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); -          HBaseTableSplit split = new HBaseTableSplit( -              table.getTableName(), -              sStart, -              sStop, -              regionLocation, -              SourceMode.SCAN_RANGE -           ); +          int maxRegions = validRegions.size(); +          int currentRegion = 1; -          split.setEndRowInclusive( currentRegion == maxRegions ); - -          currentRegion ++; -                   -           LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  -               Bytes.toString(startRow), Bytes.toString(stopRow), -               Bytes.toString(rStart), Bytes.toString(rStop), -               Bytes.toString(sStart),  -               Bytes.toString(sStop),  -               cRegion.getHostnamePort(), split) ); +          for( HRegionLocation cRegion : validRegions ) { +            byte [] rStart = cRegion.getRegionInfo().getStartKey(); +            byte [] rStop = cRegion.getRegionInfo().getEndKey(); +             +            HServerAddress regionServerAddress = cRegion.getServerAddress(); +              InetAddress regionAddress = +                regionServerAddress.getInetSocketAddress().getAddress(); +              String regionLocation; +              try { +                regionLocation = reverseDNS(regionAddress); +              } catch (NamingException e) { +                LOG.error("Cannot resolve the host name for " + regionAddress + +                    " because of " + e); +                regionLocation = regionServerAddress.getHostname(); +              } +               +              byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); +              byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);  +               +              LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)",  +                  (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), +                      (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), +                      rStop.length +                      )); +             +            HBaseTableSplit split = new HBaseTableSplit( +                table.getTableName(), +                sStart, +                sStop, +                regionLocation, +                SourceMode.SCAN_RANGE, useSalt +             ); +             +            split.setEndRowInclusive( currentRegion == maxRegions ); +   +            currentRegion ++; +                     +             LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  +                 Bytes.toString(startRow), Bytes.toString(stopRow), +                 Bytes.toString(rStart), Bytes.toString(rStop), +                 Bytes.toString(sStart),  +                 Bytes.toString(sStop),  +                 cRegion.getHostnamePort(), split) ); +             +             splits.add(split); +          } +        } else { +          LOG.debug("Using SALT : " + useSalt ); -           splits.add(split); +          // Will return the start and the stop key with all possible prefixes. +          for( int i = 0; i < regions.length; i++ ) { +        	Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList ); +        	 +            for( Pair<byte[], byte[]> pair : intervals ) { +              LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); +               +              HBaseTableSplit split = new HBaseTableSplit( +                  table.getTableName(), +                  pair.getFirst(), +                  pair.getSecond(), +                  regions[i], +                  SourceMode.SCAN_RANGE, useSalt +               ); + +              split.setEndRowInclusive(true); +              splits.add(split); +            } +          }          } -// -//        for (int i = 0; i < keys.getFirst().length; i++) { -// -//          if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -//            LOG.info("NOT including regions : " + regions[i]); -//            continue; -//          } -//           -//          // determine if the given start an stop key fall into the region -//          if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || -//               Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && -//              (stopRow.length == 0 || -//               Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { -//             -//            byte[] splitStart = startRow.length == 0 || -//              Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? -//                keys.getFirst()[i] : startRow; -//            byte[] splitStop = (stopRow.length == 0 || -//              Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && -//              keys.getSecond()[i].length > 0 ? -//                keys.getSecond()[i] : stopRow; -//                HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), -//              splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); -//            splits.add(split); -//             -//            LOG.info("getSplits: split -> " + i + " -> " + split); -//          } -//        } -         -        LOG.info("RETURNED SPLITS: split -> " + splits); +        LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); +        for( HBaseTableSplit s: splits) { +            LOG.info("RETURNED SPLITS: split -> " + s); +        }          return splits.toArray(new HBaseTableSplit[splits.size()]);        }         case GET_LIST:        { -        if( keyList == null || keyList.size() == 0 ) { +//        if( keyList == null || keyList.size() == 0 ) { +        if( keyList == null ) {            throw new IOException("Source Mode is GET_LIST but key list is EMPTY");          }  +        if( useSalt ) { +          TreeSet<String> tempKeyList = new TreeSet<String>(); +           +          for(String key: keyList) { +            tempKeyList.add(HBaseSalter.addSaltPrefix(key)); +          } +           +          keyList = tempKeyList; +        } +         +        LOG.debug("".format("Splitting Key List (%s)", keyList)); +                  List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();          for (int i = 0; i < keys.getFirst().length; i++) { @@ -280,44 +294,46 @@ public class HBaseInputFormat              continue;            } -          LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); +          LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));            Set<String> regionsSubSet = null;            if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { -            LOG.info("REGION start is empty"); -            LOG.info("REGION stop is empty"); +            LOG.debug("REGION start is empty"); +            LOG.debug("REGION stop is empty");              regionsSubSet = keyList;            } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { -            LOG.info("REGION start is empty"); +            LOG.debug("REGION start is empty");              regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true);            } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { -            LOG.info("REGION stop is empty"); +            LOG.debug("REGION stop is empty");              regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true);            } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) {              regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true);            } else { -            throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",  +            throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)",                   regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));            }            if( regionsSubSet == null || regionsSubSet.size() == 0) { -            LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); +            LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null");              continue;            }            TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); -          LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); +          LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));            HBaseTableSplit split = new HBaseTableSplit( -              table.getTableName(), regionKeyList, +              table.getTableName(), regionKeyList, versions,                regions[i],  -              SourceMode.GET_LIST); +              SourceMode.GET_LIST, useSalt);            splits.add(split);          } +        LOG.debug("RETURNED SPLITS: split -> " + splits); +          return splits.toArray(new HBaseTableSplit[splits.size()]);        }  @@ -351,20 +367,23 @@ public class HBaseInputFormat        case SCAN_ALL:        case SCAN_RANGE:        { -        LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); +        LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));          trr.setStartRow(tSplit.getStartRow());          trr.setEndRow(tSplit.getEndRow());          trr.setEndRowInclusive(tSplit.getEndRowInclusive()); +        trr.setUseSalt(useSalt);        }        break;        case GET_LIST:        { -        LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); +        LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));          trr.setKeyList(tSplit.getKeyList()); +        trr.setVersions(tSplit.getVersions()); +        trr.setUseSalt(useSalt);        }        break; @@ -402,6 +421,9 @@ public class HBaseInputFormat    private SourceMode sourceMode = SourceMode.EMPTY;    private TreeSet<String> keyList = null; +  private int versions = 1; +  private boolean useSalt = false; +  private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;    public void configure(JobConf job) {      String tableName = getTableName(job); @@ -422,9 +444,12 @@ public class HBaseInputFormat      LOG.debug("Entered : " + this.getClass() + " : configure()" ); +    useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false); +    prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST); +          sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; -    LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",  +    LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally",           String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode ));      switch( sourceMode ) { @@ -442,16 +467,18 @@ public class HBaseInputFormat          Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job)));          keyList = new TreeSet<String> (keys); + +        versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); -        LOG.info( "GOT KEY LIST : " + keys ); -        LOG.info(String.format("SETTING key list (%s)", keyList) ); +        LOG.debug( "GOT KEY LIST : " + keys ); +        LOG.debug(String.format("SETTING key list (%s)", keyList) );          break;        case EMPTY:          LOG.info("HIT EMPTY"); -        sourceMode = sourceMode.SCAN_ALL; +        sourceMode = SourceMode.SCAN_ALL;          break;        default: @@ -468,7 +495,7 @@ public class HBaseInputFormat      if (tableName == null) {        throw new IOException("expecting one table name");      } -    LOG.debug("".format("Found Table name [%s]", tableName)); +    LOG.debug(String.format("Found Table name [%s]", tableName));      // connected to table? @@ -476,16 +503,16 @@ public class HBaseInputFormat        throw new IOException("could not connect to table '" +          tableName + "'");      } -    LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); +    LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));      // expecting at least one column      String colArg = job.get(COLUMN_LIST);      if (colArg == null || colArg.length() == 0) {        throw new IOException("expecting at least one column");      } -    LOG.debug("".format("Found Columns [%s]", colArg)); +    LOG.debug(String.format("Found Columns [%s]", colArg)); -    LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); +    LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey));      if( sourceMode == SourceMode.EMPTY ) {        throw new IOException("SourceMode should not be EMPTY"); @@ -504,7 +531,7 @@ public class HBaseInputFormat    private void setJobProp( JobConf job, String key, String value) { -    if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); +    if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));      job.set(key,  value);    } | 
