Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 166
1 file changed, 83 insertions, 83 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
index bbc205b..6216695 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
@@ -1,83 +1,83 @@
-package parallelai.spyglass.hbase
-
-import cascading.pipe.Pipe
-import cascading.pipe.assembly.Coerce
-import cascading.scheme.Scheme
-import cascading.tap.{ Tap, SinkMode }
-import cascading.tuple.Fields
-import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
-import org.apache.hadoop.hbase.util.Bytes
-import scala.collection.JavaConversions._
-import scala.collection.mutable.WrappedArray
-import com.twitter.scalding._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
-import org.apache.hadoop.hbase.util.Base64
-import java.io.ByteArrayOutputStream
-import java.io.DataOutputStream
-
-object HBaseRawSource {
-  /**
-   * Converts a scan object to a base64 string that can be passed to HBaseRawSource
-   * @param scan
-   * @return base64 string representation
-   */
-  def convertScanToString(scan: Scan) = {
-    val out = new ByteArrayOutputStream();
-    val dos = new DataOutputStream(out);
-    scan.write(dos);
-    Base64.encodeBytes(out.toByteArray());
-  }
-}
-
-
-/**
- * @author Rotem Hermon
- *
- * HBaseRawSource is a scalding source that passes the original row (Result) object to the
- * mapper for customized processing.
- *
- * @param tableName   The name of the HBase table to read
- * @param quorumNames HBase quorum
- * @param familyNames Column families to get (source, if null will get all) or update to (sink)
- * @param writeNulls  Should the sink write null values. default = true. If false, null columns will not be written
- * @param base64Scan  An optional base64 encoded scan object
- * @param sinkMode    If REPLACE the output table will be deleted before writing to
- *
- */
-class HBaseRawSource(
-  tableName: String,
-  quorumNames: String = "localhost",
-  familyNames: Array[String],
-  writeNulls: Boolean = true,
-  base64Scan: String = null,
-  sinkMode: SinkMode = null) extends Source {
-
-  override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
-    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
-
-  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
-    val hBaseScheme = hdfsScheme match {
-      case hbase: HBaseRawScheme => hbase
-      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
-    }
-    mode match {
-      case hdfsMode @ Hdfs(_, _) => readOrWrite match {
-        case Read => {
-          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-            case null => SinkMode.KEEP
-            case _ => sinkMode
-          }).asInstanceOf[Tap[_, _, _]]
-        }
-        case Write => {
-          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-            case null => SinkMode.UPDATE
-            case _ => sinkMode
-          }).asInstanceOf[Tap[_, _, _]]
-        }
-      }
-      case _ => super.createTap(readOrWrite)(mode)
-    }
-  }
-}
+//package parallelai.spyglass.hbase
+//
+//import cascading.pipe.Pipe
+//import cascading.pipe.assembly.Coerce
+//import cascading.scheme.Scheme
+//import cascading.tap.{ Tap, SinkMode }
+//import cascading.tuple.Fields
+//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
+//import org.apache.hadoop.hbase.util.Bytes
+//import scala.collection.JavaConversions._
+//import scala.collection.mutable.WrappedArray
+//import com.twitter.scalding._
+//import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+//import org.apache.hadoop.hbase.client.Scan
+//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+//import org.apache.hadoop.hbase.util.Base64
+//import java.io.ByteArrayOutputStream
+//import java.io.DataOutputStream
+//
+//object HBaseRawSource {
+//  /**
+//   * Converts a scan object to a base64 string that can be passed to HBaseRawSource
+//   * @param scan
+//   * @return base64 string representation
+//   */
+//  def convertScanToString(scan: Scan) = {
+//    val out = new ByteArrayOutputStream();
+//    val dos = new DataOutputStream(out);
+//    scan.write(dos);
+//    Base64.encodeBytes(out.toByteArray());
+//  }
+//}
+//
+//
+///**
+// * @author Rotem Hermon
+// *
+// * HBaseRawSource is a scalding source that passes the original row (Result) object to the
+// * mapper for customized processing.
+// *
+// * @param tableName   The name of the HBase table to read
+// * @param quorumNames HBase quorum
+// * @param familyNames Column families to get (source, if null will get all) or update to (sink)
+// * @param writeNulls  Should the sink write null values. default = true. If false, null columns will not be written
+// * @param base64Scan  An optional base64 encoded scan object
+// * @param sinkMode    If REPLACE the output table will be deleted before writing to
+// *
+// */
+//class HBaseRawSource(
+//  tableName: String,
+//  quorumNames: String = "localhost",
+//  familyNames: Array[String],
+//  writeNulls: Boolean = true,
+//  base64Scan: String = null,
+//  sinkMode: SinkMode = null) extends Source {
+//
+//  override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
+//    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+//
+//  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+//    val hBaseScheme = hdfsScheme match {
+//      case hbase: HBaseRawScheme => hbase
+//      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
+//    }
+//    mode match {
+//      case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+//        case Read => {
+//          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+//            case null => SinkMode.KEEP
+//            case _ => sinkMode
+//          }).asInstanceOf[Tap[_, _, _]]
+//        }
+//        case Write => {
+//          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+//            case null => SinkMode.UPDATE
+//            case _ => sinkMode
+//          }).asInstanceOf[Tap[_, _, _]]
+//        }
+//      }
+//      case _ => super.createTap(readOrWrite)(mode)
+//    }
+//  }
+//}
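
For context, this commit comments out the whole file rather than deleting it, so the class remains visible in the tree but drops out of compilation. Below is a minimal, hypothetical sketch of how the source was typically wired into a Scalding job before this change: build a Scan, serialize it with HBaseRawSource.convertScanToString so it can travel through the Hadoop job configuration, and read the table as a raw source. The table name, quorum string, column family, and the 'rowkey/'row field names are illustrative assumptions, not taken from this commit.

// Hypothetical usage sketch of the (now commented-out) HBaseRawSource.
// Table name, quorum, family, and the 'rowkey/'row field names are
// illustrative assumptions, not part of this commit.
import com.twitter.scalding._
import org.apache.hadoop.hbase.client.{ Result, Scan }
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.hbase.HBaseRawSource

class ReadRawTableJob(args: Args) extends Job(args) {
  // Restrict the server-side scan to one column family; convertScanToString
  // base64-encodes the Scan so it survives the trip through the JobConf.
  val scan = new Scan()
  scan.addFamily(Bytes.toBytes("cf"))

  new HBaseRawSource(
    tableName = "my_table",
    quorumNames = "zookeeper-host:2181",
    familyNames = Array("cf"),
    base64Scan = HBaseRawSource.convertScanToString(scan))
    .read
    // The raw scheme hands each row to the mapper as (row key, Result);
    // the field names here are assumed for illustration.
    .mapTo(('rowkey, 'row) -> 'key) { in: (ImmutableBytesWritable, Result) =>
      Bytes.toString(in._1.get)
    }
    .write(Tsv(args("output")))
}

The base64 encoding step mirrors what convertScanToString in the diff above does (Scan.write into a DataOutputStream, then Base64.encodeBytes); it is what lets a fully configured scan definition be passed to the tap as a plain string.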