Diffstat (limited to 'src/main/java/parallelai/spyglass')
9 files changed, 865 insertions, 269 deletions
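The new HBaseConstants keys introduced below (hbase.%s.versions, hbase.%s.use.salt, hbase.%s.salt.prefix) are read back in HBaseInputFormat.configure(), keyed by table name. A rough sketch of how a job could enable them, assuming a hypothetical table called "my_table" (the table name and values are illustrative only, not taken from this commit):

    import org.apache.hadoop.mapred.JobConf;
    import parallelai.spyglass.hbase.HBaseConstants;
    import parallelai.spyglass.hbase.HBaseSalter;

    public class SaltConfigSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        String table = "my_table"; // hypothetical table name

        // Each constant is a format string, e.g. "hbase.my_table.use.salt"
        job.setBoolean(String.format(HBaseConstants.USE_SALT, table), true);
        job.set(String.format(HBaseConstants.SALT_PREFIX, table), HBaseSalter.DEFAULT_PREFIX_LIST);
        job.setInt(String.format(HBaseConstants.VERSIONS, table), 3);
      }
    }

With use.salt enabled, getSplits() builds per-prefix scan intervals via HBaseSalter.getDistributedIntervals(), and HBaseRecordReader strips the prefix again with HBaseSalter.delSaltPrefix() before handing keys to the mapper.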
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index b546107..5baf20e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -15,5 +15,8 @@ public class HBaseConstants {    public static final String STOP_KEY = "hbase.%s.stopkey";     public static final String SOURCE_MODE = "hbase.%s.source.mode";    public static final String KEY_LIST = "hbase.%s.key.list"; +  public static final String VERSIONS = "hbase.%s.versions"; +  public static final String USE_SALT = "hbase.%s.use.salt"; +  public static final String SALT_PREFIX = "hbase.%s.salt.prefix";  } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index f1f4fb7..aabdc5e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -5,8 +5,8 @@ import java.net.InetAddress;  import java.util.ArrayList;  import java.util.Collection;  import java.util.HashMap; -import java.util.HashSet;  import java.util.List; +import java.util.NavigableMap;  import java.util.Set;  import java.util.TreeSet;  import java.util.UUID; @@ -17,8 +17,10 @@ import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.hbase.HBaseConfiguration;  import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo;  import org.apache.hadoop.hbase.HRegionLocation;  import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName;  import org.apache.hadoop.hbase.client.HTable;  import org.apache.hadoop.hbase.client.Result;  import org.apache.hadoop.hbase.filter.Filter; @@ -38,7 +40,6 @@ import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -  public class HBaseInputFormat    implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { @@ -48,9 +49,9 @@ public class HBaseInputFormat    private byte [][] inputColumns;    private HTable table; -  private HBaseRecordReader tableRecordReader; +//  private HBaseRecordReader tableRecordReader;    private Filter rowFilter; -  private String tableName = ""; +//  private String tableName = "";    private HashMap<InetAddress, String> reverseDNSCacheMap =      new HashMap<InetAddress, String>(); @@ -83,7 +84,8 @@ public class HBaseInputFormat        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);        HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc -              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); +              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); +              splits.add(split);        return splits.toArray(new HBaseTableSplit[splits.size()]); @@ -100,7 +102,7 @@ public class HBaseInputFormat      byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];      byte[] maxKey = keys.getSecond()[0]; -    LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); +    LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));      byte [][] regStartKeys = keys.getFirst();      byte [][] regStopKeys = 
keys.getSecond(); @@ -144,29 +146,29 @@ public class HBaseInputFormat        regions[i] = regionLocation; -      LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); +      LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));      }      byte[] startRow = HConstants.EMPTY_START_ROW;      byte[] stopRow = HConstants.EMPTY_END_ROW; -    LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); +    LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); -    LOG.info("SOURCE MODE is : " + sourceMode);     +    LOG.debug("SOURCE MODE is : " + sourceMode);          switch( sourceMode ) {      case SCAN_ALL:        startRow = HConstants.EMPTY_START_ROW;        stopRow = HConstants.EMPTY_END_ROW; -      LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));        break;      case SCAN_RANGE:        startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ;        stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; -      LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));        break;      } @@ -180,98 +182,110 @@ public class HBaseInputFormat          List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); -        List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); -         -        int maxRegions = validRegions.size(); -        int currentRegion = 1; -         -        for( HRegionLocation cRegion : validRegions ) { -          byte [] rStart = cRegion.getRegionInfo().getStartKey(); -          byte [] rStop = cRegion.getRegionInfo().getEndKey(); +        if( ! useSalt ) { -          HServerAddress regionServerAddress = cRegion.getServerAddress(); -            InetAddress regionAddress = -              regionServerAddress.getInetSocketAddress().getAddress(); -            String regionLocation; -            try { -              regionLocation = reverseDNS(regionAddress); -            } catch (NamingException e) { -              LOG.error("Cannot resolve the host name for " + regionAddress + -                  " because of " + e); -              regionLocation = regionServerAddress.getHostname(); -            } -             -            byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); -            byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? 
rStop : stopRow);  -             -            LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",  -                (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), -                    (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), -                    rStop.length -                    )); +          List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); -          HBaseTableSplit split = new HBaseTableSplit( -              table.getTableName(), -              sStart, -              sStop, -              regionLocation, -              SourceMode.SCAN_RANGE -           ); +          int maxRegions = validRegions.size(); +          int currentRegion = 1; -          split.setEndRowInclusive( currentRegion == maxRegions ); - -          currentRegion ++; -                   -           LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  -               Bytes.toString(startRow), Bytes.toString(stopRow), -               Bytes.toString(rStart), Bytes.toString(rStop), -               Bytes.toString(sStart),  -               Bytes.toString(sStop),  -               cRegion.getHostnamePort(), split) ); +          for( HRegionLocation cRegion : validRegions ) { +            byte [] rStart = cRegion.getRegionInfo().getStartKey(); +            byte [] rStop = cRegion.getRegionInfo().getEndKey(); +             +            HServerAddress regionServerAddress = cRegion.getServerAddress(); +              InetAddress regionAddress = +                regionServerAddress.getInetSocketAddress().getAddress(); +              String regionLocation; +              try { +                regionLocation = reverseDNS(regionAddress); +              } catch (NamingException e) { +                LOG.error("Cannot resolve the host name for " + regionAddress + +                    " because of " + e); +                regionLocation = regionServerAddress.getHostname(); +              } +               +              byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); +              byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? 
rStop : stopRow);  +               +              LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)",  +                  (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), +                      (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), +                      rStop.length +                      )); +             +            HBaseTableSplit split = new HBaseTableSplit( +                table.getTableName(), +                sStart, +                sStop, +                regionLocation, +                SourceMode.SCAN_RANGE, useSalt +             ); +             +            split.setEndRowInclusive( currentRegion == maxRegions ); +   +            currentRegion ++; +                     +             LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  +                 Bytes.toString(startRow), Bytes.toString(stopRow), +                 Bytes.toString(rStart), Bytes.toString(rStop), +                 Bytes.toString(sStart),  +                 Bytes.toString(sStop),  +                 cRegion.getHostnamePort(), split) ); +             +             splits.add(split); +          } +        } else { +          LOG.debug("Using SALT : " + useSalt ); -           splits.add(split); +          // Will return the start and the stop key with all possible prefixes. +          for( int i = 0; i < regions.length; i++ ) { +        	Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList ); +        	 +            for( Pair<byte[], byte[]> pair : intervals ) { +              LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); +               +              HBaseTableSplit split = new HBaseTableSplit( +                  table.getTableName(), +                  pair.getFirst(), +                  pair.getSecond(), +                  regions[i], +                  SourceMode.SCAN_RANGE, useSalt +               ); + +              split.setEndRowInclusive(true); +              splits.add(split); +            } +          }          } -// -//        for (int i = 0; i < keys.getFirst().length; i++) { -// -//          if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -//            LOG.info("NOT including regions : " + regions[i]); -//            continue; -//          } -//           -//          // determine if the given start an stop key fall into the region -//          if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || -//               Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && -//              (stopRow.length == 0 || -//               Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { -//             -//            byte[] splitStart = startRow.length == 0 || -//              Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? -//                keys.getFirst()[i] : startRow; -//            byte[] splitStop = (stopRow.length == 0 || -//              Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && -//              keys.getSecond()[i].length > 0 ? 
-//                keys.getSecond()[i] : stopRow; -//                HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), -//              splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); -//            splits.add(split); -//             -//            LOG.info("getSplits: split -> " + i + " -> " + split); -//          } -//        } -         -        LOG.info("RETURNED SPLITS: split -> " + splits); +        LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); +        for( HBaseTableSplit s: splits) { +            LOG.info("RETURNED SPLITS: split -> " + s); +        }          return splits.toArray(new HBaseTableSplit[splits.size()]);        }         case GET_LIST:        { -        if( keyList == null || keyList.size() == 0 ) { +//        if( keyList == null || keyList.size() == 0 ) { +        if( keyList == null ) {            throw new IOException("Source Mode is GET_LIST but key list is EMPTY");          }  +        if( useSalt ) { +          TreeSet<String> tempKeyList = new TreeSet<String>(); +           +          for(String key: keyList) { +            tempKeyList.add(HBaseSalter.addSaltPrefix(key)); +          } +           +          keyList = tempKeyList; +        } +         +        LOG.debug("".format("Splitting Key List (%s)", keyList)); +                  List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();          for (int i = 0; i < keys.getFirst().length; i++) { @@ -280,44 +294,46 @@ public class HBaseInputFormat              continue;            } -          LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); +          LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));            Set<String> regionsSubSet = null;            if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { -            LOG.info("REGION start is empty"); -            LOG.info("REGION stop is empty"); +            LOG.debug("REGION start is empty"); +            LOG.debug("REGION stop is empty");              regionsSubSet = keyList;            } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { -            LOG.info("REGION start is empty"); +            LOG.debug("REGION start is empty");              regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true);            } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { -            LOG.info("REGION stop is empty"); +            LOG.debug("REGION stop is empty");              regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true);            } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) {              regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true);            } else { -            throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",  +            throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)",                   regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));            }            if( regionsSubSet == null || regionsSubSet.size() == 0) { -            LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); +            LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null");              
continue;            }            TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); -          LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); +          LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));            HBaseTableSplit split = new HBaseTableSplit( -              table.getTableName(), regionKeyList, +              table.getTableName(), regionKeyList, versions,                regions[i],  -              SourceMode.GET_LIST); +              SourceMode.GET_LIST, useSalt);            splits.add(split);          } +        LOG.debug("RETURNED SPLITS: split -> " + splits); +          return splits.toArray(new HBaseTableSplit[splits.size()]);        }  @@ -351,20 +367,23 @@ public class HBaseInputFormat        case SCAN_ALL:        case SCAN_RANGE:        { -        LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); +        LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));          trr.setStartRow(tSplit.getStartRow());          trr.setEndRow(tSplit.getEndRow());          trr.setEndRowInclusive(tSplit.getEndRowInclusive()); +        trr.setUseSalt(useSalt);        }        break;        case GET_LIST:        { -        LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); +        LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));          trr.setKeyList(tSplit.getKeyList()); +        trr.setVersions(tSplit.getVersions()); +        trr.setUseSalt(useSalt);        }        break; @@ -402,6 +421,9 @@ public class HBaseInputFormat    private SourceMode sourceMode = SourceMode.EMPTY;    private TreeSet<String> keyList = null; +  private int versions = 1; +  private boolean useSalt = false; +  private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;    public void configure(JobConf job) {      String tableName = getTableName(job); @@ -422,9 +444,12 @@ public class HBaseInputFormat      LOG.debug("Entered : " + this.getClass() + " : configure()" ); +    useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false); +    prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST); +          sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; -    LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",  +    LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally",           String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode ));      switch( sourceMode ) { @@ -442,16 +467,18 @@ public class HBaseInputFormat          Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job)));          keyList = new TreeSet<String> (keys); + +        versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); -        LOG.info( "GOT KEY LIST : " + keys ); -        LOG.info(String.format("SETTING key list (%s)", keyList) ); +        LOG.debug( "GOT KEY LIST : " + keys ); +        LOG.debug(String.format("SETTING key list (%s)", keyList) );          break;        case EMPTY:          LOG.info("HIT EMPTY"); -        sourceMode = 
sourceMode.SCAN_ALL; +        sourceMode = SourceMode.SCAN_ALL;          break;        default: @@ -468,7 +495,7 @@ public class HBaseInputFormat      if (tableName == null) {        throw new IOException("expecting one table name");      } -    LOG.debug("".format("Found Table name [%s]", tableName)); +    LOG.debug(String.format("Found Table name [%s]", tableName));      // connected to table? @@ -476,16 +503,16 @@ public class HBaseInputFormat        throw new IOException("could not connect to table '" +          tableName + "'");      } -    LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); +    LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));      // expecting at least one column      String colArg = job.get(COLUMN_LIST);      if (colArg == null || colArg.length() == 0) {        throw new IOException("expecting at least one column");      } -    LOG.debug("".format("Found Columns [%s]", colArg)); +    LOG.debug(String.format("Found Columns [%s]", colArg)); -    LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); +    LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey));      if( sourceMode == SourceMode.EMPTY ) {        throw new IOException("SourceMode should not be EMPTY"); @@ -504,7 +531,7 @@ public class HBaseInputFormat    private void setJobProp( JobConf job, String key, String value) { -    if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); +    if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));      job.set(key,  value);    } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java new file mode 100644 index 0000000..40f1faf --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -0,0 +1,59 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; + +/** + * Convert Map/Reduce output and write it to an HBase table + */ +public class HBaseOutputFormat extends +FileOutputFormat<ImmutableBytesWritable, Put> { + +  /** JobConf parameter that specifies the output table */ +  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; +  private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class); + + +  @Override +  @SuppressWarnings("unchecked") +  public RecordWriter getRecordWriter(FileSystem ignored, +      JobConf job, String name, Progressable progress) throws IOException { + +    // expecting exactly one path + +    String tableName = job.get(OUTPUT_TABLE); +    HTable table = null; +    try { +      table = new HTable(HBaseConfiguration.create(job), tableName); +    } catch(IOException e) { +      
LOG.error(e);
+      throw e;
+    }
+    table.setAutoFlush(false);
+    return new HBaseRecordWriter(table);
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+    String tableName = job.get(OUTPUT_TABLE);
+    if(tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java index 97077c4..d22ed71 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -3,12 +3,17 @@ package parallelai.spyglass.hbase;  import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;  import java.io.IOException; -import java.util.Set; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map;  import java.util.TreeSet; +import java.util.Vector;  import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue;  import org.apache.hadoop.hbase.client.Get;  import org.apache.hadoop.hbase.client.HTable;  import org.apache.hadoop.hbase.client.Result; @@ -18,36 +23,38 @@ import org.apache.hadoop.hbase.client.ScannerCallable;  import org.apache.hadoop.hbase.filter.Filter;  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes;   import org.apache.hadoop.hbase.util.Writables;  import org.apache.hadoop.mapred.RecordReader;  import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> { +public class HBaseRecordReader implements +    RecordReader<ImmutableBytesWritable, Result> {    static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); -  private byte [] startRow; -  private byte [] endRow; -  private byte [] lastSuccessfulRow; +  private byte[] startRow; +  private byte[] endRow; +  private byte[] lastSuccessfulRow;    private TreeSet<String> keyList;    private SourceMode sourceMode;    private Filter trrRowFilter;    private ResultScanner scanner;    private HTable htable; -  private byte [][] trrInputColumns; +  private byte[][] trrInputColumns;    private long timestamp;    private int rowcount;    private boolean logScannerActivity = false;    private int logPerRowCount = 100;    private boolean endRowInclusive = true; +  private int versions = 1; +  private boolean useSalt = false;    /**     * Restart from survivable exceptions by creating a new scanner. -   * +   *      * @param firstRow     * @throws IOException     */ @@ -55,26 +62,26 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R      Scan currentScan;      if ((endRow != null) && (endRow.length > 0)) {        if (trrRowFilter != null) { -        Scan scan = new Scan(firstRow, (endRowInclusive ?   -            Bytes.add(endRow, new byte[] {0}) : endRow ) ); -         +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +            new byte[] { 0 }) : endRow)); +          TableInputFormat.addColumns(scan, trrInputColumns);          scan.setFilter(trrRowFilter);          scan.setCacheBlocks(false);          this.scanner = this.htable.getScanner(scan);          currentScan = scan;        } else { -        LOG.debug("TIFB.restart, firstRow: " + -            Bytes.toString(firstRow) + ", endRow: " + -            Bytes.toString(endRow)); -        Scan scan = new Scan(firstRow, (endRowInclusive ? 
Bytes.add(endRow, new byte[] {0}) : endRow )); +        LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) +            + ", endRow: " + Bytes.toString(endRow)); +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +            new byte[] { 0 }) : endRow));          TableInputFormat.addColumns(scan, trrInputColumns);          this.scanner = this.htable.getScanner(scan);          currentScan = scan;        }      } else { -      LOG.debug("TIFB.restart, firstRow: " + -          Bytes.toStringBinary(firstRow) + ", no endRow"); +      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) +          + ", no endRow");        Scan scan = new Scan(firstRow);        TableInputFormat.addColumns(scan, trrInputColumns); @@ -83,7 +90,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R        currentScan = scan;      }      if (logScannerActivity) { -      LOG.info("Current scan=" + currentScan.toString()); +      LOG.debug("Current scan=" + currentScan.toString());        timestamp = System.currentTimeMillis();        rowcount = 0;      } @@ -97,6 +104,14 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R      this.keyList = keyList;    } +  public void setVersions(int versions) { +    this.versions = versions; +  } +   +  public void setUseSalt(boolean useSalt) { +    this.useSalt = useSalt; +  } +    public SourceMode getSourceMode() {      return sourceMode;    } @@ -108,76 +123,84 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    public byte[] getEndRow() {      return endRow;    } -   +    public void setEndRowInclusive(boolean isInclusive) {      endRowInclusive = isInclusive;    } -   +    public boolean getEndRowInclusive() {      return endRowInclusive;    } -   -  private byte [] nextKey = null; + +  private byte[] nextKey = null; +  private Vector<List<KeyValue>> resultVector = null; +  Map<Long, List<KeyValue>> keyValueMap = null;    /**     * Build the scanner. Not done in constructor to allow for extension. -   * +   *      * @throws IOException     */    public void init() throws IOException { -    switch( sourceMode ) { -      case SCAN_ALL: -      case SCAN_RANGE: -        restartRangeScan(startRow); -        break; -         -      case GET_LIST: -        nextKey = Bytes.toBytes(keyList.pollFirst()); -        break; -         -      default: -        throw new IOException(" Unknown source mode : " + sourceMode ); +    switch (sourceMode) { +    case SCAN_ALL: +    case SCAN_RANGE: +      restartRangeScan(startRow); +      break; + +    case GET_LIST: +      nextKey = Bytes.toBytes(keyList.pollFirst()); +      break; + +    default: +      throw new IOException(" Unknown source mode : " + sourceMode);      }    }    byte[] getStartRow() {      return this.startRow;    } +    /** -   * @param htable the {@link HTable} to scan. +   * @param htable +   *          the {@link HTable} to scan.     */    public void setHTable(HTable htable) {      Configuration conf = htable.getConfiguration(); -    logScannerActivity = conf.getBoolean( -      ScannerCallable.LOG_SCANNER_ACTIVITY, false); +    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, +        false);      logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);      this.htable = htable;    }    /** -   * @param inputColumns the columns to be placed in {@link Result}. +   * @param inputColumns +   *          the columns to be placed in {@link Result}.     
*/ -  public void setInputColumns(final byte [][] inputColumns) { +  public void setInputColumns(final byte[][] inputColumns) {      this.trrInputColumns = inputColumns;    }    /** -   * @param startRow the first row in the split +   * @param startRow +   *          the first row in the split     */ -  public void setStartRow(final byte [] startRow) { +  public void setStartRow(final byte[] startRow) {      this.startRow = startRow;    }    /** -   * -   * @param endRow the last row in the split +   *  +   * @param endRow +   *          the last row in the split     */ -  public void setEndRow(final byte [] endRow) { +  public void setEndRow(final byte[] endRow) {      this.endRow = endRow;    }    /** -   * @param rowFilter the {@link Filter} to be used. +   * @param rowFilter +   *          the {@link Filter} to be used.     */    public void setRowFilter(Filter rowFilter) {      this.trrRowFilter = rowFilter; @@ -185,12 +208,13 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    @Override    public void close() { -	  if (this.scanner != null) this.scanner.close(); +    if (this.scanner != null) +      this.scanner.close();    }    /**     * @return ImmutableBytesWritable -   * +   *      * @see org.apache.hadoop.mapred.RecordReader#createKey()     */    @Override @@ -200,7 +224,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    /**     * @return RowResult -   * +   *      * @see org.apache.hadoop.mapred.RecordReader#createValue()     */    @Override @@ -222,104 +246,260 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    }    /** -   * @param key HStoreKey as input key. -   * @param value MapWritable as input value +   * @param key +   *          HStoreKey as input key. 
+   * @param value +   *          MapWritable as input value     * @return true if there was more data     * @throws IOException     */    @Override    public boolean next(ImmutableBytesWritable key, Result value) -  throws IOException { -     -    switch(sourceMode) { -      case SCAN_ALL: -      case SCAN_RANGE: -      { -         -        Result result; +      throws IOException { + +    switch (sourceMode) { +    case SCAN_ALL: +    case SCAN_RANGE: { + +      Result result; +      try {          try { -          try { -            result = this.scanner.next(); -            if (logScannerActivity) { -              rowcount ++; -              if (rowcount >= logPerRowCount) { -                long now = System.currentTimeMillis(); -                LOG.info("Mapper took " + (now-timestamp) -                  + "ms to process " + rowcount + " rows"); -                timestamp = now; -                rowcount = 0; -              } -            } -          } catch (IOException e) { -            // try to handle all IOExceptions by restarting -            // the scanner, if the second call fails, it will be rethrown -            LOG.debug("recovered from " + StringUtils.stringifyException(e)); -            if (lastSuccessfulRow == null) { -              LOG.warn("We are restarting the first next() invocation," + -                  " if your mapper has restarted a few other times like this" + -                  " then you should consider killing this job and investigate" + -                  " why it's taking so long."); -            } -            if (lastSuccessfulRow == null) { -              restartRangeScan(startRow); -            } else { -              restartRangeScan(lastSuccessfulRow); -              this.scanner.next();    // skip presumed already mapped row +          result = this.scanner.next(); +          if (logScannerActivity) { +            rowcount++; +            if (rowcount >= logPerRowCount) { +              long now = System.currentTimeMillis(); +              LOG.debug("Mapper took " + (now - timestamp) + "ms to process " +                  + rowcount + " rows"); +              timestamp = now; +              rowcount = 0;              } -            result = this.scanner.next();            } +        } catch (IOException e) { +          // try to handle all IOExceptions by restarting +          // the scanner, if the second call fails, it will be rethrown +          LOG.debug("recovered from " + StringUtils.stringifyException(e)); +          if (lastSuccessfulRow == null) { +            LOG.warn("We are restarting the first next() invocation," +                + " if your mapper has restarted a few other times like this" +                + " then you should consider killing this job and investigate" +                + " why it's taking so long."); +          } +          if (lastSuccessfulRow == null) { +            restartRangeScan(startRow); +          } else { +            restartRangeScan(lastSuccessfulRow); +            this.scanner.next(); // skip presumed already mapped row +          } +          result = this.scanner.next(); +        } -          if (result != null && result.size() > 0) { +        if (result != null && result.size() > 0) { +          if( useSalt) { +            key.set( HBaseSalter.delSaltPrefix(result.getRow())); +          } else {              key.set(result.getRow()); -            lastSuccessfulRow = key.get(); -            Writables.copyWritable(result, value); -            return true; -          } -          return false; -        } 
catch (IOException ioe) { -          if (logScannerActivity) { -            long now = System.currentTimeMillis(); -            LOG.info("Mapper took " + (now-timestamp) -              + "ms to process " + rowcount + " rows"); -            LOG.info(ioe); -            String lastRow = lastSuccessfulRow == null ? -              "null" : Bytes.toStringBinary(lastSuccessfulRow); -            LOG.info("lastSuccessfulRow=" + lastRow);            } -          throw ioe; +           +          lastSuccessfulRow = key.get(); +          Writables.copyWritable(result, value); +          return true; +        } +        return false; +      } catch (IOException ioe) { +        if (logScannerActivity) { +          long now = System.currentTimeMillis(); +          LOG.debug("Mapper took " + (now - timestamp) + "ms to process " +              + rowcount + " rows"); +          LOG.debug(ioe); +          String lastRow = lastSuccessfulRow == null ? "null" : Bytes +              .toStringBinary(lastSuccessfulRow); +          LOG.debug("lastSuccessfulRow=" + lastRow);          } +        throw ioe;        } +    } + +    case GET_LIST: { +      LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); -      case GET_LIST: -      { -        Result result; -        if( nextKey != null ) { -          result = this.htable.get(new Get(nextKey)); +      if (versions == 1) { +        if (nextKey != null) { +          LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); -          if (result != null && result.size() > 0) { -        	System.out.println("KeyList => " + keyList); -        	System.out.println("Result => " + result); -        	if (keyList != null || !keyList.isEmpty()) { -        		 -        		String newKey = keyList.pollFirst(); -        		System.out.println("New Key => " + newKey); -        		nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey); -        	} else { -        		nextKey = null; -        	} -            key.set(result.getRow()); +          Get theGet = new Get(nextKey); +          theGet.setMaxVersions(versions); + +          Result result = this.htable.get(theGet); + +          if (result != null && (! result.isEmpty()) ) { +            LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); + +            if (keyList != null || !keyList.isEmpty()) { +              String newKey = keyList.pollFirst(); +              LOG.debug("New Key => " + newKey); +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +            } else { +              nextKey = null; +            } +             +            LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); +             +            // Write the result +            if( useSalt) { +              key.set( HBaseSalter.delSaltPrefix(result.getRow())); +            } else { +              key.set(result.getRow()); +            }              lastSuccessfulRow = key.get();              Writables.copyWritable(result, value); +                          return true; +          } else { +            LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. 
Get ("+theGet.getId()+")" ); //alg0 +             +            String newKey; +            while((newKey = keyList.pollFirst()) != null) { +              LOG.debug("WHILE NEXT Key => " + newKey); + +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +               +              if( nextKey == null ) { +                LOG.error("BOMB! BOMB! BOMB!"); +                continue;  +              } +               +              if( ! this.htable.exists( new Get(nextKey) ) ) { +                LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); +                continue;  +              } else { break; } +            } +             +            nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                .toBytes(newKey); +             +            LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + +            return next(key, value);            } -          return false;          } else { +          // Nothig left. return false            return false;          } +      } else { +        if (resultVector != null && resultVector.size() != 0) {  +          LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); + +          List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); +          Result result = new Result(resultKeyValue); + +          LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); + +          if( useSalt) { +            key.set( HBaseSalter.delSaltPrefix(result.getRow())); +          } else { +            key.set(result.getRow()); +          } +          lastSuccessfulRow = key.get(); +          Writables.copyWritable(result, value); + +          return true; +        } else { +          if (nextKey != null) { +            LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); +             +            Get theGet = new Get(nextKey); +            theGet.setMaxVersions(versions); + +            Result resultAll = this.htable.get(theGet); +             +            if( resultAll != null && (! 
resultAll.isEmpty())) { +              List<KeyValue> keyValeList = resultAll.list(); + +              keyValueMap = new HashMap<Long, List<KeyValue>>(); +               +              LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); + +              for (KeyValue keyValue : keyValeList) { +                long version = keyValue.getTimestamp(); + +                if (keyValueMap.containsKey(new Long(version))) { +                  List<KeyValue> keyValueTempList = keyValueMap.get(new Long( +                      version)); +                  if (keyValueTempList == null) { +                    keyValueTempList = new ArrayList<KeyValue>(); +                  } +                  keyValueTempList.add(keyValue); +                } else { +                  List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); +                  keyValueMap.put(new Long(version), keyValueTempList); +                  keyValueTempList.add(keyValue); +                } +              } + +              resultVector = new Vector<List<KeyValue>>(); +              resultVector.addAll(keyValueMap.values()); + +              List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + +              Result result = new Result(resultKeyValue); + +              LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); + +              String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + +              System.out.println("+ New Key => " + newKey); +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); + +              if( useSalt) { +                key.set( HBaseSalter.delSaltPrefix(result.getRow())); +              } else { +                key.set(result.getRow()); +              } +              lastSuccessfulRow = key.get(); +              Writables.copyWritable(result, value); +              return true; +            } else { +              LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 +               +              String newKey; +               +              while( (newKey = keyList.pollFirst()) != null ) { +                LOG.debug("+ WHILE NEXT Key => " + newKey); + +                nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                    .toBytes(newKey); +                 +                if( nextKey == null ) { +                  LOG.error("+ BOMB! BOMB! BOMB!"); +                  continue;  +                } +                 +                if( ! this.htable.exists( new Get(nextKey) ) ) { +                  LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); +                  continue;  +                } else { break; } +              } +               +              nextKey = (newKey == null || newKey.length() == 0) ? 
null : Bytes +                  .toBytes(newKey); +               +              LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); + +              return next(key, value); +            } +             +          } else { +            return false; +          } +        }        } -       -      default: -        throw new IOException("Unknown source mode : " + sourceMode ); -    }  +    } +    default: +      throw new IOException("Unknown source mode : " + sourceMode); +    }    }  } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java new file mode 100644 index 0000000..1cca133 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -0,0 +1,37 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; + +/** + * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) + * and write to an HBase table + */ +public class HBaseRecordWriter +  implements RecordWriter<ImmutableBytesWritable, Put> { +  private HTable m_table; + +  /** +   * Instantiate a TableRecordWriter with the HBase HClient for writing. +   * +   * @param table +   */ +  public HBaseRecordWriter(HTable table) { +    m_table = table; +  } + +  public void close(Reporter reporter) +    throws IOException { +    m_table.close(); +  } + +  public void write(ImmutableBytesWritable key, +      Put value) throws IOException { +    m_table.put(new Put(value)); +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java new file mode 100644 index 0000000..fc656a4 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -0,0 +1,206 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +public class HBaseSalter { +  private static final Log LOG = LogFactory.getLog(HBaseSalter.class); + +   +  public static byte [] addSaltPrefix(byte [] key) throws IOException { +    if( key == null || key.length < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long"); +     +    String keyStr = Bytes.toString(key); +     +    return Bytes.toBytes(keyStr.substring(keyStr.length() - 1) + "_" + keyStr);  +  } + +  public static String addSaltPrefix(String key) throws IOException { +    if( key == null || key.length() < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long"); +     +    return (key.substring(key.length() - 1) + "_" + key);  +  } +   +  public static ImmutableBytesWritable addSaltPrefix( ImmutableBytesWritable key ) throws IOException { +	  return new ImmutableBytesWritable( addSaltPrefix(key.get())); +  } + +  public static byte [] delSaltPrefix(byte [] key) throws IOException { +    if( key == null || key.length < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long"); +     +    
String keyStr = Bytes.toString(key); +     +    return Bytes.toBytes(keyStr.substring(2));  +  } + +  public static String delSaltPrefix(String key) throws IOException { +    if( key == null || key.length() < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long"); +     +    return key.substring(2);  +  } +   +  public static ImmutableBytesWritable delSaltPrefix( ImmutableBytesWritable key ) throws IOException { +	  return new ImmutableBytesWritable( delSaltPrefix(key.get())); +  } +   +  public static final String DEFAULT_PREFIX_LIST = " !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~"; +   +  public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey) throws IOException { +	  return getDistributedIntervals(originalStartKey, originalStopKey, DEFAULT_PREFIX_LIST); +  } +   +  public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey, String prefixList) throws IOException { +	  return getDistributedIntervals(originalStartKey, originalStopKey, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, prefixList); +  } +   + +  public static Pair<byte[], byte[]>[] getDistributedIntervals( +		  byte[] originalStartKey, byte[] originalStopKey,  +		  byte[] regionStartKey, byte[] regionStopKey,  +		  String prefixList) throws IOException { +	LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", +			Bytes.toString(originalStartKey), +			Bytes.toString(originalStopKey), +			Bytes.toString(regionStartKey), +			Bytes.toString(regionStopKey), +			prefixList +			));  +	  +    byte[][] startKeys; +    byte[][] stopKeys; +     +    if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW) +    		&& Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW) ) { +    	startKeys = getAllKeys(originalStartKey, prefixList); +    	stopKeys = getAllKeys(originalStopKey, prefixList); +    } else if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW)) { +    	startKeys = getAllKeysWithStop(originalStartKey, prefixList, regionStopKey[0]); +    	stopKeys = getAllKeysWithStop(originalStopKey, prefixList, regionStopKey[0]); +    } else if(Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW)) { +    	startKeys = getAllKeysWithStart(originalStartKey, prefixList, regionStartKey[0]); +    	stopKeys = getAllKeysWithStart(originalStopKey, prefixList, regionStartKey[0]); +    } else { +    	startKeys = getAllKeysInRange(originalStartKey, prefixList, regionStartKey[0], regionStopKey[0]); +    	stopKeys = getAllKeysInRange(originalStopKey, prefixList, regionStartKey[0], regionStopKey[0]); +    } +     +    if( startKeys.length != stopKeys.length) { +    	throw new IOException("LENGTH of START Keys and STOP Keys DO NOT match"); +    } +     +    if( Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW)  +    		&& Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW) ) { +        Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR); +        // stop keys are the start key of the next interval +        for (int i = startKeys.length - 1; i >= 1; i--) { +        	startKeys[i] = startKeys[i - 1]; +        } +        startKeys[0] = HConstants.EMPTY_START_ROW; +        stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW; +    } else if (Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW)) { +        Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR); +        // stop keys are the start key of the next interval +        
for (int i = startKeys.length - 1; i >= 1; i--) { +        	startKeys[i] = startKeys[i - 1]; +        } +        startKeys[0] = HConstants.EMPTY_START_ROW; +    } else if (Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW)) { +        Arrays.sort(startKeys, Bytes.BYTES_RAWCOMPARATOR); +        // stop keys are the start key of the next interval +        for (int i = 0; i < stopKeys.length - 1; i++) { +          stopKeys[i] = stopKeys[i + 1]; +        } +        stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW; +    }  +       +    Pair<byte[], byte[]>[] intervals = new Pair[startKeys.length]; +    for (int i = 0; i < startKeys.length; i++) { +      intervals[i] = new Pair<byte[], byte[]>(startKeys[i], stopKeys[i]); +    } + +    return intervals; +  }   +   +  public static byte[][] getAllKeys(byte[] originalKey) throws IOException { +    return getAllKeys(originalKey, DEFAULT_PREFIX_LIST); +  } + +  public static byte[][] getAllKeys(byte[] originalKey, String prefixList) throws IOException { +	  char[] prefixArray = prefixList.toCharArray(); +	   +	  return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)prefixArray[prefixArray.length - 1]); +  } + +  public static byte[][] getAllKeysWithStart(byte[] originalKey, String prefixList, byte startKey) throws IOException { +	  char[] prefixArray = prefixList.toCharArray(); +	   +	  return getAllKeysWithStartStop(originalKey, prefixList, startKey, (byte)prefixArray[prefixArray.length - 1]); +  } + +  public static byte[][] getAllKeysWithStop(byte[] originalKey, String prefixList, byte stopKey) throws IOException { +	  char[] prefixArray = prefixList.toCharArray(); +	   +	  return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1)); +  } + +  public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { +	  return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1)); +  } +   +  private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { +	  char[] prefixArray = prefixList.toCharArray(); +	  TreeSet<Byte> prefixSet = new TreeSet<Byte>(); +	   +	  for( char c : prefixArray ) { +		  prefixSet.add((byte)c); +	  } +	   +	  SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true); +	   +	  return getAllKeys(originalKey, subSet.toArray(new Byte[]{})); +  } +   +  public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) { +	byte[][] keys = new byte[prefixArray.length][]; +	 +    for (byte i = 0; i < prefixArray.length; i++) { +      keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey)); +    } + +    return keys; +  } +   +  public static void main(String [] args) throws IOException { +	   +   String s = ""; +    +   for (byte i = 32; i < Byte.MAX_VALUE; i++) { +	 s += (char)i; +   } +    +   System.out.println("".format("[%s]", s)); +   System.out.println("".format("[%s]", DEFAULT_PREFIX_LIST)); + +	   +    String start = Bytes.toString(HConstants.EMPTY_START_ROW); +    String stop = Bytes.toString(HConstants.EMPTY_END_ROW); +     +    System.out.println("".format("START: (%s) STOP: (%s)", start, stop)); +     +    Pair<byte[], byte[]>[] intervals = getDistributedIntervals(Bytes.toBytes(start), Bytes.toBytes(stop), "1423"); +     +    for( Pair<byte[], byte[]> p : intervals ) { +      System.out.println("".format("iSTART: (%s) 
iSTOP: (%s)", Bytes.toString(p.getFirst()), Bytes.toString(p.getSecond()))); +    } +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index e5acc30..a7d36fd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -24,7 +24,6 @@ import cascading.util.Util;  import org.apache.hadoop.hbase.client.Put;  import org.apache.hadoop.hbase.client.Result;  import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableOutputFormat;  import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.OutputCollector; @@ -63,7 +62,9 @@ public class HBaseScheme    /** String columns */    private transient String[] columns;    /** Field fields */ -  private transient byte[][] fields; +//  private transient byte[][] fields; +   +  private boolean useSalt = false;    /** @@ -237,8 +238,13 @@ public class HBaseScheme      OutputCollector outputCollector = sinkCall.getOutput();      Tuple key = tupleEntry.selectTuple(keyField);      ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0); +     +    if( useSalt ) { +    	keyBytes = HBaseSalter.addSaltPrefix(keyBytes); +    } +          Put put = new Put(keyBytes.get(), this.timeStamp); - +          for (int i = 0; i < valueFields.length; i++) {        Fields fieldSelector = valueFields[i];        TupleEntry values = tupleEntry.selectEntry(fieldSelector); @@ -258,10 +264,13 @@ public class HBaseScheme    @Override    public void sinkConfInit(FlowProcess<JobConf> process,        Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { -    conf.setOutputFormat(TableOutputFormat.class); +    conf.setOutputFormat(HBaseOutputFormat.class);      conf.setOutputKeyClass(ImmutableBytesWritable.class);      conf.setOutputValueClass(Put.class); +     +    String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE); +    useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false);    }    @Override diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java index 1d48e1d..a5c3bdd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -1,11 +1,9 @@  package parallelai.spyglass.hbase; -import java.awt.event.KeyListener;  import java.io.DataInput;  import java.io.DataOutput;  import java.io.IOException;  import java.io.Serializable; -import java.util.Set;  import java.util.TreeSet;  import org.apache.commons.logging.Log; @@ -14,8 +12,9 @@ import org.apache.hadoop.hbase.HConstants;  import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.mapred.InputSplit; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import com.sun.tools.javac.resources.version; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { @@ -28,11 +27,13 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,    private TreeSet<String> m_keyList = null;    private SourceMode m_sourceMode = SourceMode.EMPTY;    private boolean m_endRowInclusive = true; +  private int m_versions = 1; +  private boolean m_useSalt = false;    /** default constructor */    public HBaseTableSplit() {      
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
index 1d48e1d..a5c3bdd 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
@@ -1,11 +1,9 @@
 package parallelai.spyglass.hbase;
 
-import java.awt.event.KeyListener;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
@@ -14,8 +12,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.InputSplit;
 
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import com.sun.tools.javac.resources.version;
 
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
 
 public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable {
 
@@ -28,11 +27,13 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
   private TreeSet<String> m_keyList = null;
   private SourceMode m_sourceMode = SourceMode.EMPTY;
   private boolean m_endRowInclusive = true;
+  private int m_versions = 1;
+  private boolean m_useSalt = false;
 
   /** default constructor */
   public HBaseTableSplit() {
     this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
-      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY);
+      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);
   }
 
   /**
@@ -43,19 +44,22 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
    * @param location
    */
   public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow,
-      final String location, final SourceMode sourceMode) {
+      final String location, final SourceMode sourceMode, final boolean useSalt) {
     this.m_tableName = tableName;
     this.m_startRow = startRow;
     this.m_endRow = endRow;
     this.m_regionLocation = location;
     this.m_sourceMode = sourceMode;
+    this.m_useSalt = useSalt;
   }
 
-  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) {
+  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) {
     this.m_tableName = tableName;
     this.m_keyList = keyList;
+    this.m_versions = versions;
     this.m_sourceMode = sourceMode;
     this.m_regionLocation = location;
+    this.m_useSalt = useSalt;
   }
 
   /** @return table name */
@@ -86,20 +90,28 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
     return m_keyList;
   }
 
+  public int getVersions() {
+    return m_versions;
+  }
+
   /** @return get the source mode */
   public SourceMode getSourceMode() {
     return m_sourceMode;
   }
+
+  public boolean getUseSalt() {
+    return m_useSalt;
+  }
 
   /** @return the region's hostname */
   public String getRegionLocation() {
-    LOG.info("REGION GETTER : " + m_regionLocation);
+    LOG.debug("REGION GETTER : " + m_regionLocation);
     return this.m_regionLocation;
   }
 
   public String[] getLocations() {
-    LOG.info("REGION ARRAY : " + m_regionLocation);
+    LOG.debug("REGION ARRAY : " + m_regionLocation);
     return new String[] {this.m_regionLocation};
   }
 
@@ -112,11 +124,12 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    LOG.info("READ ME : " + in.toString());
+    LOG.debug("READ ME : " + in.toString());
 
     this.m_tableName = Bytes.readByteArray(in);
     this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
     this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in)));
+    this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
 
     switch(this.m_sourceMode) {
       case SCAN_RANGE:
@@ -126,6 +139,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
         break;
 
       case GET_LIST:
+        this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
         this.m_keyList = new TreeSet<String>();
 
         int m = Bytes.toInt(Bytes.readByteArray(in));
@@ -136,16 +150,17 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
         break;
     }
 
-    LOG.info("READ and CREATED : " + this);
+    LOG.debug("READ and CREATED : " + this);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    LOG.info("WRITE : " + this);
+    LOG.debug("WRITE : " + this);
 
     Bytes.writeByteArray(out, this.m_tableName);
     Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
     Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
 
     switch( this.m_sourceMode ) {
       case SCAN_RANGE:
@@ -155,6 +170,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
         break;
 
       case GET_LIST:
+        Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
         Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
 
         for( String k: this.m_keyList ) {
@@ -163,13 +179,14 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,
         break;
     }
 
-    LOG.info("WROTE : " + out.toString());
+    LOG.debug("WROTE : " + out.toString());
   }
 
   @Override
   public String toString() {
-    return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)",
-        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList);
+    return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
+        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow),
+        (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt);
   }
 
   @Override
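For reference, the widened constructors and getters above can be exercised directly. A rough sketch; the table name, region location and row keys are invented for illustration.

    import java.util.TreeSet;

    import org.apache.hadoop.hbase.util.Bytes;

    import parallelai.spyglass.hbase.HBaseTableSplit;
    import parallelai.spyglass.hbase.HBaseConstants.SourceMode;

    public class SplitSketch {
      public static void main(String[] args) {
        TreeSet<String> keys = new TreeSet<String>();
        keys.add("row1");
        keys.add("row2");

        // GET_LIST split carrying the new versions and useSalt fields.
        HBaseTableSplit split = new HBaseTableSplit(Bytes.toBytes("demo_table"), keys, 3,
            "regionserver-1", SourceMode.GET_LIST, true);

        System.out.println(split.getVersions()); // 3
        System.out.println(split.getUseSalt());  // true
        System.out.println(split);               // formatted by the toString() shown above
      }
    }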
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 9a0ed0e..65873a0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,6 +12,7 @@
 
 package parallelai.spyglass.hbase;
 
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
 import cascading.flow.FlowProcess;
 import cascading.tap.SinkMode;
 import cascading.tap.Tap;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -35,13 +35,10 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Map.Entry;
+import java.util.Arrays;
 import java.util.UUID;
 
 /**
@@ -172,7 +169,12 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
     }
 
-    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+    conf.set(HBaseOutputFormat.OUTPUT_TABLE, tableName);
+
+    for( SinkConfig sc : sinkConfigList) {
+        sc.configure(conf);
+    }
+
     super.sinkConfInit(process, conf);
   }
 
@@ -292,10 +294,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
     public String startKey = null;
     public String stopKey = null;
     public String [] keyList = null;
+    public int versions = 1;
+    public boolean useSalt = false;
+    public String prefixList = null;
 
     public void configure(Configuration jobConf) {
       switch( sourceMode ) {
         case SCAN_RANGE:
+          if (stopKey != null && startKey != null && startKey.compareTo(stopKey) > 0) {
+            String t = stopKey;
+            stopKey = startKey;
+            startKey = t;
+          }
+
           jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
 
           if( startKey != null && startKey.length() > 0 )
@@ -304,57 +315,104 @@
             jobConf.set( String.format(HBaseConstants.START_KEY, tableName), startKey);
 
           if( stopKey != null && stopKey.length() > 0 )
             jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey);
 
-          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
-          LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey));
-          LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));
+          // Added for Salting
+          jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+          jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+          LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+          LOG.info(String.format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey));
+          LOG.info(String.format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));
           break;
 
         case GET_LIST:
           jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
           jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList);
+          jobConf.setInt(String.format(HBaseConstants.VERSIONS, tableName), versions);
 
-          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
-          LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList));
+          // Added for Salting
+          jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+          jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+          LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+          LOG.info(String.format("Setting KEY LIST (%s) to key list length (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList.length));
          break;
 
         default:
           jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
 
-          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+          // Added for Salting
+          jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+          jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList);
+
+          LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
           break;
       }
     }
   }
 
+  private static class SinkConfig implements Serializable {
+    public String tableName = null;
+    public boolean useSalt = false;
+
+    public void configure(Configuration jobConf) {
+      jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+    }
+  }
+
   private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>();
-
-  public void setHBaseRangeParms(String startKey, String stopKey ) {
+  private ArrayList<SinkConfig> sinkConfigList = new ArrayList<SinkConfig>();
+
+  public void setHBaseRangeParms(String startKey, String stopKey, boolean useSalt, String prefixList ) {
     SourceConfig sc = new SourceConfig();
 
     sc.sourceMode = SourceMode.SCAN_RANGE;
     sc.tableName = tableName;
     sc.startKey = startKey;
     sc.stopKey = stopKey;
+    sc.useSalt = useSalt;
+    setPrefixList(sc, prefixList);
 
     sourceConfigList.add(sc);
   }
 
-  public void setHBaseListParms(String [] keyList ) {
+  public void setHBaseListParms(String [] keyList, int versions, boolean useSalt, String prefixList ) {
     SourceConfig sc = new SourceConfig();
 
     sc.sourceMode = SourceMode.GET_LIST;
     sc.tableName = tableName;
     sc.keyList = keyList;
+    sc.versions = (versions < 1) ? 1 : versions;
+    sc.useSalt = useSalt;
+    setPrefixList(sc, prefixList);
 
     sourceConfigList.add(sc);
   }
 
-  public void setHBaseScanAllParms() {
+  public void setHBaseScanAllParms(boolean useSalt, String prefixList) {
     SourceConfig sc = new SourceConfig();
 
     sc.sourceMode = SourceMode.SCAN_ALL;
     sc.tableName = tableName;
+    sc.useSalt = useSalt;
+
+    setPrefixList(sc, prefixList);
 
     sourceConfigList.add(sc);
   }
+
+  public void setUseSaltInSink( boolean useSalt ) {
+    SinkConfig sc = new SinkConfig();
+
+    sc.tableName = tableName;
+    sc.useSalt = useSalt;
+
+    sinkConfigList.add(sc);
+  }
+
+  private void setPrefixList(SourceConfig sc, String prefixList ) {
+    prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList;
+    char[] prefixArray = prefixList.toCharArray();
+    Arrays.sort(prefixArray);
+    sc.prefixList = new String( prefixArray );
+  }
 }
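Putting the tap changes together, a caller now opts into salting per tap through the new setters. A usage sketch follows; construction of the taps themselves is omitted, and the key values and digit prefix list are placeholders, not taken from the change.

    import parallelai.spyglass.hbase.HBaseTap;

    public class TapUsageSketch {
      // Only the salt-aware setters added above are exercised here.
      public static void configureTaps(HBaseTap source, HBaseTap sink) {
        // Scan a key range, spreading the read over the salted prefixes "0".."9".
        source.setHBaseRangeParms("2013-01-01", "2013-02-01", true, "0123456789");

        // Alternatively, fetch an explicit key list, keeping up to 5 versions per cell:
        // source.setHBaseListParms(new String[] {"row1", "row2"}, 5, true, "0123456789");

        // Prefix row keys on the way out as well.
        sink.setUseSaltInSink(true);
      }
    }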
