diff options
Diffstat (limited to 'README.md')
-rw-r--r-- | README.md | 38 |
1 files changed, 38 insertions, 0 deletions
@@ -227,5 +227,43 @@ Add the trait to the job class and start using the conversions in the pipe direc useSalt = true )) } +5. Raw HBase Tap and Source +=========================== +HBaseRawSource is an alternative HBase source implementation that provides two main features: +* Ability to provide a custom scan object. +* Passing the row object to the mapper to allow full customized processing (without the need to declare in advance the read columns). +**Passing a scan object** + +HBaseRawSource object provides a helper function to encode a scan object as a base64 string, which you can pass to the source. +e.g. + + val scan = new Scan + val key = "my_key_prefix" + scan.setStartRow(Bytes.toBytes(key)) + scan.setFilter(new PrefixFilter(Bytes.toBytes(key))) + val scanner = HBaseRawSource.convertScanToString(scan) + val hbaseSource = new HBaseRawSource("MY-TABLE", "hbase-local", Array("col-family"), base64Scan = scanner) + +**Processing the rows** + +The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). +The first field is the row key, the second is the row Result object. You can then process the row as needed. +The sink expects a rowkey field in the tuple it gets to use as a row key (it doesn't have to be the same as the one emitted by the source). +It will then write the output fields (except the rowkey) as columns under the provided family, using the field name as the column name. +You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. +e.g. + + val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE) + hbaseSource.read + .mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) { + x: (ImmutableBytesWritable, Result) => + { + val (rowkey, row) = x + val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col1"))) * 2; + val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2")); + (rowkey, col1Times2, col2) + } + } + .write(hbaseOut) |