aboutsummaryrefslogtreecommitdiffstats
path: root/README.md
diff options
context:
space:
mode:
Diffstat (limited to 'README.md')
-rw-r--r--README.md38
1 files changed, 38 insertions, 0 deletions
diff --git a/README.md b/README.md
index 12c55ea..ee5abb9 100644
--- a/README.md
+++ b/README.md
@@ -227,5 +227,43 @@ Add the trait to the job class and start using the conversions in the pipe direc
useSalt = true ))
}
+5. Raw HBase Tap and Source
+===========================
+HBaseRawSource is an alternative HBase source implementation that provides two main features:
+* Ability to provide a custom scan object.
+* Passing the row object to the mapper to allow full customized processing (without the need to declare in advance the read columns).
+**Passing a scan object**
+
+HBaseRawSource object provides a helper function to encode a scan object as a base64 string, which you can pass to the source.
+e.g.
+
+ val scan = new Scan
+ val key = "my_key_prefix"
+ scan.setStartRow(Bytes.toBytes(key))
+ scan.setFilter(new PrefixFilter(Bytes.toBytes(key)))
+ val scanner = HBaseRawSource.convertScanToString(scan)
+ val hbaseSource = new HBaseRawSource("MY-TABLE", "hbase-local", Array("col-family"), base64Scan = scanner)
+
+**Processing the rows**
+
+The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row).
+The first field is the row key, the second is the row Result object. You can then process the row as needed.
+The sink expects a rowkey field in the tuple it gets to use as a row key (it doesn't have to be the same as the one emitted by the source).
+It will then write the output fields (except the rowkey) as columns under the provided family, using the field name as the column name.
+You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source.
+e.g.
+
+ val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE)
+ hbaseSource.read
+ .mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) {
+ x: (ImmutableBytesWritable, Result) =>
+ {
+ val (rowkey, row) = x
+ val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col1"))) * 2;
+ val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2"));
+ (rowkey, col1Times2, col2)
+ }
+ }
+ .write(hbaseOut)