aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
diff options
context:
space:
mode:
authorChandan Rajah <chandan.rajah@gmail.com>2013-06-06 12:27:15 +0100
committerChandan Rajah <chandan.rajah@gmail.com>2013-06-06 12:27:15 +0100
commit6e21e0c68248a33875898b86a2be7a9cec7df3d4 (patch)
tree5254682e3c3440f7c6954b23519459107b8a445e /src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
parentea9c80374da846edf2a1634a42ccb932838ebd5b (diff)
downloadSpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.tar.gz
SpyGlass-6e21e0c68248a33875898b86a2be7a9cec7df3d4.zip
Added extensions to Read and Write mode.
Added support for key prefixes
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala54
1 files changed, 54 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
new file mode 100644
index 0000000..21d90e8
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -0,0 +1,54 @@
+package parallelai.spyglass.hbase
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import com.twitter.scalding.Dsl._
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding.RichPipe
+import com.twitter.scalding.RichFields
+import org.apache.hadoop.hbase.util.Bytes
+import cascading.tuple.TupleEntry
+
+class HBasePipeWrapper (pipe: Pipe) {
+ def toBytesWritable(f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String => {
+ new ImmutableBytesWritable(Bytes.toBytes(from))
+ }}
+ }}
+ }
+
+// def toBytesWritable : Pipe = {
+// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
+// p.map(f.toString -> f.toString){ from: String => {
+// new ImmutableBytesWritable(Bytes.toBytes(from))
+// }}
+// }}
+// }
+
+ def fromBytesWritable(f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe) { (p, fld) =>
+ p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
+ Bytes.toString(from.get)
+ }
+ }
+ }
+ }
+
+// def fromBytesWritable : Pipe = {
+// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
+// p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
+// Bytes.toString(from.get)
+// }
+// }
+// }
+// }
+}
+
+trait HBasePipeConversions {
+ implicit def pipeWrapper(pipe: Pipe) = new HBasePipeWrapper(pipe)
+}
+
+