package parallelai.spyglass.hbase

import cascading.pipe.Pipe
import cascading.pipe.assembly.Coerce
import cascading.scheme.Scheme
import cascading.tap.{ Tap, SinkMode }
import cascading.tuple.Fields
import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConversions._
import scala.collection.mutable.WrappedArray
import com.twitter.scalding._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.util.Base64
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream

object HBaseRawSource {
  /**
   * Converts a scan object to a base64 string that can be passed to HBaseRawSource.
   *
   * @param scan the scan to serialize
   * @return base64 string representation of the serialized scan
   */
  def convertScanToString(scan: Scan): String = {
    val out = new ByteArrayOutputStream()
    val dos = new DataOutputStream(out)
    // Scan is serialized via its Writable interface in this HBase version,
    // then base64-encoded so it can travel inside a JobConf string property.
    scan.write(dos)
    Base64.encodeBytes(out.toByteArray())
  }
}

/**
 * HBaseRawSource is a scalding source that passes the original row (Result) object to the
 * mapper for customized processing.
 *
 * @author Rotem Hermon
 *
 * @param tableName   The name of the HBase table to read
 * @param quorumNames HBase quorum
 * @param familyNames Column families to get (source, if null will get all) or update to (sink)
 * @param writeNulls  Should the sink write null values. default = true.
 *                    If false, null columns will not be written
 * @param base64Scan  An optional base64 encoded scan object
 * @param sinkMode    If REPLACE the output table will be deleted before writing to
 */
class HBaseRawSource(
  tableName: String,
  quorumNames: String = "localhost",
  familyNames: Array[String],
  writeNulls: Boolean = true,
  base64Scan: String = null,
  sinkMode: SinkMode = null) extends Source {

  // Scheme is created eagerly from the constructor arguments; the cast adapts the
  // concrete HBaseRawScheme to the Scheme type parameters Source expects on HDFS.
  override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]

  /**
   * Creates the HBase tap for this source.
   *
   * On HDFS, builds an [[HBaseRawTap]] whose sink mode defaults to KEEP for reads
   * and UPDATE for writes when no explicit sinkMode was supplied. Any other mode
   * falls back to the default Source behavior.
   *
   * @param readOrWrite whether the tap is used for reading or writing
   * @param mode the scalding execution mode
   * @return the tap to use for this source
   * @throws ClassCastException if hdfsScheme is not an HBaseRawScheme
   */
  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
    val hBaseScheme = hdfsScheme match {
      case hbase: HBaseRawScheme => hbase
      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
    }
    mode match {
      case hdfsMode @ Hdfs(_, _) =>
        // The Read and Write branches previously duplicated the whole tap
        // construction; only the default SinkMode differs between them.
        val defaultMode = readOrWrite match {
          case Read  => SinkMode.KEEP
          case Write => SinkMode.UPDATE
        }
        // A null sinkMode (the constructor default) means "use the
        // direction-appropriate default"; Option(...) maps null to None.
        val effectiveMode = Option(sinkMode).getOrElse(defaultMode)
        new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, effectiveMode)
          .asInstanceOf[Tap[_, _, _]]
      case _ => super.createTap(readOrWrite)(mode)
    }
  }
}