aboutsummaryrefslogtreecommitdiffstats
path: root/README.md
diff options
context:
space:
mode:
Diffstat (limited to 'README.md')
-rw-r--r--README.md255
1 files changed, 171 insertions, 84 deletions
diff --git a/README.md b/README.md
index 7f65d56..ee0568a 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,9 @@
SpyGlass
========
-Cascading and Scalding wrapper for HBase with advanced read features
+Cascading and Scalding wrapper for HBase with advanced read and write features.
+
+Prevents region hot spotting through the use of transparent key prefixes.
Building
========
@@ -9,108 +11,193 @@ Building
$ mvn clean install -U
Requires Maven 3.x.x
+
+1. Read Mode Features
+=====================
-Example
-=======
+HBaseSource supports three read modes: GET_LIST, SCAN_RANGE and SCAN_ALL. Use the *sourceMode* parameter to select the source mode.
- package parallelai.spyglass.hbase.example
-
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.hbase.HBaseConfiguration
- import org.apache.hadoop.hbase.client.HConnectionManager
- import org.apache.hadoop.hbase.client.HTable
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.log4j.Level
- import org.apache.log4j.Logger
-
- import com.twitter.scalding._
- import com.twitter.scalding.Args
-
- import parallelai.spyglass.base.JobBase
- import parallelai.spyglass.hbase.HBaseSource
- import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-
- class HBaseExample(args: Args) extends JobBase(args) {
-
- val isDebug: Boolean = args("debug").toBoolean
-
- if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG)
-
- val output = args("output")
-
- println(output)
-
- val jobConf = getJobConf
-
- val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181"
-
- case class HBaseTableStore(
- conf: Configuration,
- quorum: String,
- tableName: String) {
-
- val tableBytes = Bytes.toBytes(tableName)
- val connection = HConnectionManager.getConnection(conf)
- val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
-
- conf.set("hbase.zookeeper.quorum", quorumNames);
-
- val htable = new HTable(HBaseConfiguration.create(conf), tableName)
-
- }
-
- val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet")
-
- val hbs2 = new HBaseSource(
+ - **GET_LIST** -> Provide a list of keys to retrieve from the HBase table
+ - **SCAN_RANGE** -> Provide a start and stop key (both inclusive) to retrieve a range of rows from the HBase table.
+ - **SCAN_ALL** -> Get all rows from the HBase table
+
+GET_LIST
+--------
+Requires the *keyList* parameter to be specified as well.
+
+(e.g.)
+ val hbs2 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
- .read
- .write(Tsv(output.format("get_list")))
-
- val hbs3 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
- .read
- .write(Tsv(output.format("scan_all")))
-
- val hbs4 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_to_end")))
-
- val hbs5 = new HBaseSource(
+
+
+Additionally, the *versions* parameter can be used to retrieve more than one version of the row.
+
+(e.g.)
+ val hbs2 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_from_start")))
-
- val hbs6 = new HBaseSource(
+ sourceMode = SourceMode.GET_LIST,
+ versions = 5,
+ keyList = List("5003914", "5000687", "5004897"))
+
+
+SCAN_RANGE
+----------
+Scan range uses the optional *startKey* and *stopKey* parameters to specify the range of keys to extract; both keys are inclusive.
+
+if:
+ - Only *startKey* provided -> All rows from *startKey* till *END OF TABLE* are returned
+ - Only *stopKey* provided -> All rows from *START OF TABLE* till *stopKey* are returned
+ - Neither provided -> All rows in table are returned.
+
+(e.g.)
+ val hbs4 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
+ .read
+ .write(Tsv(output.format("scan_range_to_end")))
+
+ val hbs5 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
+ .read
+ .write(Tsv(output.format("scan_range_from_start")))
+
+ val hbs6 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
+ .read
+ .write(Tsv(output.format("scan_range_between")))
+
+
+SCAN_ALL
+--------
+Returns all rows in the table
+
+(e.g.)
+ val hbs2 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
Array("column_family"),
Array('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
- .read
- .write(Tsv(output.format("scan_range_between")))
+ sourceMode = SourceMode.SCAN_ALL)
+
+
+2. Write Mode Features
+======================
+
+HBaseSource supports writing at a particular timestamp, i.e. a version.
+
+The time dimension can be added to the row by using the *timestamp* parameter. If the parameter is not present, the current time is used.
+
+(e.g.)
+ .write(new HBaseSource( "table_name",
+ "quorum_name:2181",
+ 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ timestamp = Platform.currentTime ))
+
+3. Region Hot Spot Prevention
+=============================
+
+Region hot spotting is a common problem with HBase. SpyGlass uses key prefix salting to avoid this.
+The row key is prefixed with its own last byte, followed by a '_' (underscore) character.
+
+(e.g.)
+Original Row Key -> Becomes
+SPYGLASS -> S_SPYGLASS
+12345678 -> 8_12345678
+
+Conversion to and from salted keys is done automatically.
+
+Setting the *useSalt* parameter to *true* enables this functionality.
+
+
+(e.g.)
+ val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+ val hbase07 =
+ new HBaseSource( "table_name",
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = "0123456789" )
+ .read
+
+ // Convert from ImmutableBytesWritable to String
+ .fromBytesWritable( TABLE_SCHEMA )
+
+ .write(TextLine("saltTesting/ScanRangePlusSalt10"))
+
+ // Convert from String to ImmutableBytesWritable
+ .toBytesWritable( TABLE_SCHEMA )
+
+ .write(new HBaseSource( "table_name",
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ useSalt = true ))
+
+
+Setting the *prefixList* parameter to the available prefixes can increase the read performance quite a bit.
+
+4. Pipe Conversion Implicits
+============================
+
+HBaseSource will always read or write fields of type *ImmutableBytesWritable*. The supplied *HBasePipeConversions* trait is used to convert between *String* and *ImmutableBytesWritable*.
+
+Add the trait to the job class and start using the conversions in the pipe directly.
+
+(e.g.)
+ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
+ val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
+
+ val hbase07 =
+ new HBaseSource( "table_name",
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = "0123456789" )
+ .read
+
+ // Convert from ImmutableBytesWritable to String
+ .fromBytesWritable( TABLE_SCHEMA )
+
+ .write(TextLine("saltTesting/ScanRangePlusSalt10"))
+
+ // Convert from String to ImmutableBytesWritable
+ .toBytesWritable( TABLE_SCHEMA )
+
+ .write(new HBaseSource( "table_name",
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ useSalt = true ))
}
+
+