diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala new file mode 100644 index 0000000..21d90e8 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala @@ -0,0 +1,54 @@ +package parallelai.spyglass.hbase + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import com.twitter.scalding.Dsl._ +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding.RichPipe +import com.twitter.scalding.RichFields +import org.apache.hadoop.hbase.util.Bytes +import cascading.tuple.TupleEntry + +class HBasePipeWrapper (pipe: Pipe) { + def toBytesWritable(f: Fields): Pipe = { + asList(f) + .foldLeft(pipe){ (p, f) => { + p.map(f.toString -> f.toString){ from: String => { + new ImmutableBytesWritable(Bytes.toBytes(from)) + }} + }} + } + +// def toBytesWritable : Pipe = { +// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => { +// p.map(f.toString -> f.toString){ from: String => { +// new ImmutableBytesWritable(Bytes.toBytes(from)) +// }} +// }} +// } + + def fromBytesWritable(f: Fields): Pipe = { + asList(f) + .foldLeft(pipe) { (p, fld) => + p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { + Bytes.toString(from.get) + } + } + } + } + +// def fromBytesWritable : Pipe = { +// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) => +// p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { +// Bytes.toString(from.get) +// } +// } +// } +// } +} + +trait HBasePipeConversions { + implicit def pipeWrapper(pipe: Pipe) = new HBasePipeWrapper(pipe) +} + + |