diff options
author | rotem <rotem.hermon@gmail.com> | 2013-06-16 12:09:42 +0300 |
---|---|---|
committer | rotem <rotem.hermon@gmail.com> | 2013-06-16 12:09:42 +0300 |
commit | b72c234dd35c3eb807e8050385adf697dcf97fad (patch) | |
tree | 6a2372996840544af156bac37f837448deaa8792 /src/main/scala/parallelai/spyglass/hbase | |
parent | 388714f40b45e19985ab2ac68a23e96d71d857ad (diff) | |
download | SpyGlass-b72c234dd35c3eb807e8050385adf697dcf97fad.tar.gz SpyGlass-b72c234dd35c3eb807e8050385adf697dcf97fad.zip |
Adding HBase raw taps and source, bumping needed dependency versions, bumping scala version
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala new file mode 100644 index 0000000..1fc6f7d --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -0,0 +1,62 @@ +package parallelai.spyglass.hbase + +import cascading.pipe.Pipe +import cascading.pipe.assembly.Coerce +import cascading.scheme.Scheme +import cascading.tap.{ Tap, SinkMode } +import cascading.tuple.Fields +import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.JavaConversions._ +import scala.collection.mutable.WrappedArray +import com.twitter.scalding._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.util.Base64 +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream + +object HBaseRawSource { + def convertScanToString(scan: Scan) = { + val out = new ByteArrayOutputStream(); + val dos = new DataOutputStream(out); + scan.write(dos); + Base64.encodeBytes(out.toByteArray()); + } +} +class HBaseRawSource( + tableName: String, + quorumNames: String = "localhost", + familyNames: Array[String], + writeNulls: Boolean = true, + base64Scan:String = null, + sinkMode: SinkMode = null) extends Source { + + override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseRawScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.KEEP + case _ => sinkMode + }).asInstanceOf[Tap[_,_,_]] + } + case Write => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.UPDATE + case _ => sinkMode + }).asInstanceOf[Tap[_,_,_]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } +} |