diff options
| author | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 | 
|---|---|---|
| committer | Chandan Rajah <chandan.rajah@gmail.com> | 2013-06-06 12:27:15 +0100 | 
| commit | 6e21e0c68248a33875898b86a2be7a9cec7df3d4 (patch) | |
| tree | 5254682e3c3440f7c6954b23519459107b8a445e | |
| parent | ea9c80374da846edf2a1634a42ccb932838ebd5b (diff) | |
| download | SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.tar.gz SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.zip | |
Added extensions to Read and Write mode.
Added support for key prefixes
21 files changed, 2486 insertions, 363 deletions
| @@ -1,7 +1,9 @@  SpyGlass  ======== -Cascading and Scalding wrapper for HBase with advanced read features +Cascading and Scalding wrapper for HBase with advanced read and write features. + +Prevent Hot Spotting by the use of transparent key prefixes.  Building  ======== @@ -9,108 +11,193 @@ Building  	$ mvn clean install -U  	Requires Maven 3.x.x +	 +1. Read Mode Features +===================== -Example -======= +HBaseSource supports modes namely GET_LIST, SCAN_RANGE and SCAN_ALL. Use the *sourceMode* parameter to select the source mode. -	package parallelai.spyglass.hbase.example -	 -	import org.apache.hadoop.conf.Configuration -	import org.apache.hadoop.hbase.HBaseConfiguration -	import org.apache.hadoop.hbase.client.HConnectionManager -	import org.apache.hadoop.hbase.client.HTable -	import org.apache.hadoop.hbase.util.Bytes -	import org.apache.log4j.Level -	import org.apache.log4j.Logger -	 -	import com.twitter.scalding._ -	import com.twitter.scalding.Args -	 -	import parallelai.spyglass.base.JobBase -	import parallelai.spyglass.hbase.HBaseSource -	import parallelai.spyglass.hbase.HBaseConstants.SourceMode -	 -	class HBaseExample(args: Args) extends JobBase(args) { -	 -	  val isDebug: Boolean = args("debug").toBoolean -	 -	  if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG) -	 -	  val output = args("output") -	 -	  println(output) -	 -	  val jobConf = getJobConf -	 -	  val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" -	 -	  case class HBaseTableStore( -	      conf: Configuration, -	      quorum: String, -	      tableName: String) { -	 -	    val tableBytes = Bytes.toBytes(tableName) -	    val connection = HConnectionManager.getConnection(conf) -	    val maxThreads = conf.getInt("hbase.htable.threads.max", 1) -	 -	    conf.set("hbase.zookeeper.quorum", quorumNames); -	 -	    val htable = new HTable(HBaseConfiguration.create(conf), tableName) -	 -	  } -	 -	  val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet") -	 -	  val hbs2 = new HBaseSource( +  - **GET_LIST** -> Provide a list of keys to retrieve from the HBase table +  - **SCAN_RANGE** -> Provide a start and stop key (inclusive) to get out of the HBase table. +  - **SCAN_ALL** -> Get all rows form the HBase Table +   +GET_LIST +-------- +Requires the *keyList* parameter to be specified as well. + +(e.g.) +	val hbs2 = new HBaseSource(  	    "table_name",  	    "quorum_name:2181",  	    'key,  	    Array("column_family"),  	    Array('column_name),  	    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) -	    .read -	    .write(Tsv(output.format("get_list"))) -	 -	  val hbs3 = new HBaseSource( -	    "table_name", -	    "quorum_name:2181", -	    'key, -	    Array("column_family"), -	    Array('column_name), -	    sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") -	    .read -	    .write(Tsv(output.format("scan_all"))) -	 -	  val hbs4 = new HBaseSource( -	    "table_name", -	    "quorum_name:2181", -	    'key, -	    Array("column_family"), -	    Array('column_name), -	    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") -	    .read -	    .write(Tsv(output.format("scan_range_to_end"))) -	 -	  val hbs5 = new HBaseSource( +	     +	     +Additionally, the *versions* parameter can be used to retrieve more than one version of the row.  + +(e.g.) +	val hbs2 = new HBaseSource(  	    "table_name",  	    "quorum_name:2181",  	    'key,  	    Array("column_family"),  	    Array('column_name), -	    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") -	    .read -	    .write(Tsv(output.format("scan_range_from_start"))) -	 -	  val hbs6 = new HBaseSource( +	    sourceMode = SourceMode.GET_LIST, +	    versions = 5,  +	    keyList = List("5003914", "5000687", "5004897")) +	 +	     +SCAN_RANGE +---------- +Scan range uses the optional *startKey* and *stopKey* parameters to specify the range of keys to extract; both keys are inclusive.  + +if: + - Only *startKey* provided -> All rows from *startKey* till *END OF TABLE* are returned + - Only *stopKey* provided -> All rows from *START OF TABLE* till *stopKey* are returned + - Neither provided -> All rows in table are returned. +  +(e.g.) +	 val hbs4 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") +    .read +    .write(Tsv(output.format("scan_range_to_end"))) + +  val hbs5 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") +    .read +    .write(Tsv(output.format("scan_range_from_start"))) + +  val hbs6 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") +    .read +    .write(Tsv(output.format("scan_range_between"))) +  +  +SCAN_ALL +-------- +Returns all rows in the table + +(e.g.) +	val hbs2 = new HBaseSource(  	    "table_name",  	    "quorum_name:2181",  	    'key,  	    Array("column_family"),  	    Array('column_name), -	    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") -	    .read -	    .write(Tsv(output.format("scan_range_between"))) +	    sourceMode = SourceMode.SCAN_ALL) + + +2. Write Mode Features +====================== + +HBaseSource supports writing at a particular time stamp i.e. a version.  + +The time dimension can be added to the row by using the *timestamp* parameter. If the parameter is not present the current time is used. + +(e.g.)    +	.write(new HBaseSource( "table_name", +    	"quorum_name:2181", +    	'key,   +       TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +       TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +       timestamp = Platform.currentTime )) + +3. Region Hot Spot Prevention +============================= + +Region hot spotting is a common problem with HBase. Spy Glass uses key prefix salting to avoid this.  +The row key is prefixed with the last byte followed by a '_' (underscore) character. + +(e.g.) +Original Row Key   ->  Becomes +SPYGLASS           ->  S_SPYGLASS +12345678           ->  8_12345678 + +Conversion to and from salted keys is done automatically. + +Setting the *useSalt* parameter to *true* enables this functionality + + +(e.g.) +	  val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + +	  val hbase07 =  +      new HBaseSource( "table_name", +    	"quorum_name:2181", 'key,   +          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = "0123456789" ) +	  .read +	   +	  // Convert from ImmutableBytesWritable to String  +	  .fromBytesWritable( TABLE_SCHEMA ) +	 +	  .write(TextLine("saltTesting/ScanRangePlusSalt10")) +	 +	  // Convert from String to ImmutableBytesWritable  +	  .toBytesWritable( TABLE_SCHEMA ) +	 +	  .write(new HBaseSource( "table_name", +	    	"quorum_name:2181", 'key,   +	          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +	          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +	          useSalt = true )) +	           +	 +Setting the *prefixList* parameter to the available prefixes can increase the read performance quite a bit.  + +4. Pipe Conversion Implicits +============================ + +HBaseSource will always read or write fields of type *ImmutableBytesWritable*. The supplied *HBasePipeConversions* trait is used to convert to and from *String* to *ImmutableBytesWritable* + +Add the trait to the job class and start using the conversions in the pipe directly + +(e.g.) +	class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { +	  val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + +	  val hbase07 =  +      new HBaseSource( "table_name", +    	"quorum_name:2181", 'key,   +          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = "0123456789" ) +	  .read +	   +	  // Convert from ImmutableBytesWritable to String  +	  .fromBytesWritable( TABLE_SCHEMA ) +	 +	  .write(TextLine("saltTesting/ScanRangePlusSalt10")) +	 +	  // Convert from String to ImmutableBytesWritable  +	  .toBytesWritable( TABLE_SCHEMA ) +	 +	  .write(new HBaseSource( "table_name", +	    	"quorum_name:2181", 'key,   +	          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +	          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +	          useSalt = true ))  	}  + + @@ -12,7 +12,7 @@  	<name>Cascading and Scalding wrapper for HBase with advanced features</name>  	<groupId>parallelai</groupId>  	<artifactId>parallelai.spyglass</artifactId> -	<version>1.0.2</version> +	<version>2.0.3</version>  	<packaging>jar</packaging>  	<properties> @@ -158,7 +158,7 @@  			<artifactId>scalding_${scala.version}</artifactId>  			<version>${scalding.version}</version>  			<exclusions> -				<exclusion>  <!-- Declare exclusion, in order to use custom maple build --> +				<exclusion>  <!-- Declare exclusion, in order to use custom build -->  					<groupId>com.twitter</groupId>  					<artifactId>maple</artifactId>  				</exclusion> @@ -284,6 +284,39 @@  										<ignore></ignore>  									</action>  								</pluginExecution> +								<pluginExecution> +									<pluginExecutionFilter> +										<groupId></groupId> +										<artifactId></artifactId> +										<versionRange>[0.0,)</versionRange> +										<goals> +											<goal></goal> +										</goals> +									</pluginExecutionFilter> +									<action> +										<ignore></ignore> +									</action> +								</pluginExecution> +								<pluginExecution> +									<pluginExecutionFilter> +										<groupId> +											org.scala-tools +										</groupId> +										<artifactId> +											maven-scala-plugin +										</artifactId> +										<versionRange> +											[2.15.2,) +										</versionRange> +										<goals> +											<goal>compile</goal> +											<goal>testCompile</goal> +										</goals> +									</pluginExecutionFilter> +									<action> +										<ignore></ignore> +									</action> +								</pluginExecution>  							</pluginExecutions>  						</lifecycleMappingMetadata>  					</configuration> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index b546107..5baf20e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -15,5 +15,8 @@ public class HBaseConstants {    public static final String STOP_KEY = "hbase.%s.stopkey";     public static final String SOURCE_MODE = "hbase.%s.source.mode";    public static final String KEY_LIST = "hbase.%s.key.list"; +  public static final String VERSIONS = "hbase.%s.versions"; +  public static final String USE_SALT = "hbase.%s.use.salt"; +  public static final String SALT_PREFIX = "hbase.%s.salt.prefix";  } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index f1f4fb7..aabdc5e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -5,8 +5,8 @@ import java.net.InetAddress;  import java.util.ArrayList;  import java.util.Collection;  import java.util.HashMap; -import java.util.HashSet;  import java.util.List; +import java.util.NavigableMap;  import java.util.Set;  import java.util.TreeSet;  import java.util.UUID; @@ -17,8 +17,10 @@ import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.hbase.HBaseConfiguration;  import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo;  import org.apache.hadoop.hbase.HRegionLocation;  import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName;  import org.apache.hadoop.hbase.client.HTable;  import org.apache.hadoop.hbase.client.Result;  import org.apache.hadoop.hbase.filter.Filter; @@ -38,7 +40,6 @@ import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -  public class HBaseInputFormat    implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { @@ -48,9 +49,9 @@ public class HBaseInputFormat    private byte [][] inputColumns;    private HTable table; -  private HBaseRecordReader tableRecordReader; +//  private HBaseRecordReader tableRecordReader;    private Filter rowFilter; -  private String tableName = ""; +//  private String tableName = "";    private HashMap<InetAddress, String> reverseDNSCacheMap =      new HashMap<InetAddress, String>(); @@ -83,7 +84,8 @@ public class HBaseInputFormat        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);        HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc -              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); +              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); +              splits.add(split);        return splits.toArray(new HBaseTableSplit[splits.size()]); @@ -100,7 +102,7 @@ public class HBaseInputFormat      byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];      byte[] maxKey = keys.getSecond()[0]; -    LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); +    LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));      byte [][] regStartKeys = keys.getFirst();      byte [][] regStopKeys = keys.getSecond(); @@ -144,29 +146,29 @@ public class HBaseInputFormat        regions[i] = regionLocation; -      LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); +      LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));      }      byte[] startRow = HConstants.EMPTY_START_ROW;      byte[] stopRow = HConstants.EMPTY_END_ROW; -    LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); +    LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); -    LOG.info("SOURCE MODE is : " + sourceMode);     +    LOG.debug("SOURCE MODE is : " + sourceMode);          switch( sourceMode ) {      case SCAN_ALL:        startRow = HConstants.EMPTY_START_ROW;        stopRow = HConstants.EMPTY_END_ROW; -      LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));        break;      case SCAN_RANGE:        startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ;        stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; -      LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));        break;      } @@ -180,98 +182,110 @@ public class HBaseInputFormat          List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); -        List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); -         -        int maxRegions = validRegions.size(); -        int currentRegion = 1; -         -        for( HRegionLocation cRegion : validRegions ) { -          byte [] rStart = cRegion.getRegionInfo().getStartKey(); -          byte [] rStop = cRegion.getRegionInfo().getEndKey(); +        if( ! useSalt ) { -          HServerAddress regionServerAddress = cRegion.getServerAddress(); -            InetAddress regionAddress = -              regionServerAddress.getInetSocketAddress().getAddress(); -            String regionLocation; -            try { -              regionLocation = reverseDNS(regionAddress); -            } catch (NamingException e) { -              LOG.error("Cannot resolve the host name for " + regionAddress + -                  " because of " + e); -              regionLocation = regionServerAddress.getHostname(); -            } -             -            byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); -            byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);  -             -            LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",  -                (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), -                    (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), -                    rStop.length -                    )); +          List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); -          HBaseTableSplit split = new HBaseTableSplit( -              table.getTableName(), -              sStart, -              sStop, -              regionLocation, -              SourceMode.SCAN_RANGE -           ); +          int maxRegions = validRegions.size(); +          int currentRegion = 1; -          split.setEndRowInclusive( currentRegion == maxRegions ); - -          currentRegion ++; -                   -           LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  -               Bytes.toString(startRow), Bytes.toString(stopRow), -               Bytes.toString(rStart), Bytes.toString(rStop), -               Bytes.toString(sStart),  -               Bytes.toString(sStop),  -               cRegion.getHostnamePort(), split) ); +          for( HRegionLocation cRegion : validRegions ) { +            byte [] rStart = cRegion.getRegionInfo().getStartKey(); +            byte [] rStop = cRegion.getRegionInfo().getEndKey(); +             +            HServerAddress regionServerAddress = cRegion.getServerAddress(); +              InetAddress regionAddress = +                regionServerAddress.getInetSocketAddress().getAddress(); +              String regionLocation; +              try { +                regionLocation = reverseDNS(regionAddress); +              } catch (NamingException e) { +                LOG.error("Cannot resolve the host name for " + regionAddress + +                    " because of " + e); +                regionLocation = regionServerAddress.getHostname(); +              } +               +              byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); +              byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);  +               +              LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)",  +                  (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), +                      (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), +                      rStop.length +                      )); +             +            HBaseTableSplit split = new HBaseTableSplit( +                table.getTableName(), +                sStart, +                sStop, +                regionLocation, +                SourceMode.SCAN_RANGE, useSalt +             ); +             +            split.setEndRowInclusive( currentRegion == maxRegions ); +   +            currentRegion ++; +                     +             LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  +                 Bytes.toString(startRow), Bytes.toString(stopRow), +                 Bytes.toString(rStart), Bytes.toString(rStop), +                 Bytes.toString(sStart),  +                 Bytes.toString(sStop),  +                 cRegion.getHostnamePort(), split) ); +             +             splits.add(split); +          } +        } else { +          LOG.debug("Using SALT : " + useSalt ); -           splits.add(split); +          // Will return the start and the stop key with all possible prefixes. +          for( int i = 0; i < regions.length; i++ ) { +        	Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList ); +        	 +            for( Pair<byte[], byte[]> pair : intervals ) { +              LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); +               +              HBaseTableSplit split = new HBaseTableSplit( +                  table.getTableName(), +                  pair.getFirst(), +                  pair.getSecond(), +                  regions[i], +                  SourceMode.SCAN_RANGE, useSalt +               ); + +              split.setEndRowInclusive(true); +              splits.add(split); +            } +          }          } -// -//        for (int i = 0; i < keys.getFirst().length; i++) { -// -//          if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -//            LOG.info("NOT including regions : " + regions[i]); -//            continue; -//          } -//           -//          // determine if the given start an stop key fall into the region -//          if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || -//               Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && -//              (stopRow.length == 0 || -//               Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { -//             -//            byte[] splitStart = startRow.length == 0 || -//              Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? -//                keys.getFirst()[i] : startRow; -//            byte[] splitStop = (stopRow.length == 0 || -//              Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && -//              keys.getSecond()[i].length > 0 ? -//                keys.getSecond()[i] : stopRow; -//                HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), -//              splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); -//            splits.add(split); -//             -//            LOG.info("getSplits: split -> " + i + " -> " + split); -//          } -//        } -         -        LOG.info("RETURNED SPLITS: split -> " + splits); +        LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); +        for( HBaseTableSplit s: splits) { +            LOG.info("RETURNED SPLITS: split -> " + s); +        }          return splits.toArray(new HBaseTableSplit[splits.size()]);        }         case GET_LIST:        { -        if( keyList == null || keyList.size() == 0 ) { +//        if( keyList == null || keyList.size() == 0 ) { +        if( keyList == null ) {            throw new IOException("Source Mode is GET_LIST but key list is EMPTY");          }  +        if( useSalt ) { +          TreeSet<String> tempKeyList = new TreeSet<String>(); +           +          for(String key: keyList) { +            tempKeyList.add(HBaseSalter.addSaltPrefix(key)); +          } +           +          keyList = tempKeyList; +        } +         +        LOG.debug("".format("Splitting Key List (%s)", keyList)); +                  List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();          for (int i = 0; i < keys.getFirst().length; i++) { @@ -280,44 +294,46 @@ public class HBaseInputFormat              continue;            } -          LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); +          LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));            Set<String> regionsSubSet = null;            if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { -            LOG.info("REGION start is empty"); -            LOG.info("REGION stop is empty"); +            LOG.debug("REGION start is empty"); +            LOG.debug("REGION stop is empty");              regionsSubSet = keyList;            } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { -            LOG.info("REGION start is empty"); +            LOG.debug("REGION start is empty");              regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true);            } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { -            LOG.info("REGION stop is empty"); +            LOG.debug("REGION stop is empty");              regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true);            } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) {              regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true);            } else { -            throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",  +            throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)",                   regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));            }            if( regionsSubSet == null || regionsSubSet.size() == 0) { -            LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); +            LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null");              continue;            }            TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); -          LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); +          LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));            HBaseTableSplit split = new HBaseTableSplit( -              table.getTableName(), regionKeyList, +              table.getTableName(), regionKeyList, versions,                regions[i],  -              SourceMode.GET_LIST); +              SourceMode.GET_LIST, useSalt);            splits.add(split);          } +        LOG.debug("RETURNED SPLITS: split -> " + splits); +          return splits.toArray(new HBaseTableSplit[splits.size()]);        }  @@ -351,20 +367,23 @@ public class HBaseInputFormat        case SCAN_ALL:        case SCAN_RANGE:        { -        LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); +        LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));          trr.setStartRow(tSplit.getStartRow());          trr.setEndRow(tSplit.getEndRow());          trr.setEndRowInclusive(tSplit.getEndRowInclusive()); +        trr.setUseSalt(useSalt);        }        break;        case GET_LIST:        { -        LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); +        LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));          trr.setKeyList(tSplit.getKeyList()); +        trr.setVersions(tSplit.getVersions()); +        trr.setUseSalt(useSalt);        }        break; @@ -402,6 +421,9 @@ public class HBaseInputFormat    private SourceMode sourceMode = SourceMode.EMPTY;    private TreeSet<String> keyList = null; +  private int versions = 1; +  private boolean useSalt = false; +  private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;    public void configure(JobConf job) {      String tableName = getTableName(job); @@ -422,9 +444,12 @@ public class HBaseInputFormat      LOG.debug("Entered : " + this.getClass() + " : configure()" ); +    useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false); +    prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST); +          sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; -    LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",  +    LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally",           String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode ));      switch( sourceMode ) { @@ -442,16 +467,18 @@ public class HBaseInputFormat          Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job)));          keyList = new TreeSet<String> (keys); + +        versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); -        LOG.info( "GOT KEY LIST : " + keys ); -        LOG.info(String.format("SETTING key list (%s)", keyList) ); +        LOG.debug( "GOT KEY LIST : " + keys ); +        LOG.debug(String.format("SETTING key list (%s)", keyList) );          break;        case EMPTY:          LOG.info("HIT EMPTY"); -        sourceMode = sourceMode.SCAN_ALL; +        sourceMode = SourceMode.SCAN_ALL;          break;        default: @@ -468,7 +495,7 @@ public class HBaseInputFormat      if (tableName == null) {        throw new IOException("expecting one table name");      } -    LOG.debug("".format("Found Table name [%s]", tableName)); +    LOG.debug(String.format("Found Table name [%s]", tableName));      // connected to table? @@ -476,16 +503,16 @@ public class HBaseInputFormat        throw new IOException("could not connect to table '" +          tableName + "'");      } -    LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); +    LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));      // expecting at least one column      String colArg = job.get(COLUMN_LIST);      if (colArg == null || colArg.length() == 0) {        throw new IOException("expecting at least one column");      } -    LOG.debug("".format("Found Columns [%s]", colArg)); +    LOG.debug(String.format("Found Columns [%s]", colArg)); -    LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); +    LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey));      if( sourceMode == SourceMode.EMPTY ) {        throw new IOException("SourceMode should not be EMPTY"); @@ -504,7 +531,7 @@ public class HBaseInputFormat    private void setJobProp( JobConf job, String key, String value) { -    if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); +    if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));      job.set(key,  value);    } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java new file mode 100644 index 0000000..40f1faf --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -0,0 +1,59 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; + +/** + * Convert Map/Reduce output and write it to an HBase table + */ +public class HBaseOutputFormat extends +FileOutputFormat<ImmutableBytesWritable, Put> { + +  /** JobConf parameter that specifies the output table */ +  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; +  private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class); + + +  @Override +  @SuppressWarnings("unchecked") +  public RecordWriter getRecordWriter(FileSystem ignored, +      JobConf job, String name, Progressable progress) throws IOException { + +    // expecting exactly one path + +    String tableName = job.get(OUTPUT_TABLE); +    HTable table = null; +    try { +      table = new HTable(HBaseConfiguration.create(job), tableName); +    } catch(IOException e) { +      LOG.error(e); +      throw e; +    } +    table.setAutoFlush(false); +    return new HBaseRecordWriter(table); +  } + +  @Override +  public void checkOutputSpecs(FileSystem ignored, JobConf job) +  throws FileAlreadyExistsException, InvalidJobConfException, IOException { + +    String tableName = job.get(OUTPUT_TABLE); +    if(tableName == null) { +      throw new IOException("Must specify table name"); +    } +  } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java index 97077c4..d22ed71 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -3,12 +3,17 @@ package parallelai.spyglass.hbase;  import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;  import java.io.IOException; -import java.util.Set; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map;  import java.util.TreeSet; +import java.util.Vector;  import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue;  import org.apache.hadoop.hbase.client.Get;  import org.apache.hadoop.hbase.client.HTable;  import org.apache.hadoop.hbase.client.Result; @@ -18,36 +23,38 @@ import org.apache.hadoop.hbase.client.ScannerCallable;  import org.apache.hadoop.hbase.filter.Filter;  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes;   import org.apache.hadoop.hbase.util.Writables;  import org.apache.hadoop.mapred.RecordReader;  import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> { +public class HBaseRecordReader implements +    RecordReader<ImmutableBytesWritable, Result> {    static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); -  private byte [] startRow; -  private byte [] endRow; -  private byte [] lastSuccessfulRow; +  private byte[] startRow; +  private byte[] endRow; +  private byte[] lastSuccessfulRow;    private TreeSet<String> keyList;    private SourceMode sourceMode;    private Filter trrRowFilter;    private ResultScanner scanner;    private HTable htable; -  private byte [][] trrInputColumns; +  private byte[][] trrInputColumns;    private long timestamp;    private int rowcount;    private boolean logScannerActivity = false;    private int logPerRowCount = 100;    private boolean endRowInclusive = true; +  private int versions = 1; +  private boolean useSalt = false;    /**     * Restart from survivable exceptions by creating a new scanner. -   * +   *      * @param firstRow     * @throws IOException     */ @@ -55,26 +62,26 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R      Scan currentScan;      if ((endRow != null) && (endRow.length > 0)) {        if (trrRowFilter != null) { -        Scan scan = new Scan(firstRow, (endRowInclusive ?   -            Bytes.add(endRow, new byte[] {0}) : endRow ) ); -         +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +            new byte[] { 0 }) : endRow)); +          TableInputFormat.addColumns(scan, trrInputColumns);          scan.setFilter(trrRowFilter);          scan.setCacheBlocks(false);          this.scanner = this.htable.getScanner(scan);          currentScan = scan;        } else { -        LOG.debug("TIFB.restart, firstRow: " + -            Bytes.toString(firstRow) + ", endRow: " + -            Bytes.toString(endRow)); -        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow )); +        LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) +            + ", endRow: " + Bytes.toString(endRow)); +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +            new byte[] { 0 }) : endRow));          TableInputFormat.addColumns(scan, trrInputColumns);          this.scanner = this.htable.getScanner(scan);          currentScan = scan;        }      } else { -      LOG.debug("TIFB.restart, firstRow: " + -          Bytes.toStringBinary(firstRow) + ", no endRow"); +      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) +          + ", no endRow");        Scan scan = new Scan(firstRow);        TableInputFormat.addColumns(scan, trrInputColumns); @@ -83,7 +90,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R        currentScan = scan;      }      if (logScannerActivity) { -      LOG.info("Current scan=" + currentScan.toString()); +      LOG.debug("Current scan=" + currentScan.toString());        timestamp = System.currentTimeMillis();        rowcount = 0;      } @@ -97,6 +104,14 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R      this.keyList = keyList;    } +  public void setVersions(int versions) { +    this.versions = versions; +  } +   +  public void setUseSalt(boolean useSalt) { +    this.useSalt = useSalt; +  } +    public SourceMode getSourceMode() {      return sourceMode;    } @@ -108,76 +123,84 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    public byte[] getEndRow() {      return endRow;    } -   +    public void setEndRowInclusive(boolean isInclusive) {      endRowInclusive = isInclusive;    } -   +    public boolean getEndRowInclusive() {      return endRowInclusive;    } -   -  private byte [] nextKey = null; + +  private byte[] nextKey = null; +  private Vector<List<KeyValue>> resultVector = null; +  Map<Long, List<KeyValue>> keyValueMap = null;    /**     * Build the scanner. Not done in constructor to allow for extension. -   * +   *      * @throws IOException     */    public void init() throws IOException { -    switch( sourceMode ) { -      case SCAN_ALL: -      case SCAN_RANGE: -        restartRangeScan(startRow); -        break; -         -      case GET_LIST: -        nextKey = Bytes.toBytes(keyList.pollFirst()); -        break; -         -      default: -        throw new IOException(" Unknown source mode : " + sourceMode ); +    switch (sourceMode) { +    case SCAN_ALL: +    case SCAN_RANGE: +      restartRangeScan(startRow); +      break; + +    case GET_LIST: +      nextKey = Bytes.toBytes(keyList.pollFirst()); +      break; + +    default: +      throw new IOException(" Unknown source mode : " + sourceMode);      }    }    byte[] getStartRow() {      return this.startRow;    } +    /** -   * @param htable the {@link HTable} to scan. +   * @param htable +   *          the {@link HTable} to scan.     */    public void setHTable(HTable htable) {      Configuration conf = htable.getConfiguration(); -    logScannerActivity = conf.getBoolean( -      ScannerCallable.LOG_SCANNER_ACTIVITY, false); +    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, +        false);      logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);      this.htable = htable;    }    /** -   * @param inputColumns the columns to be placed in {@link Result}. +   * @param inputColumns +   *          the columns to be placed in {@link Result}.     */ -  public void setInputColumns(final byte [][] inputColumns) { +  public void setInputColumns(final byte[][] inputColumns) {      this.trrInputColumns = inputColumns;    }    /** -   * @param startRow the first row in the split +   * @param startRow +   *          the first row in the split     */ -  public void setStartRow(final byte [] startRow) { +  public void setStartRow(final byte[] startRow) {      this.startRow = startRow;    }    /** -   * -   * @param endRow the last row in the split +   *  +   * @param endRow +   *          the last row in the split     */ -  public void setEndRow(final byte [] endRow) { +  public void setEndRow(final byte[] endRow) {      this.endRow = endRow;    }    /** -   * @param rowFilter the {@link Filter} to be used. +   * @param rowFilter +   *          the {@link Filter} to be used.     */    public void setRowFilter(Filter rowFilter) {      this.trrRowFilter = rowFilter; @@ -185,12 +208,13 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    @Override    public void close() { -	  if (this.scanner != null) this.scanner.close(); +    if (this.scanner != null) +      this.scanner.close();    }    /**     * @return ImmutableBytesWritable -   * +   *      * @see org.apache.hadoop.mapred.RecordReader#createKey()     */    @Override @@ -200,7 +224,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    /**     * @return RowResult -   * +   *      * @see org.apache.hadoop.mapred.RecordReader#createValue()     */    @Override @@ -222,104 +246,260 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R    }    /** -   * @param key HStoreKey as input key. -   * @param value MapWritable as input value +   * @param key +   *          HStoreKey as input key. +   * @param value +   *          MapWritable as input value     * @return true if there was more data     * @throws IOException     */    @Override    public boolean next(ImmutableBytesWritable key, Result value) -  throws IOException { -     -    switch(sourceMode) { -      case SCAN_ALL: -      case SCAN_RANGE: -      { -         -        Result result; +      throws IOException { + +    switch (sourceMode) { +    case SCAN_ALL: +    case SCAN_RANGE: { + +      Result result; +      try {          try { -          try { -            result = this.scanner.next(); -            if (logScannerActivity) { -              rowcount ++; -              if (rowcount >= logPerRowCount) { -                long now = System.currentTimeMillis(); -                LOG.info("Mapper took " + (now-timestamp) -                  + "ms to process " + rowcount + " rows"); -                timestamp = now; -                rowcount = 0; -              } -            } -          } catch (IOException e) { -            // try to handle all IOExceptions by restarting -            // the scanner, if the second call fails, it will be rethrown -            LOG.debug("recovered from " + StringUtils.stringifyException(e)); -            if (lastSuccessfulRow == null) { -              LOG.warn("We are restarting the first next() invocation," + -                  " if your mapper has restarted a few other times like this" + -                  " then you should consider killing this job and investigate" + -                  " why it's taking so long."); -            } -            if (lastSuccessfulRow == null) { -              restartRangeScan(startRow); -            } else { -              restartRangeScan(lastSuccessfulRow); -              this.scanner.next();    // skip presumed already mapped row +          result = this.scanner.next(); +          if (logScannerActivity) { +            rowcount++; +            if (rowcount >= logPerRowCount) { +              long now = System.currentTimeMillis(); +              LOG.debug("Mapper took " + (now - timestamp) + "ms to process " +                  + rowcount + " rows"); +              timestamp = now; +              rowcount = 0;              } -            result = this.scanner.next();            } +        } catch (IOException e) { +          // try to handle all IOExceptions by restarting +          // the scanner, if the second call fails, it will be rethrown +          LOG.debug("recovered from " + StringUtils.stringifyException(e)); +          if (lastSuccessfulRow == null) { +            LOG.warn("We are restarting the first next() invocation," +                + " if your mapper has restarted a few other times like this" +                + " then you should consider killing this job and investigate" +                + " why it's taking so long."); +          } +          if (lastSuccessfulRow == null) { +            restartRangeScan(startRow); +          } else { +            restartRangeScan(lastSuccessfulRow); +            this.scanner.next(); // skip presumed already mapped row +          } +          result = this.scanner.next(); +        } -          if (result != null && result.size() > 0) { +        if (result != null && result.size() > 0) { +          if( useSalt) { +            key.set( HBaseSalter.delSaltPrefix(result.getRow())); +          } else {              key.set(result.getRow()); -            lastSuccessfulRow = key.get(); -            Writables.copyWritable(result, value); -            return true; -          } -          return false; -        } catch (IOException ioe) { -          if (logScannerActivity) { -            long now = System.currentTimeMillis(); -            LOG.info("Mapper took " + (now-timestamp) -              + "ms to process " + rowcount + " rows"); -            LOG.info(ioe); -            String lastRow = lastSuccessfulRow == null ? -              "null" : Bytes.toStringBinary(lastSuccessfulRow); -            LOG.info("lastSuccessfulRow=" + lastRow);            } -          throw ioe; +           +          lastSuccessfulRow = key.get(); +          Writables.copyWritable(result, value); +          return true; +        } +        return false; +      } catch (IOException ioe) { +        if (logScannerActivity) { +          long now = System.currentTimeMillis(); +          LOG.debug("Mapper took " + (now - timestamp) + "ms to process " +              + rowcount + " rows"); +          LOG.debug(ioe); +          String lastRow = lastSuccessfulRow == null ? "null" : Bytes +              .toStringBinary(lastSuccessfulRow); +          LOG.debug("lastSuccessfulRow=" + lastRow);          } +        throw ioe;        } +    } + +    case GET_LIST: { +      LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); -      case GET_LIST: -      { -        Result result; -        if( nextKey != null ) { -          result = this.htable.get(new Get(nextKey)); +      if (versions == 1) { +        if (nextKey != null) { +          LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); -          if (result != null && result.size() > 0) { -        	System.out.println("KeyList => " + keyList); -        	System.out.println("Result => " + result); -        	if (keyList != null || !keyList.isEmpty()) { -        		 -        		String newKey = keyList.pollFirst(); -        		System.out.println("New Key => " + newKey); -        		nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey); -        	} else { -        		nextKey = null; -        	} -            key.set(result.getRow()); +          Get theGet = new Get(nextKey); +          theGet.setMaxVersions(versions); + +          Result result = this.htable.get(theGet); + +          if (result != null && (! result.isEmpty()) ) { +            LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); + +            if (keyList != null || !keyList.isEmpty()) { +              String newKey = keyList.pollFirst(); +              LOG.debug("New Key => " + newKey); +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +            } else { +              nextKey = null; +            } +             +            LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); +             +            // Write the result +            if( useSalt) { +              key.set( HBaseSalter.delSaltPrefix(result.getRow())); +            } else { +              key.set(result.getRow()); +            }              lastSuccessfulRow = key.get();              Writables.copyWritable(result, value); +                          return true; +          } else { +            LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 +             +            String newKey; +            while((newKey = keyList.pollFirst()) != null) { +              LOG.debug("WHILE NEXT Key => " + newKey); + +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +               +              if( nextKey == null ) { +                LOG.error("BOMB! BOMB! BOMB!"); +                continue;  +              } +               +              if( ! this.htable.exists( new Get(nextKey) ) ) { +                LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); +                continue;  +              } else { break; } +            } +             +            nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                .toBytes(newKey); +             +            LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + +            return next(key, value);            } -          return false;          } else { +          // Nothig left. return false            return false;          } +      } else { +        if (resultVector != null && resultVector.size() != 0) {  +          LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); + +          List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); +          Result result = new Result(resultKeyValue); + +          LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); + +          if( useSalt) { +            key.set( HBaseSalter.delSaltPrefix(result.getRow())); +          } else { +            key.set(result.getRow()); +          } +          lastSuccessfulRow = key.get(); +          Writables.copyWritable(result, value); + +          return true; +        } else { +          if (nextKey != null) { +            LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); +             +            Get theGet = new Get(nextKey); +            theGet.setMaxVersions(versions); + +            Result resultAll = this.htable.get(theGet); +             +            if( resultAll != null && (! resultAll.isEmpty())) { +              List<KeyValue> keyValeList = resultAll.list(); + +              keyValueMap = new HashMap<Long, List<KeyValue>>(); +               +              LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); + +              for (KeyValue keyValue : keyValeList) { +                long version = keyValue.getTimestamp(); + +                if (keyValueMap.containsKey(new Long(version))) { +                  List<KeyValue> keyValueTempList = keyValueMap.get(new Long( +                      version)); +                  if (keyValueTempList == null) { +                    keyValueTempList = new ArrayList<KeyValue>(); +                  } +                  keyValueTempList.add(keyValue); +                } else { +                  List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); +                  keyValueMap.put(new Long(version), keyValueTempList); +                  keyValueTempList.add(keyValue); +                } +              } + +              resultVector = new Vector<List<KeyValue>>(); +              resultVector.addAll(keyValueMap.values()); + +              List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); + +              Result result = new Result(resultKeyValue); + +              LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); + +              String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + +              System.out.println("+ New Key => " + newKey); +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); + +              if( useSalt) { +                key.set( HBaseSalter.delSaltPrefix(result.getRow())); +              } else { +                key.set(result.getRow()); +              } +              lastSuccessfulRow = key.get(); +              Writables.copyWritable(result, value); +              return true; +            } else { +              LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 +               +              String newKey; +               +              while( (newKey = keyList.pollFirst()) != null ) { +                LOG.debug("+ WHILE NEXT Key => " + newKey); + +                nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                    .toBytes(newKey); +                 +                if( nextKey == null ) { +                  LOG.error("+ BOMB! BOMB! BOMB!"); +                  continue;  +                } +                 +                if( ! this.htable.exists( new Get(nextKey) ) ) { +                  LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); +                  continue;  +                } else { break; } +              } +               +              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes +                  .toBytes(newKey); +               +              LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); + +              return next(key, value); +            } +             +          } else { +            return false; +          } +        }        } -       -      default: -        throw new IOException("Unknown source mode : " + sourceMode ); -    }  +    } +    default: +      throw new IOException("Unknown source mode : " + sourceMode); +    }    }  } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java new file mode 100644 index 0000000..1cca133 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -0,0 +1,37 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; + +/** + * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) + * and write to an HBase table + */ +public class HBaseRecordWriter +  implements RecordWriter<ImmutableBytesWritable, Put> { +  private HTable m_table; + +  /** +   * Instantiate a TableRecordWriter with the HBase HClient for writing. +   * +   * @param table +   */ +  public HBaseRecordWriter(HTable table) { +    m_table = table; +  } + +  public void close(Reporter reporter) +    throws IOException { +    m_table.close(); +  } + +  public void write(ImmutableBytesWritable key, +      Put value) throws IOException { +    m_table.put(new Put(value)); +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java new file mode 100644 index 0000000..fc656a4 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -0,0 +1,206 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +public class HBaseSalter { +  private static final Log LOG = LogFactory.getLog(HBaseSalter.class); + +   +  public static byte [] addSaltPrefix(byte [] key) throws IOException { +    if( key == null || key.length < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long"); +     +    String keyStr = Bytes.toString(key); +     +    return Bytes.toBytes(keyStr.substring(keyStr.length() - 1) + "_" + keyStr);  +  } + +  public static String addSaltPrefix(String key) throws IOException { +    if( key == null || key.length() < 1 ) throw new IOException("Input Key is EMPTY or Less than 1 Character Long"); +     +    return (key.substring(key.length() - 1) + "_" + key);  +  } +   +  public static ImmutableBytesWritable addSaltPrefix( ImmutableBytesWritable key ) throws IOException { +	  return new ImmutableBytesWritable( addSaltPrefix(key.get())); +  } + +  public static byte [] delSaltPrefix(byte [] key) throws IOException { +    if( key == null || key.length < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long"); +     +    String keyStr = Bytes.toString(key); +     +    return Bytes.toBytes(keyStr.substring(2));  +  } + +  public static String delSaltPrefix(String key) throws IOException { +    if( key == null || key.length() < 3 ) throw new IOException("Input Key is EMPTY or Less than 3 Characters Long"); +     +    return key.substring(2);  +  } +   +  public static ImmutableBytesWritable delSaltPrefix( ImmutableBytesWritable key ) throws IOException { +	  return new ImmutableBytesWritable( delSaltPrefix(key.get())); +  } +   +  public static final String DEFAULT_PREFIX_LIST = " !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~"; +   +  public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey) throws IOException { +	  return getDistributedIntervals(originalStartKey, originalStopKey, DEFAULT_PREFIX_LIST); +  } +   +  public static Pair<byte[], byte[]>[] getDistributedIntervals(byte[] originalStartKey, byte[] originalStopKey, String prefixList) throws IOException { +	  return getDistributedIntervals(originalStartKey, originalStopKey, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, prefixList); +  } +   + +  public static Pair<byte[], byte[]>[] getDistributedIntervals( +		  byte[] originalStartKey, byte[] originalStopKey,  +		  byte[] regionStartKey, byte[] regionStopKey,  +		  String prefixList) throws IOException { +	LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", +			Bytes.toString(originalStartKey), +			Bytes.toString(originalStopKey), +			Bytes.toString(regionStartKey), +			Bytes.toString(regionStopKey), +			prefixList +			));  +	  +    byte[][] startKeys; +    byte[][] stopKeys; +     +    if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW) +    		&& Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW) ) { +    	startKeys = getAllKeys(originalStartKey, prefixList); +    	stopKeys = getAllKeys(originalStopKey, prefixList); +    } else if(Arrays.equals(regionStartKey, HConstants.EMPTY_START_ROW)) { +    	startKeys = getAllKeysWithStop(originalStartKey, prefixList, regionStopKey[0]); +    	stopKeys = getAllKeysWithStop(originalStopKey, prefixList, regionStopKey[0]); +    } else if(Arrays.equals(regionStopKey, HConstants.EMPTY_END_ROW)) { +    	startKeys = getAllKeysWithStart(originalStartKey, prefixList, regionStartKey[0]); +    	stopKeys = getAllKeysWithStart(originalStopKey, prefixList, regionStartKey[0]); +    } else { +    	startKeys = getAllKeysInRange(originalStartKey, prefixList, regionStartKey[0], regionStopKey[0]); +    	stopKeys = getAllKeysInRange(originalStopKey, prefixList, regionStartKey[0], regionStopKey[0]); +    } +     +    if( startKeys.length != stopKeys.length) { +    	throw new IOException("LENGTH of START Keys and STOP Keys DO NOT match"); +    } +     +    if( Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW)  +    		&& Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW) ) { +        Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR); +        // stop keys are the start key of the next interval +        for (int i = startKeys.length - 1; i >= 1; i--) { +        	startKeys[i] = startKeys[i - 1]; +        } +        startKeys[0] = HConstants.EMPTY_START_ROW; +        stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW; +    } else if (Arrays.equals(originalStartKey, HConstants.EMPTY_START_ROW)) { +        Arrays.sort(stopKeys, Bytes.BYTES_RAWCOMPARATOR); +        // stop keys are the start key of the next interval +        for (int i = startKeys.length - 1; i >= 1; i--) { +        	startKeys[i] = startKeys[i - 1]; +        } +        startKeys[0] = HConstants.EMPTY_START_ROW; +    } else if (Arrays.equals(originalStopKey, HConstants.EMPTY_END_ROW)) { +        Arrays.sort(startKeys, Bytes.BYTES_RAWCOMPARATOR); +        // stop keys are the start key of the next interval +        for (int i = 0; i < stopKeys.length - 1; i++) { +          stopKeys[i] = stopKeys[i + 1]; +        } +        stopKeys[stopKeys.length - 1] = HConstants.EMPTY_END_ROW; +    }  +       +    Pair<byte[], byte[]>[] intervals = new Pair[startKeys.length]; +    for (int i = 0; i < startKeys.length; i++) { +      intervals[i] = new Pair<byte[], byte[]>(startKeys[i], stopKeys[i]); +    } + +    return intervals; +  }   +   +  public static byte[][] getAllKeys(byte[] originalKey) throws IOException { +    return getAllKeys(originalKey, DEFAULT_PREFIX_LIST); +  } + +  public static byte[][] getAllKeys(byte[] originalKey, String prefixList) throws IOException { +	  char[] prefixArray = prefixList.toCharArray(); +	   +	  return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)prefixArray[prefixArray.length - 1]); +  } + +  public static byte[][] getAllKeysWithStart(byte[] originalKey, String prefixList, byte startKey) throws IOException { +	  char[] prefixArray = prefixList.toCharArray(); +	   +	  return getAllKeysWithStartStop(originalKey, prefixList, startKey, (byte)prefixArray[prefixArray.length - 1]); +  } + +  public static byte[][] getAllKeysWithStop(byte[] originalKey, String prefixList, byte stopKey) throws IOException { +	  char[] prefixArray = prefixList.toCharArray(); +	   +	  return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1)); +  } + +  public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { +	  return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1)); +  } +   +  private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { +	  char[] prefixArray = prefixList.toCharArray(); +	  TreeSet<Byte> prefixSet = new TreeSet<Byte>(); +	   +	  for( char c : prefixArray ) { +		  prefixSet.add((byte)c); +	  } +	   +	  SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true); +	   +	  return getAllKeys(originalKey, subSet.toArray(new Byte[]{})); +  } +   +  public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) { +	byte[][] keys = new byte[prefixArray.length][]; +	 +    for (byte i = 0; i < prefixArray.length; i++) { +      keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey)); +    } + +    return keys; +  } +   +  public static void main(String [] args) throws IOException { +	   +   String s = ""; +    +   for (byte i = 32; i < Byte.MAX_VALUE; i++) { +	 s += (char)i; +   } +    +   System.out.println("".format("[%s]", s)); +   System.out.println("".format("[%s]", DEFAULT_PREFIX_LIST)); + +	   +    String start = Bytes.toString(HConstants.EMPTY_START_ROW); +    String stop = Bytes.toString(HConstants.EMPTY_END_ROW); +     +    System.out.println("".format("START: (%s) STOP: (%s)", start, stop)); +     +    Pair<byte[], byte[]>[] intervals = getDistributedIntervals(Bytes.toBytes(start), Bytes.toBytes(stop), "1423"); +     +    for( Pair<byte[], byte[]> p : intervals ) { +      System.out.println("".format("iSTART: (%s) iSTOP: (%s)", Bytes.toString(p.getFirst()), Bytes.toString(p.getSecond()))); +    } +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index e5acc30..a7d36fd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -24,7 +24,6 @@ import cascading.util.Util;  import org.apache.hadoop.hbase.client.Put;  import org.apache.hadoop.hbase.client.Result;  import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableOutputFormat;  import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.OutputCollector; @@ -63,7 +62,9 @@ public class HBaseScheme    /** String columns */    private transient String[] columns;    /** Field fields */ -  private transient byte[][] fields; +//  private transient byte[][] fields; +   +  private boolean useSalt = false;    /** @@ -237,8 +238,13 @@ public class HBaseScheme      OutputCollector outputCollector = sinkCall.getOutput();      Tuple key = tupleEntry.selectTuple(keyField);      ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0); +     +    if( useSalt ) { +    	keyBytes = HBaseSalter.addSaltPrefix(keyBytes); +    } +          Put put = new Put(keyBytes.get(), this.timeStamp); - +          for (int i = 0; i < valueFields.length; i++) {        Fields fieldSelector = valueFields[i];        TupleEntry values = tupleEntry.selectEntry(fieldSelector); @@ -258,10 +264,13 @@ public class HBaseScheme    @Override    public void sinkConfInit(FlowProcess<JobConf> process,        Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { -    conf.setOutputFormat(TableOutputFormat.class); +    conf.setOutputFormat(HBaseOutputFormat.class);      conf.setOutputKeyClass(ImmutableBytesWritable.class);      conf.setOutputValueClass(Put.class); +     +    String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE); +    useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false);    }    @Override diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java index 1d48e1d..a5c3bdd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -1,11 +1,9 @@  package parallelai.spyglass.hbase; -import java.awt.event.KeyListener;  import java.io.DataInput;  import java.io.DataOutput;  import java.io.IOException;  import java.io.Serializable; -import java.util.Set;  import java.util.TreeSet;  import org.apache.commons.logging.Log; @@ -14,8 +12,9 @@ import org.apache.hadoop.hbase.HConstants;  import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.mapred.InputSplit; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import com.sun.tools.javac.resources.version; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { @@ -28,11 +27,13 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,    private TreeSet<String> m_keyList = null;    private SourceMode m_sourceMode = SourceMode.EMPTY;    private boolean m_endRowInclusive = true; +  private int m_versions = 1; +  private boolean m_useSalt = false;    /** default constructor */    public HBaseTableSplit() {      this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, -      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY); +      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);    }    /** @@ -43,19 +44,22 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,     * @param location     */    public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, -      final String location, final SourceMode sourceMode) { +      final String location, final SourceMode sourceMode, final boolean useSalt) {      this.m_tableName = tableName;      this.m_startRow = startRow;      this.m_endRow = endRow;      this.m_regionLocation = location;      this.m_sourceMode = sourceMode; +    this.m_useSalt = useSalt;    } -  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) { +  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) {      this.m_tableName = tableName;      this.m_keyList = keyList; +    this.m_versions = versions;      this.m_sourceMode = sourceMode;      this.m_regionLocation = location; +    this.m_useSalt = useSalt;    }    /** @return table name */ @@ -86,20 +90,28 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,      return m_keyList;    } +  public int getVersions() { +	  return m_versions; +  } +    /** @return get the source mode */    public SourceMode getSourceMode() {      return m_sourceMode;    } +   +  public boolean getUseSalt() { +    return m_useSalt; +  }    /** @return the region's hostname */    public String getRegionLocation() { -    LOG.info("REGION GETTER : " + m_regionLocation); +    LOG.debug("REGION GETTER : " + m_regionLocation);      return this.m_regionLocation;    }    public String[] getLocations() { -    LOG.info("REGION ARRAY : " + m_regionLocation); +    LOG.debug("REGION ARRAY : " + m_regionLocation);      return new String[] {this.m_regionLocation};    } @@ -112,11 +124,12 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,    @Override    public void readFields(DataInput in) throws IOException { -    LOG.info("READ ME : " + in.toString()); +    LOG.debug("READ ME : " + in.toString());      this.m_tableName = Bytes.readByteArray(in);      this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));      this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); +    this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));      switch(this.m_sourceMode) {        case SCAN_RANGE: @@ -126,6 +139,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,          break;        case GET_LIST: +    	this.m_versions = Bytes.toInt(Bytes.readByteArray(in));          this.m_keyList = new TreeSet<String>();          int m = Bytes.toInt(Bytes.readByteArray(in)); @@ -136,16 +150,17 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,          break;      } -    LOG.info("READ and CREATED : " + this); +    LOG.debug("READ and CREATED : " + this);    }    @Override    public void write(DataOutput out) throws IOException { -    LOG.info("WRITE : " + this); +    LOG.debug("WRITE : " + this);      Bytes.writeByteArray(out, this.m_tableName);      Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));      Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); +    Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));      switch( this.m_sourceMode ) {        case SCAN_RANGE: @@ -155,6 +170,7 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,          break;        case GET_LIST: +    	Bytes.writeByteArray(out, Bytes.toBytes(m_versions));          Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));          for( String k: this.m_keyList ) { @@ -163,13 +179,14 @@ public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>,          break;      } -    LOG.info("WROTE : " + out.toString()); +    LOG.debug("WROTE : " + out.toString());    }    @Override    public String toString() { -    return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)",  -        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList); +    return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",  +        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow),  +        (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt);    }    @Override diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 9a0ed0e..65873a0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,6 +12,7 @@  package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  import cascading.flow.FlowProcess;  import cascading.tap.SinkMode;  import cascading.tap.Tap; @@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;  import org.apache.hadoop.hbase.MasterNotRunningException;  import org.apache.hadoop.hbase.ZooKeeperConnectionException;  import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  import org.apache.hadoop.mapred.FileInputFormat;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.OutputCollector; @@ -35,13 +35,10 @@ import org.apache.hadoop.mapred.RecordReader;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; -  import java.io.IOException;  import java.io.Serializable;  import java.util.ArrayList; -import java.util.Map.Entry; +import java.util.Arrays;  import java.util.UUID;  /** @@ -172,7 +169,12 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {      } -    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +    conf.set(HBaseOutputFormat.OUTPUT_TABLE, tableName); + +    for( SinkConfig sc : sinkConfigList) { +        sc.configure(conf); +    } +          super.sinkConfInit(process, conf);    } @@ -292,10 +294,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {      public String startKey = null;      public String stopKey = null;      public String [] keyList = null; +    public int versions = 1; +    public boolean useSalt = false; +    public String prefixList = null;      public void configure(Configuration jobConf) {        switch( sourceMode ) {          case SCAN_RANGE: +	        if (stopKey != null && startKey != null && startKey.compareTo(stopKey) > 0) { +	      	  String t = stopKey; +	      	  stopKey = startKey; +	      	  startKey = t; +	        } +                        jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());            if( startKey != null && startKey.length() > 0 ) @@ -304,57 +315,104 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {            if( stopKey != null && stopKey.length() > 0 )              jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey); -          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); -          LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); -          LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); +          // Added for Salting +          jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); +          jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); +           +          LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          LOG.info(String.format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); +          LOG.info(String.format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));            break;          case GET_LIST:            jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());            jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList); +          jobConf.setInt(String.format(HBaseConstants.VERSIONS, tableName), versions); -          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); -          LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList)); +          // Added for Salting +          jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); +          jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); +           +          LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          LOG.info(String.format("Setting KEY LIST (%s) to key list length (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList.length));            break;          default:            jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); -          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          // Added for Salting +          jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); +          jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); +           +          LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));            break;        }      }    } +  private static class SinkConfig implements Serializable { +	  public String tableName = null; +	  public boolean useSalt = false; +	   +	  public void configure(Configuration jobConf) { +          jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); +	  } +  } +      private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>(); - -  public void setHBaseRangeParms(String startKey, String stopKey ) { +  private ArrayList<SinkConfig> sinkConfigList = new ArrayList<SinkConfig>(); +   +  public void setHBaseRangeParms(String startKey, String stopKey, boolean useSalt, String prefixList ) {      SourceConfig sc = new SourceConfig();      sc.sourceMode = SourceMode.SCAN_RANGE;      sc.tableName = tableName;      sc.startKey = startKey;      sc.stopKey = stopKey; +    sc.useSalt = useSalt; +    setPrefixList(sc, prefixList);      sourceConfigList.add(sc);    } -  public void setHBaseListParms(String [] keyList ) { +  public void setHBaseListParms(String [] keyList, int versions, boolean useSalt, String prefixList ) {      SourceConfig sc = new SourceConfig();      sc.sourceMode = SourceMode.GET_LIST;      sc.tableName = tableName;      sc.keyList = keyList; +    sc.versions = (versions < 1) ? 1 : versions; +    sc.useSalt = useSalt; +    setPrefixList(sc, prefixList);      sourceConfigList.add(sc);    } -  public void setHBaseScanAllParms() { +  public void setHBaseScanAllParms(boolean useSalt, String prefixList) {      SourceConfig sc = new SourceConfig();      sc.sourceMode = SourceMode.SCAN_ALL;      sc.tableName = tableName; +    sc.useSalt = useSalt; +     +    setPrefixList(sc, prefixList);      sourceConfigList.add(sc);    } +   +  public void setUseSaltInSink( boolean useSalt ) { +    SinkConfig sc = new SinkConfig(); +     +    sc.tableName = tableName; +    sc.useSalt = useSalt; +	 +    sinkConfigList.add(sc); +  } +   +  private void setPrefixList(SourceConfig sc, String prefixList ) { +	  prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList; +	  char[] prefixArray = prefixList.toCharArray(); +	  Arrays.sort(prefixArray); +	  sc.prefixList = new String( prefixArray ); +  }  } diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala new file mode 100644 index 0000000..21d90e8 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala @@ -0,0 +1,54 @@ +package parallelai.spyglass.hbase + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import com.twitter.scalding.Dsl._ +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding.RichPipe +import com.twitter.scalding.RichFields +import org.apache.hadoop.hbase.util.Bytes +import cascading.tuple.TupleEntry + +class HBasePipeWrapper (pipe: Pipe) { +   def toBytesWritable(f: Fields): Pipe = { +	  asList(f) +     .foldLeft(pipe){ (p, f) => { +	    p.map(f.toString -> f.toString){ from: String => { +	      new ImmutableBytesWritable(Bytes.toBytes(from)) +	    }} +	  }}  +	} + +//   def toBytesWritable : Pipe = { +//	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => { +//	    p.map(f.toString -> f.toString){ from: String => { +//	      new ImmutableBytesWritable(Bytes.toBytes(from)) +//	    }} +//	  }}  +//	} + +	def fromBytesWritable(f: Fields): Pipe = { +	  asList(f) +	  .foldLeft(pipe) { (p, fld) => +	    p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { +	    	Bytes.toString(from.get) +	      } +	    } +	  } +	}	 +	 +//	def fromBytesWritable : Pipe = { +//	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) => +//	    p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { +//	    	Bytes.toString(from.get) +//	      } +//	    } +//	  } +//	} +} + +trait HBasePipeConversions { +  implicit def pipeWrapper(pipe: Pipe) = new HBasePipeWrapper(pipe)  +} + + diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index e46ef50..39a076e 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -9,6 +9,10 @@ import com.twitter.scalding.Mode  import com.twitter.scalding.Read  import com.twitter.scalding.Source  import com.twitter.scalding.Write + +import parallelai.spyglass.hbase.HBaseScheme; +import parallelai.spyglass.hbase.HBaseTap; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  import cascading.scheme.Scheme  import cascading.tap.SinkMode  import cascading.tap.Tap @@ -17,7 +21,6 @@ import org.apache.hadoop.mapred.RecordReader  import scala.compat.Platform  import org.apache.hadoop.mapred.OutputCollector  import org.apache.hadoop.mapred.JobConf -import parallelai.spyglass.hbase.HBaseConstants.SourceMode  object Conversions {    implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -36,7 +39,10 @@ class HBaseSource(      sourceMode: SourceMode = SourceMode.SCAN_ALL,      startKey: String = null,      stopKey: String = null, -    keyList: List[String] = null +    keyList: List[String] = null, +    versions: Int = 1, +    useSalt: Boolean = false, +    prefixList: String = null    ) extends Source {    override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) @@ -51,19 +57,20 @@ class HBaseSource(        case hdfsMode @ Hdfs(_, _) => readOrWrite match {          case Read => {             val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) -           +                       sourceMode match {              case SourceMode.SCAN_RANGE => { -              hbt.setHBaseRangeParms(startKey, stopKey) +               +              hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)              }              case SourceMode.SCAN_ALL => { -              hbt.setHBaseScanAllParms() +              hbt.setHBaseScanAllParms(useSalt, prefixList)              }              case SourceMode.GET_LIST => { -              if( keyList == null )  +              if( keyList == null )                    throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) -              hbt.setHBaseListParms(keyList.toArray[String]) +              hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)              }              case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))            } @@ -73,6 +80,8 @@ class HBaseSource(          case Write => {            val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) +          hbt.setUseSaltInSink(useSalt); +                      hbt.asInstanceOf[Tap[_,_,_]]          }        } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index 4c86b07..1ce9072 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -27,7 +27,7 @@ class HBaseExample(args: Args) extends JobBase(args) {    val jobConf = getJobConf -  val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" +  val quorumNames = args("quorum")    case class HBaseTableStore(        conf: Configuration, diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala new file mode 100644 index 0000000..d24f785 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -0,0 +1,101 @@ +package parallelai.spyglass.hbase.testing + +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +import com.twitter.scalding.Args +import parallelai.spyglass.hbase.HBaseSource +import com.twitter.scalding.Tsv +import cascading.tuple.Fields +import com.twitter.scalding.TextLine +import org.apache.log4j.Logger +import org.apache.log4j.Level +import parallelai.spyglass.hbase.HBasePipeConversions +import cascading.pipe.Pipe + +class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { + +  val isDebug = args.getOrElse("debug", "false").toBoolean +   +  if( isDebug ) { Logger.getRootLogger().setLevel(Level.DEBUG) }  +   +  val TABLE_SCHEMA = List('key, 'salted, 'unsalted) +   +  val prefix = "0123456789" +   +//  val hbase01 = CommonFunctors.fromBytesWritable( +//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +//          sourceMode = SourceMode.SCAN_ALL ).read,        +//        TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanAllNoSalt01")) + +//  val hbase02 = CommonFunctors.fromBytesWritable( +//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +//          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read,        +//        TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanAllPlusSalt01")) + +//  val hbase03 = CommonFunctors.fromBytesWritable( +//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +//          sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read,        +//        TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangeNoSalt01")) + +//  val hbase04 = CommonFunctors.fromBytesWritable( +//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +//          sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read,        +//        TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/ScanRangePlusSalt01")) + +//  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +//          sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read +//           +//  val hbase05 = CommonFunctors.fromBytesWritable( +//      hbase05bytes,        +//        TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/GetListNoSalt01")) +// +//  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +//          sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read +//           +//  val hbase06 = CommonFunctors.fromBytesWritable( +//      hbase06bytes,        +//        TABLE_SCHEMA ) +//  .write(TextLine("saltTesting/GetListPlusSalt01")) + +  val hbase07 =  +      new HBaseSource( "_TEST.SALT.03", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix ) +  .read +  .fromBytesWritable( TABLE_SCHEMA ) +  .write(TextLine("saltTesting/ScanRangePlusSalt10")) +  .toBytesWritable( TABLE_SCHEMA ) +  .write(new HBaseSource( "_TEST.SALT.04", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          useSalt = true )) + +//  val hbase08 =  +//      new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key,   +//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  +//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +//          sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix ) +//  .read +//  .fromBytesWritable('*) +//  .write(TextLine("saltTesting/ScanRangePlusSalt03")) + +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala new file mode 100644 index 0000000..af7d7d2 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -0,0 +1,18 @@ +package parallelai.spyglass.hbase.testing + +import parallelai.spyglass.base.JobRunner + +object HBaseSaltTesterRunner extends App { +   +//  if( args.length < 2 ) { throw new Exception("Not enough Args")} +   +  val appConfig = "/home/crajah/tmp/application.conf" +  val libPath = "/home/crajah/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation" + +  JobRunner.main(Array(classOf[HBaseSaltTester].getName,  +      "--hdfs",  +      "--app.conf.path", appConfig,  +      "--job.lib.path", libPath, +      "--debug", "true" +  )) +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala new file mode 100644 index 0000000..69f8b60 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala @@ -0,0 +1,444 @@ +package parallelai.spyglass.hbase.testing + +import org.apache.log4j.Level +import org.apache.log4j.LogManager +import org.apache.log4j.Logger +import com.twitter.scalding.Args +import com.twitter.scalding.IterableSource +import com.twitter.scalding.Tsv +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import parallelai.spyglass.base.JobBase +import cascading.pipe.Pipe +import parallelai.spyglass.hbase.HBasePipeConversions + +/** + * This integration-test expects some HBase table to exist + * with specific data - see GenerateTestingHTables.java + * + * Keep in mind that currently:   + * + No version support exists in Scans + * + GET_LIST is working as a Set - Having a rowkey twice in the GET_LIST - will return in only one GET + *  + * ISSUES: + *  + If Scan_Range is unordered i.e. 9 -> 1 (instead of 1 -> 9) unhandled exception is thrown: + *  Caused by: java.lang.IllegalArgumentException: Invalid range: 9 > 11000000 + *	at org.apache.hadoop.hbase.client.HTable.getRegionsInRange(HTable.java:551) + *   + * @author Antwnis@gmail.com + */ + +// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala +class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeConversions { +   +  // Initiate logger +  private val LOG: Logger = LogManager.getLogger(this.getClass) +   +  // Set to Level.DEBUG if --debug is passed in +  val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean +  if (isDebug) {  +    LOG.setLevel(Level.DEBUG) +    LOG.info("Setting logging to Level.DEBUG") +  } +   +  // Set HBase host +  val hbaseHost = "cldmgr.prod.bigdata.bskyb.com:2181"  + +  // ----------------------------- +  // ----- Tests for TABLE_01 ---- +  // ----------------------------- +  val TABLE_01_SCHEMA = List('key,'column1) +  val tableName1 = "TABLE_01" +  val tableName2 = "TABLE_02" + +  // -------------------- Test 01 -------------------- +  var testName01 = "Scan_Test_01_Huge_Key_Range" +  println("---- Running : " + testName01) +  // Get everything from HBase testing table into a Pipe +  val hbase01 = new HBaseSource( tableName1, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00") +  		  	.read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +  // Calculate expected result for Test 01 +  var list01 = List(("2000-01-01 10:00:10", "1"), +                    ("2000-01-01 10:05:00", "2"), +                    ("2000-01-01 10:10:00", "3")) + +  // -------------------- Test 02 -------------------- +  val testName02 = "Scan_Test_02_Borders_Range" +  println("---- Running : " + testName02) +  // Get everything from HBase testing table into a Pipe +  val hbase02 = new HBaseSource( tableName1, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00") +  		  	.read +  		  	.fromBytesWritable(TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } +  // Calculate expected result for Test 02 +  var list02 = List(("2000-01-01 10:00:10", "1"), ("2000-01-01 10:05:00", "2"), ("2000-01-01 10:10:00", "3")) + +  // -------------------- Test 03 -------------------- +  val testName03 = "Scan_Test_03_Inner_Range" +  // Get everything from HBase testing table into a Pipe +  val hbase03 = new HBaseSource( tableName1, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00") +  		  	.read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } +   +  // Calculate expected result for Test 03 +  var list03 = List(("2000-01-01 10:05:00", "2")) + +  // -------------------- Test 04 -------------------- +  val testName04 = "Scan_Test_04_Out_Of_Range_And_Unordered" +  // Get everything from HBase testing table into a Pipe +  val hbase04 = new HBaseSource( tableName1, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000") +  		  	.read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } +   +  // -------------------- Test 0 - TODO scan multiple versions .. -------------------- +//  val testName04 = "Scan_Test_04_One_Version" +//  // Get everything from HBase testing table into a Pipe +//  val hbase04 = CommonFunctors.fromBytesWritable( +//  		new HBaseSource( tableName2, hbaseHost, 'key,   +//  		    Array("data"),  +//  		    Array('column1), +//            sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00", +//            versions = 1 ) // If versions is '0' - it is regarded as '1' +//            .read  	     +//  	    , TABLE_01_SCHEMA) +//  .groupAll { group =>  +//  	   group.toList[String]('key -> 'key) +//  	   group.toList[String]('column1 -> 'column1) +//  } +//  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +//	     x._1 + " " + x._2 +//  } +//   +//  // Calculate expected result for Test 04 +//  var list04 = List(("","")) + + +  // -------------------- Test 05 -------------------- +  val testName05 = "Get_Test_01_One_Existing_Some_Nonexisting_Keys_1_Versions" +  // Get everything from HBase testing table into a Pipe +  val hbase05 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +            sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"), +            versions = 1 ) +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +  // Calculate expected result for Test 04 +  var list05 = List(("2000-01-01 11:00:00", "6")) + +  // -------------------- Test 6 -------------------- +  val testName06 = "Get_Test_02_One_Existing_Some_Nonexisting_Keys_2_Versions" +  val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +            sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"), +            versions = 2 ) +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group => +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +  // Calculate expected result for Test 05 +  var list06 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2")) + +  // -------------------- Test 7 -------------------- +  val testName07 = "Get_Test_03_One_Existing_Some_Nonexisting_Keys_3_Versions" +  // Get everything from HBase testing table into a Pipe +  val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +            sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"), +            versions = 3 ) +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +  // Calculate expected result for Test 07 +  var list07 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) +   +  // -------------------- Test 08 -------------------- +  val testName08 = "Get_Test_04_One_Existing_Some_Nonexisting_Keys_4_Versions" +  // Get everything from HBase testing table into a Pipe +  val hbase08 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +            sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"), +            versions = 4 ) +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +  var list08 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),("2000-01-01 10:00:00","1"), +                    ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) + +  // -------------------- Test 09 -------------------- +  val testName09 = "Get_Test_05_Get_Same_Key_Multiple_Times_4_versions" +  // Get everything from HBase testing table into a Pipe +  val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +            sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"), +            versions = 4 ) +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +  var list09 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) + + +  // -------------------- Test 10 -------------------- +  val testName10 = "Get_Test_06_TestWith10000and1rowkeys" +  var bigList1:List[String] = (1 to 10000).toList.map(_.toString)  +  var bigList2:List[String] = (100001 to 200000).toList.map(_.toString) +  var bigList = ((bigList1 ::: List("2000-01-01 11:00:00")) ::: bigList2) ::: List("2000-01-01 10:00:00")    + +  // Get everything from HBase testing table into a Pipe +  val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +  		    sourceMode = SourceMode.GET_LIST, keyList = bigList, +            versions = 2 ) //  +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +   +  var list10 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"), +		  			("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5") +                    ) +   +  // -------------------- Test 11 -------------------- +  val testName11 = "Get_Test_07_EmptyList" +  // Get everything from HBase testing table into a Pipe +  val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +  		    sourceMode = SourceMode.GET_LIST, keyList = List(), +            versions = 1 ) //  +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } + +   +  // -------------------- Test 11 -------------------- +  val testName12 = "Get_Test_08_Three_Nonexistingkeys_1_Versions" +  // Get everything from HBase testing table into a Pipe +  val hbase12 = new HBaseSource( tableName2, hbaseHost, 'key,   +  		    Array("data"),  +  		    Array('column1), +  		    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"), +            versions = 1 )   +            .read  	     +  	    .fromBytesWritable( +  		TABLE_01_SCHEMA) +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +	     x._1 + " " + x._2 +  } +   +  // --------------------- TEST 13 ----------------------------- +  val testName13 = "Some " +  val hbase13 = new HBaseSource( tableName2, hbaseHost, 'key,   +          Array("data"),  +          Array('column1), +          sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey="", useSalt = true )   +            .read        +        .fromBytesWritable( +      TABLE_01_SCHEMA) +  .groupAll { group =>  +       group.toList[String]('key -> 'key) +       group.toList[String]('column1 -> 'column1) +  } +  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => +       x._1 + " " + x._2 +  }  + +  var list13 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"), +            ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5") +                    ) + +   +  // Store results of Scan Test 01 +  ( +    getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++ +    getTestResultPipe(getExpectedPipe(list02), hbase02, testName02) ++ +    getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++   +    assertPipeIsEmpty(hbase04, testName04) ++ +    getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++  +    getTestResultPipe(getExpectedPipe(list06), hbase06, testName06) ++  +    getTestResultPipe(getExpectedPipe(list07), hbase07, testName07) ++ +    getTestResultPipe(getExpectedPipe(list08), hbase08, testName08) ++ +    getTestResultPipe(getExpectedPipe(list09), hbase09, testName09) ++ +    getTestResultPipe(getExpectedPipe(list10), hbase10, testName10) ++ +    getTestResultPipe(getExpectedPipe(list13), hbase13, testName13) ++ +    assertPipeIsEmpty(hbase11, testName11) ++ +    assertPipeIsEmpty(hbase12, testName12)  +  ).groupAll { group => +    group.sortBy('testName) +  } +  .write(Tsv("HBaseShouldRead")) +   +   +  /** +   * We assume the pipe is empty +   *  +   * We concatenate with a header - if the resulting size is 1  +   * then the original size was 0 - then the pipe was empty :) +   *  +   * The result is then returned in a Pipe +   */ +  def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = {  +    val headerPipe = IterableSource(List(testName), 'hbasedata) +    val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group => +      group.size('size) +    } +    .project('size) +     +    val result =  +      concatenation +      .mapTo('size -> ('testName, 'result, 'expecteddata, 'hbasedata)) { x:String => {  +	      if (x == "1") {  +	        (testName, "Success", "", "") +	      } else { +	        (testName, "Test Failed", "", "") +	      }   +    	} +      } +     +    result +  } +   +  /** +   * Methods receives 2 pipes - and projects the results of testing +   *  +   * expectedPipe  should have a column 'expecteddata +   * realHBasePipe should have a column 'hbasedata +   */ +  def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = { +	val results = expectedPipe.insert('testName , testName) +                 .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName)) +    .map(('expecteddata, 'hbasedata)->'result) { x:(String,String) => +        if (x._1.equals(x._2))  +           "Success" +        else +           "Test Failed" +    } +    .project('testName, 'result, 'expecteddata, 'hbasedata) +    results +  } + +  /**  +   *   +   */ +  def getExpectedPipe ( expectedList: List[(String,String)]) : Pipe = { +     +    val expectedPipe =  +      IterableSource(expectedList, TABLE_01_SCHEMA) +      .groupAll { group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +      } +      .mapTo(('*) -> 'expecteddata) { x:(String,String) => +         x._1 + " " + x._2 +      } +   expectedPipe +  } +   +} diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala new file mode 100644 index 0000000..aa77caa --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala @@ -0,0 +1,10 @@ +package parallelai.spyglass.hbase.testing + +import parallelai.spyglass.base.JobRunner + +object HBaseSourceShouldReadRunner extends App { +  val appConfig = "/projects/applications.conf" +  val libPath = "/media/sf__CHANDAN_RAJAH_/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation" + +  JobRunner.main(Array(classOf[HBaseSourceShouldRead].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath)) +}
\ No newline at end of file diff --git a/src/test/java/parallelai/spyglass/hbase/AllTests.java b/src/test/java/parallelai/spyglass/hbase/AllTests.java new file mode 100644 index 0000000..e1b875f --- /dev/null +++ b/src/test/java/parallelai/spyglass/hbase/AllTests.java @@ -0,0 +1,11 @@ +package parallelai.spyglass.hbase; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.junit.runners.Suite.SuiteClasses; + +@RunWith(Suite.class) +@SuiteClasses({ HBaseSalterTester.class }) +public class AllTests { + +} diff --git a/src/test/java/parallelai/spyglass/hbase/GenerateTestingHTables.java b/src/test/java/parallelai/spyglass/hbase/GenerateTestingHTables.java new file mode 100644 index 0000000..471e1fe --- /dev/null +++ b/src/test/java/parallelai/spyglass/hbase/GenerateTestingHTables.java @@ -0,0 +1,261 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import junit.framework.Assert; +import org.junit.Test; + +/** + * Class generates TWO tables in HBase 'TABLE_01' and 'TABLE_02' + *  + * Those tables are used by the 'integration-testing' of HBaseSource + * in file HBaseSourceShouldRead.scala + *  + * Run with: + * mvn -Dtest=bskyb.commons.hbase.skybase.GenerateTestingHTables test + *  + * @author Antwnis@gmail.com + */ +public class GenerateTestingHTables { + +	private static Configuration config = HBaseConfiguration.create(); +	private static final String QUORUM = "cldmgr.prod.bigdata.bskyb.com"; +	private static final String QUORUM_PORT = "2181"; +	private static final Long STARTING_TIMESTAMP = 1260000000000L; + +	public static enum TestingTable { +		TABLE_01, TABLE_02 +	} + +	private static final Log LOG = LogFactory.getLog(GenerateTestingHTables.class); + +	@Test +	public void fakeTest() { + +		// Connect to Quorum +		LOG.info("Connecting to " + QUORUM + ":" + QUORUM_PORT); +		config.clear(); +		config.set("hbase.zookeeper.quorum", QUORUM); +		config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT); + +		// Delete test tables +		try { +			deleteTestTable(TestingTable.TABLE_01.name()); +			deleteTestTable(TestingTable.TABLE_02.name()); +	 +			// Generate test tables +			createTestTable(TestingTable.TABLE_01); +			createTestTable(TestingTable.TABLE_02); +	 +			// Populate test tables +			populateTestTable(TestingTable.TABLE_01); +			populateTestTable(TestingTable.TABLE_02); +	 +			// Print content of test table +			printHTable(TestingTable.TABLE_01); +			 +			// If we've reached here - the testing data are in +			Assert.assertEquals("true", "true"); +		} catch (IOException e) { +			LOG.error(e.toString()); +		} + +	} + +	private static void populateTestTable(TestingTable testingTable) +			throws IOException { +		// Load up HBase table +		HTable table = new HTable(config, testingTable.name()); +		 +		LOG.info("Populating table: " + testingTable.name()); + +		// Table_01 +		if (testingTable == TestingTable.TABLE_01) { +			Put put1 = new Put("2000-01-01 10:00:10".getBytes()); +			put1.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes()); +			Put put2 = new Put("2000-01-01 10:05:00".getBytes()); +			put2.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "2".getBytes()); +			Put put3 = new Put("2000-01-01 10:10:00".getBytes()); +			put3.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "3".getBytes()); +			table.put(put1); +			table.put(put2); +			table.put(put3); +		} else +		// Table_02 +		if (testingTable == TestingTable.TABLE_02) { + +			// 3 versions at 10 o'clock +			byte[] k1 = "2000-01-01 10:00:00".getBytes(); +			Put put1 = new Put(k1); +			put1.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP        , "1".getBytes()); +			Put put2 = new Put(k1); +			put2.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "2".getBytes()); +			Put put3 = new Put(k1); +			put3.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "3".getBytes()); + +			// 3 versions at 11 o'clock +			byte[] k2 = "2000-01-01 11:00:00".getBytes(); +			Put put4 = new Put(k2); +			put4.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP        , "4".getBytes()); +			Put put5 = new Put(k2); +			put5.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "5".getBytes()); +			Put put6 = new Put(k2); +			put6.add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "6".getBytes()); + +			// Generate list of puts +			List<Put> puts = new ArrayList<Put>(); +			 +			puts.add(put1); +			puts.add(put2); +			puts.add(put3); +			puts.add(put4); +			puts.add(put5); +			puts.add(put6); +			 +			table.put(puts); +		} + +		table.close(); +	} + +	private static void createTestTable(TestingTable testingTable) +			throws IOException { + +		// Reset configuration +		config.clear(); +		config.set("hbase.zookeeper.quorum", QUORUM); +		config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT); + +		HBaseAdmin hbase = new HBaseAdmin(config); + +		// Get and set the name of the new table +		String tableName = testingTable.name(); +		HTableDescriptor newTable = new HTableDescriptor(tableName); + +		// Table1 +		if (testingTable == TestingTable.TABLE_01) { +			HColumnDescriptor meta = new HColumnDescriptor("data"); +			meta.setMaxVersions(3) + 		        .setCompressionType(Compression.Algorithm.NONE) +			    .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY) +				.setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE) +				.setTimeToLive(HColumnDescriptor.DEFAULT_TTL) +				.setBloomFilterType(StoreFile.BloomType.NONE); + +			newTable.addFamily(meta); +			// Table2 +		} else if (testingTable == TestingTable.TABLE_02) { +			HColumnDescriptor meta = new HColumnDescriptor("data".getBytes()); +			meta.setMaxVersions(3) +		        .setCompressionType(Compression.Algorithm.NONE) +		        .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY) +			    .setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE) +			    .setTimeToLive(HColumnDescriptor.DEFAULT_TTL) +			    .setBloomFilterType(StoreFile.BloomType.NONE); + +//			HColumnDescriptor prefix = new HColumnDescriptor("account".getBytes()); +//			newTable.addFamily(prefix); +			newTable.addFamily(meta); +			LOG.info("scan 'TABLE_02' , { VERSIONS => 3 }"); +		} + +		try { +			LOG.info("Creating table " + tableName); +			hbase.createTable(newTable); +		} catch (TableExistsException et) { +			LOG.error("TableExistsException for table: " + tableName); +			LOG.debug(et.toString()); +		} catch (IOException e) { +			LOG.error("IOException: " + e.toString()); +		} + +		hbase.close(); +	} + +	/** +	 * Method to disable and delete HBase Tables i.e. "int-test-01" +	 */ +	private static void deleteTestTable(String tableName) throws IOException { + +		// Reset configuration +		config.clear(); +		config.set("hbase.zookeeper.quorum", QUORUM); +		config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT); + +		HBaseAdmin hbase = new HBaseAdmin(config); + +		if (hbase.tableExists(tableName)) { +			LOG.info("Table: " + tableName + " exists."); +			hbase.disableTable(tableName); +			hbase.deleteTable(tableName); +			LOG.info("Table: " + tableName + " disabled and deleted."); +		} else { +			LOG.info("Table: " + tableName + " does not exist."); +		} + +		hbase.close(); +	} + +	/** +	 * Method to print-out an HTable +	 */ +	private static void printHTable(TestingTable testingTable) +			throws IOException { + +		HTable table = new HTable(config, testingTable.name()); + +		Scan s = new Scan(); +		// Let scanner know which columns we are interested in +		ResultScanner scanner = table.getScanner(s); + +		LOG.info("Printing HTable: " + Bytes.toString(table.getTableName())); + +		try { +			// Iterate results +			for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { +				String key = Bytes.toString(rr.getRow()); +				Iterator<KeyValue> iter = rr.list().iterator(); + +				String header = "Key:\t"; +				String data = key + "\t"; + +				while (iter.hasNext()) { +					KeyValue kv = iter.next(); +					header += Bytes.toString(kv.getFamily()) + ":" +							+ Bytes.toString(kv.getQualifier()) + "\t"; +					data += Bytes.toString(kv.getValue()) + "\t"; +				} + +				LOG.info(header); +				LOG.info(data); +			} +			System.out.println(); +		} finally { +			// Make sure you close your scanners when you are done! +			// Thats why we have it inside a try/finally clause +			scanner.close(); +			table.close(); +		} +	} + +}
\ No newline at end of file diff --git a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java b/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java new file mode 100644 index 0000000..34d7ab4 --- /dev/null +++ b/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java @@ -0,0 +1,499 @@ +package parallelai.spyglass.hbase; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; + +import parallelai.spyglass.hbase.HBaseSalter; + + +public class HBaseSalterTester { + +	@Test +	public void addSaltPrefix() throws IOException { +		String keyStr = "1021"; +		byte [] keyBytes = Bytes.toBytes(keyStr); +		byte [] expected = Bytes.toBytes("1_1021"); +		byte [] actual = HBaseSalter.addSaltPrefix(keyBytes); +		 +		assertArrayEquals(actual, expected); +		 +		String actualStr = HBaseSalter.addSaltPrefix(keyStr); +		 +		System.out.println(Bytes.toString(expected) + " -> " + actualStr ); +		 +		assertEquals(Bytes.toString(expected), actualStr); + +	} + +	@Test +	public void delSaltPrefix() throws IOException { +		String keyStr = "1_1021"; +		byte [] keyBytes = Bytes.toBytes(keyStr); +		byte [] expected = Bytes.toBytes("1021"); +		byte [] actual = HBaseSalter.delSaltPrefix(keyBytes); +		 +		assertArrayEquals(actual, expected); +		 +		String actualStr = HBaseSalter.delSaltPrefix(keyStr); +		 +		assertEquals(Bytes.toString(expected), actualStr); + +	} + +	@Test +	public void getAllKeys() throws IOException { +		String keyStr = "1021"; +		byte [] keyBytes = Bytes.toBytes(keyStr); +		 +		char [] prefixArr = HBaseSalter.DEFAULT_PREFIX_LIST.toCharArray(); +		 +		byte [][] expected = new byte[prefixArr.length][]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); +		} +		 +		byte [][] actual = HBaseSalter.getAllKeys(keyBytes); +		 +		assertEquals(expected.length, actual.length); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +	} + +	@Test +	public void getAllKeysWithPrefix() throws IOException { +		String keyStr = "1021"; +		byte [] keyBytes = Bytes.toBytes(keyStr); +		String prefix = "0123456789"; +		 +		char [] prefixArr = prefix.toCharArray(); +		 +		byte [][] expected = new byte[prefixArr.length][]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); +		} +		 +		byte [][] actual = HBaseSalter.getAllKeys(keyBytes, prefix); +		 +		assertEquals(expected.length, actual.length); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +	} + +	@Test +	public void getAllKeysWithPrefixAndRange() throws IOException { +		String keyStr = "1021"; +		byte [] keyBytes = Bytes.toBytes(keyStr); +		String prefix = "1234"; +		String fullPrefix = "0123456789"; +		 +		char [] prefixArr = prefix.toCharArray(); +		Byte [] prefixBytes = new Byte[prefixArr.length];  +		 +		byte [][] expected = new byte[prefixArr.length][]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); +			prefixBytes[i] = (byte)prefixArr[i]; +		} +		 +		byte [][] actual = HBaseSalter.getAllKeysInRange(keyBytes, fullPrefix, (byte)'1', (byte)'5'); +		 +		assertEquals(expected.length, actual.length); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +		 +		actual = HBaseSalter.getAllKeys(keyBytes, prefixBytes); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +		 +	} + + +	@Test +	public void getAllKeysWithPrefixWithStart() throws IOException { +		String keyStr = "1021"; +		byte [] keyBytes = Bytes.toBytes(keyStr); +		String prefix = "3456789"; +		String fullPrefix = "0123456789"; +		 +		char [] prefixArr = prefix.toCharArray(); +		Byte [] prefixBytes = new Byte[prefixArr.length];  +		 +		byte [][] expected = new byte[prefixArr.length][]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); +			prefixBytes[i] = (byte)prefixArr[i]; +		} +		 +		byte [][] actual = HBaseSalter.getAllKeysWithStart(keyBytes, fullPrefix, (byte)'3'); +		 +		assertEquals(expected.length , actual.length); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +		 +		actual = HBaseSalter.getAllKeys(keyBytes, prefixBytes); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +		 +	} + +	@Test +	public void getAllKeysWithPrefixWithStop() throws IOException { +		String keyStr = "1021"; +		byte [] keyBytes = Bytes.toBytes(keyStr); +		String prefix = "01234"; +		String fullPrefix = "0123456789"; +		 +		char [] prefixArr = prefix.toCharArray(); +		Byte [] prefixBytes = new Byte[prefixArr.length];  +		 +		byte [][] expected = new byte[prefixArr.length][]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); +			prefixBytes[i] = (byte)prefixArr[i]; +		} +		 +		byte [][] actual = HBaseSalter.getAllKeysWithStop(keyBytes, fullPrefix, (byte)'5'); +		 +		assertEquals(expected.length , actual.length); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +		 +		actual = HBaseSalter.getAllKeys(keyBytes, prefixBytes); +		 +		for( int i = 0; i < expected.length; i++) { +			assertArrayEquals(expected[i], actual[i]); +		} +	} +	 +	@Test +	public void getDistributedIntervals() throws IOException { +		String keyStrStart = "1021"; +		byte [] keyBytesStart = Bytes.toBytes(keyStrStart); + +		String keyStrStop = "1022"; +		byte [] keyBytesStop = Bytes.toBytes(keyStrStop); + +		char [] prefixArr = HBaseSalter.DEFAULT_PREFIX_LIST.toCharArray(); +		 +		byte [][] expectedStart = new byte[prefixArr.length][]; +		byte [][] expectedStop = new byte[prefixArr.length][]; +		Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); +			expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); +			expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); +		} +		 +		Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop); +		 +		assertEquals(expectedPairs.length, actualPairs.length); +		 +		for( int i = 0; i < expectedPairs.length; i++ ) { +//			System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)",  +//					Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); +// +//			System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)",  +//					Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); + +			assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); +			assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); +		} +	} +	 +	 +	@Test +	public void getDistributedIntervalsWithPrefix() throws IOException { +		String keyStrStart = "1021"; +		byte [] keyBytesStart = Bytes.toBytes(keyStrStart); + +		String keyStrStop = "1022"; +		byte [] keyBytesStop = Bytes.toBytes(keyStrStop); + +		String prefix = "0123"; +		char [] prefixArr = prefix.toCharArray(); +		 +		byte [][] expectedStart = new byte[prefixArr.length][]; +		byte [][] expectedStop = new byte[prefixArr.length][]; +		Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); +			expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); +			expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); +		} +		 +		Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop, prefix); +		 +		assertEquals(expectedPairs.length, actualPairs.length); +		 +		for( int i = 0; i < expectedPairs.length; i++ ) { +			System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)",  +					Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); + +			System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)",  +					Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); + +			assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); +			assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); +		} +	} +	 +	@Test +	public void getDistributedIntervalsWithRegionsStartStop() throws IOException { +		String keyStrStart = "1021"; +		byte [] keyBytesStart = Bytes.toBytes(keyStrStart); + +		String keyStrStop = "1022"; +		byte [] keyBytesStop = Bytes.toBytes(keyStrStop); +		 +		byte [] regionStart = Bytes.toBytes("1"); +		byte [] regionsStop = Bytes.toBytes("4"); + +		String expectedPrefix = "123"; +		char [] prefixArr = expectedPrefix.toCharArray(); +		 +		byte [][] expectedStart = new byte[prefixArr.length][]; +		byte [][] expectedStop = new byte[prefixArr.length][]; +		Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); +			expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); +			expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); +		} +		 +		Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop, regionStart, regionsStop, HBaseSalter.DEFAULT_PREFIX_LIST); +		 +		assertEquals(expectedPairs.length, actualPairs.length); +		 +		for( int i = 0; i < expectedPairs.length; i++ ) { +			System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)",  +					Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); + +			System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)",  +					Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); + +			assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); +			assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); +		} +	} +	 +	 +	@Test +	public void getDistributedIntervalsWithRegionsStartStopWithPrefixAll() throws IOException { +		System.out.println("------------ TEST 20 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +		    "1020", "1021", +		    "1_1021", "3_1023", +		    "12", "012345" +				); + +		System.out.println("------------ TEST 21 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			    "1020", "1021", +			    "2_1021", Bytes.toString(HConstants.EMPTY_END_ROW), +			    "2345", "012345" +					); + +		System.out.println("------------ TEST 22 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			    "1020", "1021", +			    Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", +			    "012", "012345" +					); + +		System.out.println("------------ TEST 23 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			    "1020", "1021", +			    Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +			    "012345", "012345" +					); + +		System.out.println("------------ TEST 24 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			Bytes.toString(HConstants.EMPTY_START_ROW), "1021", +		    "1_1021", "3_1023", +		    "12", "012345" +				); + +		System.out.println("------------ TEST 25 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			"1020", Bytes.toString(HConstants.EMPTY_END_ROW), +		    "1_1021", "3_1023", +		    "12", "012345" +				); + +		System.out.println("------------ TEST 26 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +				Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +		    "1_1021", "3_1023", +		    "12", "012345" +				); + +		System.out.println("------------ TEST 27 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			Bytes.toString(HConstants.EMPTY_START_ROW), "1021", +			Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", +		    "012", "012345" +				); + +		System.out.println("------------ TEST 28 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			"1020", Bytes.toString(HConstants.EMPTY_END_ROW), +			Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", +		    "012", "012345" +				); + +		System.out.println("------------ TEST 29 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +			Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", +		    "012", "012345" +				); + +		System.out.println("------------ TEST 30 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			Bytes.toString(HConstants.EMPTY_START_ROW), "1021", +		    "1_1021", Bytes.toString(HConstants.EMPTY_END_ROW), +		    "12345", "012345" +				); + +		System.out.println("------------ TEST 31 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			"1020", Bytes.toString(HConstants.EMPTY_END_ROW), +		    "1_1021", Bytes.toString(HConstants.EMPTY_END_ROW), +		    "12345", "012345" +				); + +		System.out.println("------------ TEST 32 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +				Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +		    "1_1021", Bytes.toString(HConstants.EMPTY_END_ROW), +		    "12345", "012345" +				); + +		System.out.println("------------ TEST 33 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			Bytes.toString(HConstants.EMPTY_START_ROW), "1021", +			Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +		    "012345", "012345" +				); + +		System.out.println("------------ TEST 34 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +			"1020", Bytes.toString(HConstants.EMPTY_END_ROW), +			Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +		    "012345", "012345" +				); + +		System.out.println("------------ TEST 35 --------------"); +		getDistributedIntervalsWithRegionsStartStopWithPrefix( +				Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +				Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), +		    "012345", "012345" +				); + +	} +	 +	private void getDistributedIntervalsWithRegionsStartStopWithPrefix( +			String keyStrStart, String keyStrStop, +			String regionStrStart, String regionStrStop, +			String expectedPrefix, String sendPrefix) throws IOException { + +		byte [] keyBytesStart = Bytes.toBytes(keyStrStart); +		byte [] keyBytesStop = Bytes.toBytes(keyStrStop); +		 +		byte [] regionStart = Bytes.toBytes(regionStrStart); +		byte [] regionsStop = Bytes.toBytes(regionStrStop); + +		char [] prefixArr = expectedPrefix.toCharArray(); +		 +		byte [][] expectedStart = new byte[prefixArr.length][]; +		byte [][] expectedStop = new byte[prefixArr.length][]; +		Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); +			expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); +		} +		 +		if( Arrays.equals(keyBytesStart, HConstants.EMPTY_START_ROW) +				&& Arrays.equals(keyBytesStop, HConstants.EMPTY_END_ROW) ) { +			for( int i = expectedStart.length - 1; i >=1; i--) { +				expectedStart[i] = expectedStart[i - 1]; +			} +			 +			expectedStart[0] = HConstants.EMPTY_START_ROW; +			expectedStop[expectedStop.length - 1] = HConstants.EMPTY_END_ROW; +		} else if(Arrays.equals(keyBytesStart, HConstants.EMPTY_START_ROW)) { +			for( int i = expectedStart.length - 1; i >=1; i--) { +				expectedStart[i] = expectedStart[i - 1]; +			} +			 +			expectedStart[0] = HConstants.EMPTY_START_ROW; +		} else if (Arrays.equals(keyBytesStop, HConstants.EMPTY_END_ROW)) { +			for(int i = 0; i < expectedStop.length - 1; i++ ) { +				expectedStop[i] = expectedStop[i + 1]; +			} +			expectedStop[expectedStop.length - 1] = HConstants.EMPTY_END_ROW; +		} +		 +		for(int i = 0; i < prefixArr.length; i++ ) { +			expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); +		} +		 +		Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop, regionStart, regionsStop, sendPrefix); +		 +		for(Pair<byte[], byte[]> p : expectedPairs ) { +			System.out.println("- EXPECTED " + Bytes.toString(p.getFirst()) +					+ " -> " + Bytes.toString(p.getSecond())); +		} +		 +		for(Pair<byte[], byte[]> p : actualPairs ) { +			System.out.println("- ACTUAL " + Bytes.toString(p.getFirst()) +					+ " -> " + Bytes.toString(p.getSecond())); +		} +		 +		assertEquals(expectedPairs.length, actualPairs.length); +		 +		for( int i = 0; i < expectedPairs.length; i++ ) { +//			System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)",  +//					Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); +// +//			System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)",  +//					Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); + +			assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); +			assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); +		} +	} +	 +	 +} | 
