SpyGlass
========
Cascading and Scalding wrapper for HBase/JDBC with advanced read and write features.
Prevent Hot Spotting by the use of transparent key prefixes.
Building
========
$ mvn clean install -U
Requires Maven 3.x.x
To use SpyGlass as a dependency use the following repository
parallelai-releases
https://github.com/ParallelAI/mvn-repo/raw/master/releases
parallelai
parallelai.spyglass
2.0.3
1. Read Mode Features
=====================
HBaseSource supports modes namely **GET_LIST**, **SCAN_RANGE** and **SCAN_ALL**. Use the **_sourceMode_** parameter to select the source mode.
- **GET_LIST** -> Provide a list of keys to retrieve from the HBase table
- **SCAN_RANGE** -> Provide a start and stop key (inclusive) to get out of the HBase table.
- **SCAN_ALL** -> Get all rows form the HBase Table
GET_LIST
--------
Requires the **_keyList_** parameter to be specified as well.
(e.g.)
val hbs2 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
Additionally, the **_versions_** parameter can be used to retrieve more than one version of the row.
(e.g.)
val hbs2 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
sourceMode = SourceMode.GET_LIST,
versions = 5,
keyList = List("5003914", "5000687", "5004897"))
SCAN_RANGE
----------
Scan range uses the optional **_startKey_** and **_stopKey_** parameters to specify the range of keys to extract; both keys are inclusive.
if:
- Only **_startKey_** provided -> All rows from **_startKey_** till **END OF TABLE** are returned
- Only **_stopKey_** provided -> All rows from **START OF TABLE** till **_stopKey_** are returned
- Neither provided -> All rows in table are returned.
(e.g.)
val hbs4 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
.read
.write(Tsv(output.format("scan_range_to_end")))
val hbs5 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
.read
.write(Tsv(output.format("scan_range_from_start")))
val hbs6 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
.read
.write(Tsv(output.format("scan_range_between")))
SCAN_ALL
--------
Returns all rows in the table
(e.g.)
val hbs2 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
sourceMode = SourceMode.SCAN_ALL)
2. Write Mode Features
======================
HBaseSource supports writing at a particular time stamp i.e. a version.
The time dimension can be added to the row by using the **_timestamp_** parameter. If the parameter is not present the current time is used.
(e.g.)
pipe.write(new HBaseSource( "table_name",
"quorum_name:2181",
'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
timestamp = Platform.currentTime ))
3. Region Hot Spot Prevention
=============================
Region hot spotting is a common problem with HBase. Spy Glass uses key prefix salting to avoid this.
The row key is prefixed with the last byte followed by a '_' (underscore) character.
(e.g.)
Original Row Key -> Becomes
SPYGLASS -> S_SPYGLASS
12345678 -> 8_12345678
Conversion to and from salted keys is done automatically.
Setting the **_useSalt_** parameter to **true** enables this functionality
(e.g.)
val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
val hbase07 =
new HBaseSource( "table_name",
"quorum_name:2181", 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455",
useSalt = true, prefixList = "0123456789" )
.read
// Convert from ImmutableBytesWritable to String
.fromBytesWritable( TABLE_SCHEMA )
.write(TextLine("saltTesting/ScanRangePlusSalt10"))
// Convert from String to ImmutableBytesWritable
.toBytesWritable( TABLE_SCHEMA )
.write(new HBaseSource( "table_name",
"quorum_name:2181", 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
useSalt = true ))
Setting the **_prefixList_** parameter to the available prefixes can increase the read performance quite a bit.
4. Pipe Conversion Implicits
============================
HBaseSource will always read or write fields of type **_ImmutableBytesWritable_**. The supplied **_HBasePipeConversions_** trait is used to convert to and from **_String_** to **_ImmutableBytesWritable_**
Add the trait to the job class and start using the conversions in the pipe directly
(e.g.)
class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
val hbase07 =
new HBaseSource( "table_name",
"quorum_name:2181", 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455",
useSalt = true, prefixList = "0123456789" )
.read
// Convert from ImmutableBytesWritable to String
.fromBytesWritable( TABLE_SCHEMA )
.write(TextLine("saltTesting/ScanRangePlusSalt10"))
// Convert from String to ImmutableBytesWritable
.toBytesWritable( TABLE_SCHEMA )
.write(new HBaseSource( "table_name",
"quorum_name:2181", 'key,
TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
useSalt = true ))
}