aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
diff options
context:
space:
mode:
authorGracia Fernandez <Gracia.FernandezLopez@bskyb.com>2013-07-10 10:26:41 +0100
committerGracia Fernandez <Gracia.FernandezLopez@bskyb.com>2013-07-10 10:26:41 +0100
commitdf2fa37f337bbbb219449aadaf57bcacd2350ada (patch)
tree12133be82c0cc80af58dc06eda43fc671725c9ee /src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
parent20a18b4388f0cd06bec0b43d083150f6e1bb2c5e (diff)
downloadSpyGlass-df2fa37f337bbbb219449aadaf57bcacd2350ada.tar.gz
SpyGlass-df2fa37f337bbbb219449aadaf57bcacd2350ada.zip
Versions reverted back to the old ones: Scala 2.9.3 (cdh4.2.0)
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala166
1 files changed, 83 insertions, 83 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
index bbc205b..6216695 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
@@ -1,83 +1,83 @@
-package parallelai.spyglass.hbase
-
-import cascading.pipe.Pipe
-import cascading.pipe.assembly.Coerce
-import cascading.scheme.Scheme
-import cascading.tap.{ Tap, SinkMode }
-import cascading.tuple.Fields
-import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
-import org.apache.hadoop.hbase.util.Bytes
-import scala.collection.JavaConversions._
-import scala.collection.mutable.WrappedArray
-import com.twitter.scalding._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
-import org.apache.hadoop.hbase.util.Base64
-import java.io.ByteArrayOutputStream
-import java.io.DataOutputStream
-
-object HBaseRawSource {
- /**
- * Converts a scan object to a base64 string that can be passed to HBaseRawSource
- * @param scan
- * @return base64 string representation
- */
- def convertScanToString(scan: Scan) = {
- val out = new ByteArrayOutputStream();
- val dos = new DataOutputStream(out);
- scan.write(dos);
- Base64.encodeBytes(out.toByteArray());
- }
-}
-
-
-/**
- * @author Rotem Hermon
- *
- * HBaseRawSource is a scalding source that passes the original row (Result) object to the
- * mapper for customized processing.
- *
- * @param tableName The name of the HBase table to read
- * @param quorumNames HBase quorum
- * @param familyNames Column families to get (source, if null will get all) or update to (sink)
- * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written
- * @param base64Scan An optional base64 encoded scan object
- * @param sinkMode If REPLACE the output table will be deleted before writing to
- *
- */
-class HBaseRawSource(
- tableName: String,
- quorumNames: String = "localhost",
- familyNames: Array[String],
- writeNulls: Boolean = true,
- base64Scan: String = null,
- sinkMode: SinkMode = null) extends Source {
-
- override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
- .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
-
- override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
- val hBaseScheme = hdfsScheme match {
- case hbase: HBaseRawScheme => hbase
- case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
- }
- mode match {
- case hdfsMode @ Hdfs(_, _) => readOrWrite match {
- case Read => {
- new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
- case null => SinkMode.KEEP
- case _ => sinkMode
- }).asInstanceOf[Tap[_, _, _]]
- }
- case Write => {
- new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
- case null => SinkMode.UPDATE
- case _ => sinkMode
- }).asInstanceOf[Tap[_, _, _]]
- }
- }
- case _ => super.createTap(readOrWrite)(mode)
- }
- }
-}
+//package parallelai.spyglass.hbase
+//
+//import cascading.pipe.Pipe
+//import cascading.pipe.assembly.Coerce
+//import cascading.scheme.Scheme
+//import cascading.tap.{ Tap, SinkMode }
+//import cascading.tuple.Fields
+//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
+//import org.apache.hadoop.hbase.util.Bytes
+//import scala.collection.JavaConversions._
+//import scala.collection.mutable.WrappedArray
+//import com.twitter.scalding._
+//import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+//import org.apache.hadoop.hbase.client.Scan
+//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+//import org.apache.hadoop.hbase.util.Base64
+//import java.io.ByteArrayOutputStream
+//import java.io.DataOutputStream
+//
+//object HBaseRawSource {
+// /**
+// * Converts a scan object to a base64 string that can be passed to HBaseRawSource
+// * @param scan
+// * @return base64 string representation
+// */
+// def convertScanToString(scan: Scan) = {
+// val out = new ByteArrayOutputStream();
+// val dos = new DataOutputStream(out);
+// scan.write(dos);
+// Base64.encodeBytes(out.toByteArray());
+// }
+//}
+//
+//
+///**
+// * @author Rotem Hermon
+// *
+// * HBaseRawSource is a scalding source that passes the original row (Result) object to the
+// * mapper for customized processing.
+// *
+// * @param tableName The name of the HBase table to read
+// * @param quorumNames HBase quorum
+// * @param familyNames Column families to get (source, if null will get all) or update to (sink)
+// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written
+// * @param base64Scan An optional base64 encoded scan object
+// * @param sinkMode If REPLACE the output table will be deleted before writing to
+// *
+// */
+//class HBaseRawSource(
+// tableName: String,
+// quorumNames: String = "localhost",
+// familyNames: Array[String],
+// writeNulls: Boolean = true,
+// base64Scan: String = null,
+// sinkMode: SinkMode = null) extends Source {
+//
+// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
+// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+//
+// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+// val hBaseScheme = hdfsScheme match {
+// case hbase: HBaseRawScheme => hbase
+// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
+// }
+// mode match {
+// case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+// case Read => {
+// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+// case null => SinkMode.KEEP
+// case _ => sinkMode
+// }).asInstanceOf[Tap[_, _, _]]
+// }
+// case Write => {
+// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+// case null => SinkMode.UPDATE
+// case _ => sinkMode
+// }).asInstanceOf[Tap[_, _, _]]
+// }
+// }
+// case _ => super.createTap(readOrWrite)(mode)
+// }
+// }
+//}