diff options
Diffstat (limited to 'README.md')
-rw-r--r-- | README.md | 255 |
1 files changed, 171 insertions, 84 deletions
@@ -1,7 +1,9 @@ SpyGlass ======== -Cascading and Scalding wrapper for HBase with advanced read features +Cascading and Scalding wrapper for HBase with advanced read and write features. + +Prevent Hot Spotting by the use of transparent key prefixes. Building ======== @@ -9,108 +11,193 @@ Building $ mvn clean install -U Requires Maven 3.x.x + +1. Read Mode Features +===================== -Example -======= +HBaseSource supports modes namely GET_LIST, SCAN_RANGE and SCAN_ALL. Use the *sourceMode* parameter to select the source mode. - package parallelai.spyglass.hbase.example - - import org.apache.hadoop.conf.Configuration - import org.apache.hadoop.hbase.HBaseConfiguration - import org.apache.hadoop.hbase.client.HConnectionManager - import org.apache.hadoop.hbase.client.HTable - import org.apache.hadoop.hbase.util.Bytes - import org.apache.log4j.Level - import org.apache.log4j.Logger - - import com.twitter.scalding._ - import com.twitter.scalding.Args - - import parallelai.spyglass.base.JobBase - import parallelai.spyglass.hbase.HBaseSource - import parallelai.spyglass.hbase.HBaseConstants.SourceMode - - class HBaseExample(args: Args) extends JobBase(args) { - - val isDebug: Boolean = args("debug").toBoolean - - if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG) - - val output = args("output") - - println(output) - - val jobConf = getJobConf - - val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" - - case class HBaseTableStore( - conf: Configuration, - quorum: String, - tableName: String) { - - val tableBytes = Bytes.toBytes(tableName) - val connection = HConnectionManager.getConnection(conf) - val maxThreads = conf.getInt("hbase.htable.threads.max", 1) - - conf.set("hbase.zookeeper.quorum", quorumNames); - - val htable = new HTable(HBaseConfiguration.create(conf), tableName) - - } - - val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet") - - val hbs2 = new HBaseSource( + - **GET_LIST** -> Provide a list of keys to retrieve from the HBase table + - **SCAN_RANGE** -> Provide a start and stop key (inclusive) to get out of the HBase table. + - **SCAN_ALL** -> Get all rows form the HBase Table + +GET_LIST +-------- +Requires the *keyList* parameter to be specified as well. + +(e.g.) + val hbs2 = new HBaseSource( "table_name", "quorum_name:2181", 'key, Array("column_family"), Array('column_name), sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) - .read - .write(Tsv(output.format("get_list"))) - - val hbs3 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") - .read - .write(Tsv(output.format("scan_all"))) - - val hbs4 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") - .read - .write(Tsv(output.format("scan_range_to_end"))) - - val hbs5 = new HBaseSource( + + +Additionally, the *versions* parameter can be used to retrieve more than one version of the row. + +(e.g.) + val hbs2 = new HBaseSource( "table_name", "quorum_name:2181", 'key, Array("column_family"), Array('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") - .read - .write(Tsv(output.format("scan_range_from_start"))) - - val hbs6 = new HBaseSource( + sourceMode = SourceMode.GET_LIST, + versions = 5, + keyList = List("5003914", "5000687", "5004897")) + + +SCAN_RANGE +---------- +Scan range uses the optional *startKey* and *stopKey* parameters to specify the range of keys to extract; both keys are inclusive. + +if: + - Only *startKey* provided -> All rows from *startKey* till *END OF TABLE* are returned + - Only *stopKey* provided -> All rows from *START OF TABLE* till *stopKey* are returned + - Neither provided -> All rows in table are returned. + +(e.g.) + val hbs4 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") + .read + .write(Tsv(output.format("scan_range_to_end"))) + + val hbs5 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") + .read + .write(Tsv(output.format("scan_range_from_start"))) + + val hbs6 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") + .read + .write(Tsv(output.format("scan_range_between"))) + + +SCAN_ALL +-------- +Returns all rows in the table + +(e.g.) + val hbs2 = new HBaseSource( "table_name", "quorum_name:2181", 'key, Array("column_family"), Array('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") - .read - .write(Tsv(output.format("scan_range_between"))) + sourceMode = SourceMode.SCAN_ALL) + + +2. Write Mode Features +====================== + +HBaseSource supports writing at a particular time stamp i.e. a version. + +The time dimension can be added to the row by using the *timestamp* parameter. If the parameter is not present the current time is used. + +(e.g.) + .write(new HBaseSource( "table_name", + "quorum_name:2181", + 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + timestamp = Platform.currentTime )) + +3. Region Hot Spot Prevention +============================= + +Region hot spotting is a common problem with HBase. Spy Glass uses key prefix salting to avoid this. +The row key is prefixed with the last byte followed by a '_' (underscore) character. + +(e.g.) +Original Row Key -> Becomes +SPYGLASS -> S_SPYGLASS +12345678 -> 8_12345678 + +Conversion to and from salted keys is done automatically. + +Setting the *useSalt* parameter to *true* enables this functionality + + +(e.g.) + val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + + val hbase07 = + new HBaseSource( "table_name", + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = "0123456789" ) + .read + + // Convert from ImmutableBytesWritable to String + .fromBytesWritable( TABLE_SCHEMA ) + + .write(TextLine("saltTesting/ScanRangePlusSalt10")) + + // Convert from String to ImmutableBytesWritable + .toBytesWritable( TABLE_SCHEMA ) + + .write(new HBaseSource( "table_name", + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + useSalt = true )) + + +Setting the *prefixList* parameter to the available prefixes can increase the read performance quite a bit. + +4. Pipe Conversion Implicits +============================ + +HBaseSource will always read or write fields of type *ImmutableBytesWritable*. The supplied *HBasePipeConversions* trait is used to convert to and from *String* to *ImmutableBytesWritable* + +Add the trait to the job class and start using the conversions in the pipe directly + +(e.g.) + class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions { + val TABLE_SCHEMA = List('key, 'salted, 'unsalted) + + val hbase07 = + new HBaseSource( "table_name", + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = "0123456789" ) + .read + + // Convert from ImmutableBytesWritable to String + .fromBytesWritable( TABLE_SCHEMA ) + + .write(TextLine("saltTesting/ScanRangePlusSalt10")) + + // Convert from String to ImmutableBytesWritable + .toBytesWritable( TABLE_SCHEMA ) + + .write(new HBaseSource( "table_name", + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + useSalt = true )) } + + |