aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
blob: 603568800dde36f7779bffb513075d7d3895b082 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package parallelai.spyglass.hbase

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import com.twitter.scalding.Dsl._
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding.RichPipe
import com.twitter.scalding.RichFields
import org.apache.hadoop.hbase.util.Bytes
import cascading.tuple.TupleEntry

class HBasePipeWrapper(pipe: Pipe) {

  /** Converts each field named in `f` from a `String` to an
    * [[ImmutableBytesWritable]] so the tuple can be handed to HBase.
    *
    * A `null` field value is serialized as the empty string, so rows with
    * missing values are written instead of throwing an NPE inside
    * `Bytes.toBytes`.
    *
    * @param f the fields to convert; each is replaced in place via one `map`
    * @return the pipe with the listed fields converted
    */
  def toBytesWritable(f: Fields): Pipe = {
    // `asList` comes from scalding's FieldConversions (via Dsl._).
    // NOTE: the fold's lambda parameter is deliberately NOT named `f`,
    // avoiding the shadowing of the method parameter in the original.
    asList(f).foldLeft(pipe) { (p, field) =>
      p.map(field.toString -> field.toString) { from: String =>
        new ImmutableBytesWritable(Bytes.toBytes(if (from == null) "" else from))
      }
    }
  }

  /** Converts each field named in `f` from an [[ImmutableBytesWritable]]
    * back to a `String` — the inverse of [[toBytesWritable]].
    *
    * A `null` writable maps to `null` rather than throwing, mirroring the
    * null tolerance of [[toBytesWritable]].
    *
    * @param f the fields to convert; each is replaced in place via one `map`
    * @return the pipe with the listed fields converted
    */
  def fromBytesWritable(f: Fields): Pipe = {
    asList(f).foldLeft(pipe) { (p, field) =>
      p.map(field.toString -> field.toString) { from: ImmutableBytesWritable =>
        if (from == null) null else Bytes.toString(from.get)
      }
    }
  }
}

/** Mix-in that enriches a cascading [[Pipe]] with the HBase byte-conversion
  * helpers of [[HBasePipeWrapper]] (`toBytesWritable` / `fromBytesWritable`).
  */
trait HBasePipeConversions {
  // Explicit return type: implicit defs with inferred result types are
  // fragile (compiler-warned, and illegal without annotation in Scala 3).
  implicit def pipeWrapper(pipe: Pipe): HBasePipeWrapper = new HBasePipeWrapper(pipe)
}