diff options
Diffstat (limited to 'src/main/scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 166 |
1 files changed, 83 insertions, 83 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 6216695..450a57d 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -1,83 +1,83 @@ -//package parallelai.spyglass.hbase -// -//import cascading.pipe.Pipe -//import cascading.pipe.assembly.Coerce -//import cascading.scheme.Scheme -//import cascading.tap.{ Tap, SinkMode } -//import cascading.tuple.Fields -//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } -//import org.apache.hadoop.hbase.util.Bytes -//import scala.collection.JavaConversions._ -//import scala.collection.mutable.WrappedArray -//import com.twitter.scalding._ -//import org.apache.hadoop.hbase.io.ImmutableBytesWritable -//import org.apache.hadoop.hbase.client.Scan -//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil -//import org.apache.hadoop.hbase.util.Base64 -//import java.io.ByteArrayOutputStream -//import java.io.DataOutputStream -// -//object HBaseRawSource { -// /** -// * Converts a scan object to a base64 string that can be passed to HBaseRawSource -// * @param scan -// * @return base64 string representation -// */ -// def convertScanToString(scan: Scan) = { -// val out = new ByteArrayOutputStream(); -// val dos = new DataOutputStream(out); -// scan.write(dos); -// Base64.encodeBytes(out.toByteArray()); -// } -//} -// -// -///** -// * @author Rotem Hermon -// * -// * HBaseRawSource is a scalding source that passes the original row (Result) object to the -// * mapper for customized processing. -// * -// * @param tableName The name of the HBase table to read -// * @param quorumNames HBase quorum -// * @param familyNames Column families to get (source, if null will get all) or update to (sink) -// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written -// * @param base64Scan An optional base64 encoded scan object -// * @param sinkMode If REPLACE the output table will be deleted before writing to -// * -// */ -//class HBaseRawSource( -// tableName: String, -// quorumNames: String = "localhost", -// familyNames: Array[String], -// writeNulls: Boolean = true, -// base64Scan: String = null, -// sinkMode: SinkMode = null) extends Source { -// -// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) -// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] -// -// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { -// val hBaseScheme = hdfsScheme match { -// case hbase: HBaseRawScheme => hbase -// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") -// } -// mode match { -// case hdfsMode @ Hdfs(_, _) => readOrWrite match { -// case Read => { -// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -// case null => SinkMode.KEEP -// case _ => sinkMode -// }).asInstanceOf[Tap[_, _, _]] -// } -// case Write => { -// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -// case null => SinkMode.UPDATE -// case _ => sinkMode -// }).asInstanceOf[Tap[_, _, _]] -// } -// } -// case _ => super.createTap(readOrWrite)(mode) -// } -// } -//} +package parallelai.spyglass.hbase + +import cascading.pipe.Pipe +import cascading.pipe.assembly.Coerce +import cascading.scheme.Scheme +import cascading.tap.{ Tap, SinkMode } +import cascading.tuple.Fields +import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.JavaConversions._ +import scala.collection.mutable.WrappedArray +import com.twitter.scalding._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.util.Base64 +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream + +object HBaseRawSource { + /** + * Converts a scan object to a base64 string that can be passed to HBaseRawSource + * @param scan + * @return base64 string representation + */ + def convertScanToString(scan: Scan) = { + val out = new ByteArrayOutputStream(); + val dos = new DataOutputStream(out); + scan.write(dos); + Base64.encodeBytes(out.toByteArray()); + } +} + + +/** +* @author Rotem Hermon +* +* HBaseRawSource is a scalding source that passes the original row (Result) object to the +* mapper for customized processing. +* +* @param tableName The name of the HBase table to read +* @param quorumNames HBase quorum +* @param familyNames Column families to get (source, if null will get all) or update to (sink) +* @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written +* @param base64Scan An optional base64 encoded scan object +* @param sinkMode If REPLACE the output table will be deleted before writing to +* +*/ +class HBaseRawSource( + tableName: String, + quorumNames: String = "localhost", + familyNames: Array[String], + writeNulls: Boolean = true, + base64Scan: String = null, + sinkMode: SinkMode = null) extends Source { + + override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseRawScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.KEEP + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + case Write => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.UPDATE + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } +} |