From 0c0336e421030b59b598829c3bcaffcf7a5b6d68 Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:38:25 +0300 Subject: Added some readme and comments --- README.md | 35 +++++++- .../parallelai/spyglass/hbase/HBaseRawTap.java | 8 +- .../parallelai/spyglass/hbase/HBaseRawSource.scala | 95 +++++++++++++--------- 3 files changed, 96 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 12c55ea..65a03e5 100644 --- a/README.md +++ b/README.md @@ -227,5 +227,38 @@ Add the trait to the job class and start using the conversions in the pipe direc useSalt = true )) } - +5. Raw HBase Tap and Source +=========================== +HBaseRawSource is an alternative HBase source implementation that provides two main features: +1. Ability to provide a custom scan object. +2. Passing the row object to the mapper to allow full customized processing (without the need to declare in advance the read columns). + +**Passing a scan object** +HBaseRawSource object provides a helper function to encode a scan object as a base64 string, which you can pass to the source. +e.g. + val scan = new Scan + val key = "my_key_prefix" + scan.setStartRow(Bytes.toBytes(key)) + scan.setFilter(new PrefixFilter(Bytes.toBytes(key))) + val scanner = HBaseRawSource.convertScanToString(scan) + val hbaseSource = new HBaseRawSource("MY-TABLE", "hbase-local", Array("col-family"), base64Scan = scanner) + +**Processing the rows** +The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). +The first field is the row key, the second is the row Result object. You can then process the row as needed. +The sink will write the output fields as columns under the provided family and field name as the column name. +You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. +e.g. + val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE) + hbaseSource.read + .mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) { + x: (ImmutableBytesWritable, Result) => + { + val (rowkey, row) = x + val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col1"))) * 2; + val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2")); + (rowkey, col1Times2, col2) + } + } + .write(out) diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 738fd51..0421b6e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -131,11 +131,11 @@ public class HBaseRawTap extends Tap { /** * Constructor HBaseTap creates a new HBaseTap instance. * - * @param quorumNames - * @param tableName + * @param quorumNames HBase quorum + * @param tableName The name of the HBase table to read * @param HBaseFullScheme - * @param base64Scan - * @param sinkMode + * @param base64Scan An optional base64 encoded scan object + * @param sinkMode If REPLACE the output table will be deleted before writing to */ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { super(HBaseFullScheme, sinkMode); diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 1fc6f7d..bbc205b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -18,45 +18,66 @@ import java.io.ByteArrayOutputStream import java.io.DataOutputStream object HBaseRawSource { - def convertScanToString(scan: Scan) = { - val out = new ByteArrayOutputStream(); - val dos = new DataOutputStream(out); - scan.write(dos); - Base64.encodeBytes(out.toByteArray()); - } + /** + * Converts a scan object to a base64 string that can be passed to HBaseRawSource + * @param scan + * @return base64 string representation + */ + def convertScanToString(scan: Scan) = { + val out = new ByteArrayOutputStream(); + val dos = new DataOutputStream(out); + scan.write(dos); + Base64.encodeBytes(out.toByteArray()); + } } + + +/** + * @author Rotem Hermon + * + * HBaseRawSource is a scalding source that passes the original row (Result) object to the + * mapper for customized processing. + * + * @param tableName The name of the HBase table to read + * @param quorumNames HBase quorum + * @param familyNames Column families to get (source, if null will get all) or update to (sink) + * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written + * @param base64Scan An optional base64 encoded scan object + * @param sinkMode If REPLACE the output table will be deleted before writing to + * + */ class HBaseRawSource( - tableName: String, - quorumNames: String = "localhost", - familyNames: Array[String], - writeNulls: Boolean = true, - base64Scan:String = null, - sinkMode: SinkMode = null) extends Source { + tableName: String, + quorumNames: String = "localhost", + familyNames: Array[String], + writeNulls: Boolean = true, + base64Scan: String = null, + sinkMode: SinkMode = null) extends Source { - override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { - val hBaseScheme = hdfsScheme match { - case hbase: HBaseRawScheme => hbase - case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") - } - mode match { - case hdfsMode @ Hdfs(_, _) => readOrWrite match { - case Read => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.KEEP - case _ => sinkMode - }).asInstanceOf[Tap[_,_,_]] - } - case Write => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.UPDATE - case _ => sinkMode - }).asInstanceOf[Tap[_,_,_]] - } - } - case _ => super.createTap(readOrWrite)(mode) - } - } + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseRawScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.KEEP + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + case Write => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.UPDATE + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } } -- cgit v1.2.3 From af97d309d95657a43029d9eda427b072c442672d Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:39:15 +0300 Subject: formatting --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 65a03e5..823ca22 100644 --- a/README.md +++ b/README.md @@ -236,6 +236,7 @@ HBaseRawSource is an alternative HBase source implementation that provides two m **Passing a scan object** HBaseRawSource object provides a helper function to encode a scan object as a base64 string, which you can pass to the source. e.g. + val scan = new Scan val key = "my_key_prefix" scan.setStartRow(Bytes.toBytes(key)) @@ -249,6 +250,7 @@ The first field is the row key, the second is the row Result object. You can the The sink will write the output fields as columns under the provided family and field name as the column name. You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. e.g. + val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE) hbaseSource.read .mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) { -- cgit v1.2.3 From 03b4ca81507d6d0f21db00c55ba6410d5cc26193 Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:39:52 +0300 Subject: formatting --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 823ca22..556efd2 100644 --- a/README.md +++ b/README.md @@ -262,5 +262,5 @@ e.g. (rowkey, col1Times2, col2) } } - .write(out) + .write(hbaseOut) -- cgit v1.2.3 From 688265bc93ab91413a04c9d5772ec639c2c030a3 Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:42:38 +0300 Subject: formatting --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 556efd2..6938e48 100644 --- a/README.md +++ b/README.md @@ -230,8 +230,8 @@ Add the trait to the job class and start using the conversions in the pipe direc 5. Raw HBase Tap and Source =========================== HBaseRawSource is an alternative HBase source implementation that provides two main features: -1. Ability to provide a custom scan object. -2. Passing the row object to the mapper to allow full customized processing (without the need to declare in advance the read columns). +* Ability to provide a custom scan object. +* Passing the row object to the mapper to allow full customized processing (without the need to declare in advance the read columns). **Passing a scan object** HBaseRawSource object provides a helper function to encode a scan object as a base64 string, which you can pass to the source. -- cgit v1.2.3 From e5f8ad62804daa1a08d5fd22b79d73018fd7bea8 Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:43:27 +0300 Subject: formatting --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 6938e48..e6d9af8 100644 --- a/README.md +++ b/README.md @@ -234,6 +234,7 @@ HBaseRawSource is an alternative HBase source implementation that provides two m * Passing the row object to the mapper to allow full customized processing (without the need to declare in advance the read columns). **Passing a scan object** + HBaseRawSource object provides a helper function to encode a scan object as a base64 string, which you can pass to the source. e.g. @@ -245,6 +246,7 @@ e.g. val hbaseSource = new HBaseRawSource("MY-TABLE", "hbase-local", Array("col-family"), base64Scan = scanner) **Processing the rows** + The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). The first field is the row key, the second is the row Result object. You can then process the row as needed. The sink will write the output fields as columns under the provided family and field name as the column name. -- cgit v1.2.3 From d41bfefa12517df9c814f1955c20a0f65eaf863f Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:45:19 +0300 Subject: formatting --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e6d9af8..9674d90 100644 --- a/README.md +++ b/README.md @@ -247,10 +247,10 @@ e.g. **Processing the rows** -The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). -The first field is the row key, the second is the row Result object. You can then process the row as needed. -The sink will write the output fields as columns under the provided family and field name as the column name. -You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. +The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). +The first field is the row key, the second is the row Result object. You can then process the row as needed. +The sink will write the output fields as columns under the provided family and field name as the column name. +You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. e.g. val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE) -- cgit v1.2.3 From 50895c2852b559816173416da553468904ae6353 Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:48:07 +0300 Subject: some more notes --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9674d90..a0ca5b9 100644 --- a/README.md +++ b/README.md @@ -249,7 +249,8 @@ e.g. The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). The first field is the row key, the second is the row Result object. You can then process the row as needed. -The sink will write the output fields as columns under the provided family and field name as the column name. +The sink expects a rowkey field in the tuple it gets to use as a row key (it doesn't have to be the same as the one emitted by the source)). +It will then write the output fields (except the rowkey) as columns under the provided family and field name as the column name. You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. e.g. -- cgit v1.2.3 From 51e5670113b23a6aac0edd4c7e324faf2734e01b Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:48:29 +0300 Subject: formatting --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a0ca5b9..2cc40d4 100644 --- a/README.md +++ b/README.md @@ -249,7 +249,7 @@ e.g. The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). The first field is the row key, the second is the row Result object. You can then process the row as needed. -The sink expects a rowkey field in the tuple it gets to use as a row key (it doesn't have to be the same as the one emitted by the source)). +The sink expects a rowkey field in the tuple it gets to use as a row key (it doesn't have to be the same as the one emitted by the source). It will then write the output fields (except the rowkey) as columns under the provided family and field name as the column name. You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. e.g. -- cgit v1.2.3 From 7566bbcc71e157444185432f2b359b6e3b857dfe Mon Sep 17 00:00:00 2001 From: rotem Date: Mon, 17 Jun 2013 16:49:16 +0300 Subject: formatting --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2cc40d4..ee5abb9 100644 --- a/README.md +++ b/README.md @@ -250,7 +250,7 @@ e.g. The mapper function gets from HBaseRawSource a tuple containing two fields: (rowkey, row). The first field is the row key, the second is the row Result object. You can then process the row as needed. The sink expects a rowkey field in the tuple it gets to use as a row key (it doesn't have to be the same as the one emitted by the source). -It will then write the output fields (except the rowkey) as columns under the provided family and field name as the column name. +It will then write the output fields (except the rowkey) as columns under the provided family, using the field name as the column name. You can also provide the field name as a full qualifier (family:column) to specify a different family than was declared in the source. e.g. -- cgit v1.2.3