diff options
-rw-r--r-- | README.md | 38 | ||||
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java | 8 | ||||
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 95 |
3 files changed, 100 insertions, 41 deletions
@@ -227,5 +227,43 @@ Add the trait to the job class and start using the conversions in the pipe direc useSalt = true )) } +5. Raw HBase Tap and Source +=========================== +HBaseRawSource is an alternative HBase source implementation that provides two main features: +* Ability to provide a custom scan object. +* Passing the row object to the mapper to allow full customized processing (without the need to declare in advance the read columns). +**Passing a scan object** + +HBaseRawSource object provides a helper function to encode a scan object as a base64 string, which you can pass to the source. +e.g. + + val scan = new Scan + val key = "my_key_prefix" + scan.setStartRow(Bytes.toBytes(key)) + scan.setFilter(new PrefixFilter(Bytes.toBytes(key))) + val scanner = HBaseRawSource.convertScanToString(scan) + val hbaseSource = new HBaseRawSource("MY-TABLE", "hbase-local", Array("col-family"), base64Scan = scanner) + +**Processing the rows** + +The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). +The first field is the row key, the second is the row Result object. You can then process the row as needed. +The sink expects a rowkey field in the tuple it gets to use as a row key (it doesn't have to be the same as the one emitted by the source). +It will then write the output fields (except the rowkey) as columns under the provided family, using the field name as the column name. +You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. +e.g. + + val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE) + hbaseSource.read + .mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) { + x: (ImmutableBytesWritable, Result) => + { + val (rowkey, row) = x + val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col1"))) * 2; + val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2")); + (rowkey, col1Times2, col2) + } + } + .write(hbaseOut) diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 738fd51..0421b6e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -131,11 +131,11 @@ public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { /** * Constructor HBaseTap creates a new HBaseTap instance. * - * @param quorumNames - * @param tableName + * @param quorumNames HBase quorum + * @param tableName The name of the HBase table to read * @param HBaseFullScheme - * @param base64Scan - * @param sinkMode + * @param base64Scan An optional base64 encoded scan object + * @param sinkMode If REPLACE the output table will be deleted before writing to */ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { super(HBaseFullScheme, sinkMode); diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 1fc6f7d..bbc205b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -18,45 +18,66 @@ import java.io.ByteArrayOutputStream import java.io.DataOutputStream object HBaseRawSource { - def convertScanToString(scan: Scan) = { - val out = new ByteArrayOutputStream(); - val dos = new DataOutputStream(out); - scan.write(dos); - Base64.encodeBytes(out.toByteArray()); - } + /** + * Converts a scan object to a base64 string that can be passed to HBaseRawSource + * @param scan + * @return base64 string representation + */ + def convertScanToString(scan: Scan) = { + val out = new ByteArrayOutputStream(); + val dos = new DataOutputStream(out); + scan.write(dos); + Base64.encodeBytes(out.toByteArray()); + } } + + +/** + * @author Rotem Hermon + * + * HBaseRawSource is a scalding source that passes the original row (Result) object to the + * mapper for customized processing. + * + * @param tableName The name of the HBase table to read + * @param quorumNames HBase quorum + * @param familyNames Column families to get (source, if null will get all) or update to (sink) + * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written + * @param base64Scan An optional base64 encoded scan object + * @param sinkMode If REPLACE the output table will be deleted before writing to + * + */ class HBaseRawSource( - tableName: String, - quorumNames: String = "localhost", - familyNames: Array[String], - writeNulls: Boolean = true, - base64Scan:String = null, - sinkMode: SinkMode = null) extends Source { + tableName: String, + quorumNames: String = "localhost", + familyNames: Array[String], + writeNulls: Boolean = true, + base64Scan: String = null, + sinkMode: SinkMode = null) extends Source { - override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { - val hBaseScheme = hdfsScheme match { - case hbase: HBaseRawScheme => hbase - case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") - } - mode match { - case hdfsMode @ Hdfs(_, _) => readOrWrite match { - case Read => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.KEEP - case _ => sinkMode - }).asInstanceOf[Tap[_,_,_]] - } - case Write => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.UPDATE - case _ => sinkMode - }).asInstanceOf[Tap[_,_,_]] - } - } - case _ => super.createTap(readOrWrite)(mode) - } - } + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseRawScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.KEEP + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + case Write => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.UPDATE + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } } |