blob: 31ed3ea064443615313d58be6a14f2114b791cc6 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
package parallelai.spyglass.hbase
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import com.twitter.scalding.Dsl._
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding.RichPipe
import com.twitter.scalding.RichFields
import org.apache.hadoop.hbase.util.Bytes
import cascading.tuple.TupleEntry
/**
 * Wraps a Cascading pipe with helpers that convert field values between
 * `String` and `ImmutableBytesWritable`.
 *
 * @param pipe the pipe the conversion steps are appended to
 */
class HBasePipeWrapper (pipe: Pipe) {

  /**
   * Appends one map step per field in `f`, turning each `String` value into an
   * `ImmutableBytesWritable` built from `Bytes.toBytes`. Each field is mapped
   * onto a field of the same name (the field's `toString`). Null values are
   * passed through as null.
   *
   * @param f the fields whose values should be converted
   * @return the pipe with the conversion steps appended
   */
  def toBytesWritable(f: Fields): Pipe =
    asList(f).foldLeft(pipe) { (acc, field) =>
      val name = field.toString
      acc.map(name -> name) { value: String =>
        Option(value).map(s => new ImmutableBytesWritable(Bytes.toBytes(s))).orNull
      }
    }

  // Disabled no-argument variant, kept for reference.
  // NOTE(review): it casts Fields.ALL to TupleEntry, which does not look like
  // a cast that can succeed - confirm before re-enabling.
  // def toBytesWritable : Pipe = {
  //   asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
  //     p.map(f.toString -> f.toString){ from: String => {
  //       new ImmutableBytesWritable(Bytes.toBytes(from))
  //     }}
  //   }}
  // }

  /**
   * Appends one map step per field in `f`, turning each `ImmutableBytesWritable`
   * value into a `String` via `Bytes.toString` on its backing bytes. Each field
   * is mapped onto a field of the same name (the field's `toString`). Null
   * values are passed through as null.
   *
   * @param f the fields whose values should be converted
   * @return the pipe with the conversion steps appended
   */
  def fromBytesWritable(f: Fields): Pipe =
    asList(f).foldLeft(pipe) { (acc, field) =>
      val name = field.toString
      acc.map(name -> name) { value: ImmutableBytesWritable =>
        Option(value).map(w => Bytes.toString(w.get)).orNull
      }
    }

  // Disabled no-argument variant, kept for reference.
  // NOTE(review): same Fields.ALL-to-TupleEntry cast as the disabled
  // toBytesWritable variant - confirm before re-enabling.
  // def fromBytesWritable : Pipe = {
  //   asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
  //     p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
  //         Bytes.toString(from.get)
  //       }
  //     }
  //   }
  // }
}
/**
 * Mix-in that makes the `HBasePipeWrapper` methods available directly on a
 * `Pipe` through an implicit conversion.
 */
trait HBasePipeConversions {

  /**
   * Wraps `pipe` in an `HBasePipeWrapper`.
   *
   * The result type is spelled out rather than inferred so that implicit
   * resolution does not depend on the compiler having typed the body first.
   *
   * @param pipe the pipe to wrap
   * @return a wrapper exposing the byte-conversion helpers for `pipe`
   */
  implicit def pipeWrapper(pipe: Pipe): HBasePipeWrapper = new HBasePipeWrapper(pipe)
}
|