diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 95 |
1 files changed, 58 insertions, 37 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 1fc6f7d..bbc205b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -18,45 +18,66 @@ import java.io.ByteArrayOutputStream import java.io.DataOutputStream object HBaseRawSource { - def convertScanToString(scan: Scan) = { - val out = new ByteArrayOutputStream(); - val dos = new DataOutputStream(out); - scan.write(dos); - Base64.encodeBytes(out.toByteArray()); - } + /** + * Converts a scan object to a base64 string that can be passed to HBaseRawSource + * @param scan + * @return base64 string representation + */ + def convertScanToString(scan: Scan) = { + val out = new ByteArrayOutputStream(); + val dos = new DataOutputStream(out); + scan.write(dos); + Base64.encodeBytes(out.toByteArray()); + } } + + +/** + * @author Rotem Hermon + * + * HBaseRawSource is a scalding source that passes the original row (Result) object to the + * mapper for customized processing. + * + * @param tableName The name of the HBase table to read + * @param quorumNames HBase quorum + * @param familyNames Column families to get (source, if null will get all) or update to (sink) + * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written + * @param base64Scan An optional base64 encoded scan object + * @param sinkMode If REPLACE the output table will be deleted before writing to + * + */ class HBaseRawSource( - tableName: String, - quorumNames: String = "localhost", - familyNames: Array[String], - writeNulls: Boolean = true, - base64Scan:String = null, - sinkMode: SinkMode = null) extends Source { + tableName: String, + quorumNames: String = "localhost", + familyNames: Array[String], + writeNulls: Boolean = true, + base64Scan: String = null, + sinkMode: SinkMode = null) extends Source { - override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { - val hBaseScheme = hdfsScheme match { - case hbase: HBaseRawScheme => hbase - case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") - } - mode match { - case hdfsMode @ Hdfs(_, _) => readOrWrite match { - case Read => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.KEEP - case _ => sinkMode - }).asInstanceOf[Tap[_,_,_]] - } - case Write => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.UPDATE - case _ => sinkMode - }).asInstanceOf[Tap[_,_,_]] - } - } - case _ => super.createTap(readOrWrite)(mode) - } - } + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseRawScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.KEEP + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + case Write => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.UPDATE + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } } |