aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
blob: 2ca3f32e6a63960b79f5b0b764cc92beae3a5058 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package parallelai.spyglass.hbase.testing

import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;

import com.twitter.scalding.Args
import parallelai.spyglass.hbase.HBaseSource
import com.twitter.scalding.Tsv
import cascading.tuple.Fields
import com.twitter.scalding.TextLine
import org.apache.log4j.Logger
import org.apache.log4j.Level
import parallelai.spyglass.hbase.HBasePipeConversions
import cascading.pipe.Pipe

/**
 * Scalding job that exercises [[HBaseSource]] against the salt test tables.
 *
 * Each `hbaseNN` pipe reads a test table in one source mode (SCAN_ALL, SCAN_RANGE
 * or GET_LIST), with or without `useSalt`. It converts the `TABLE_SCHEMA` fields
 * from bytes and writes them as text under `saltTesting/`. `hbase07` additionally
 * converts its rows back to bytes and writes them into `_TEST.SALT.04`.
 *
 * Arguments:
 *  - `--quorum` quorum string handed to every HBaseSource (required).
 *  - `--debug`  parsed with `toBoolean` (default "false"); when true the root
 *               log4j logger is set to DEBUG.
 */
class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {

  val isDebug = args.getOrElse("debug", "false").toBoolean

  if (isDebug) { Logger.getRootLogger().setLevel(Level.DEBUG) }

  /** Row key followed by the value columns read from / written to each table. */
  val TABLE_SCHEMA = List('key, 'salted, 'unsalted)

  /** Salt prefixes passed as `prefixList` to the salted range scan (hbase07). */
  val prefix = "0123456789"

  val quorum = args("quorum")

  // Column family for every non-key column: all value columns live in "data".
  // Declared as a def (not a val) so each HBaseSource gets its own fresh array,
  // exactly as the previous inline expressions produced.
  private def dataFamilies: Array[String] =
    TABLE_SCHEMA.tail.map(_ => "data").toArray

  // One single-name Fields per non-key column, in TABLE_SCHEMA order.
  private def dataFields: Array[Fields] =
    TABLE_SCHEMA.tail.map(column => new Fields(column.name)).toArray

  // SCAN_ALL over _TEST.SALT.01 without salting.
  val hbase01 = new HBaseSource("_TEST.SALT.01", quorum, 'key,
      dataFamilies, dataFields,
      sourceMode = SourceMode.SCAN_ALL).read
    .fromBytesWritable(TABLE_SCHEMA)
    .write(TextLine("saltTesting/ScanAllNoSalt01"))

  // SCAN_ALL over _TEST.SALT.01 with salting enabled.
  val hbase02 = new HBaseSource("_TEST.SALT.01", quorum, 'key,
      dataFamilies, dataFields,
      sourceMode = SourceMode.SCAN_ALL, useSalt = true).read
    .fromBytesWritable(TABLE_SCHEMA)
    .write(TextLine("saltTesting/ScanAllPlusSalt01"))

  // SCAN_RANGE without salting; keys are given in their prefixed "<digit>_<id>" form.
  // NOTE(review): startKey "8_1728" sorts after stopKey "1_1831" lexicographically;
  // confirm this inverted range is intentional (e.g. probing edge-case handling).
  val hbase03 = new HBaseSource("_TEST.SALT.01", quorum, 'key,
      dataFamilies, dataFields,
      sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831").read
    .fromBytesWritable(TABLE_SCHEMA)
    .write(TextLine("saltTesting/ScanRangeNoSalt01"))

  // SCAN_RANGE with salting; keys are given without a prefix.
  val hbase04 = new HBaseSource("_TEST.SALT.01", quorum, 'key,
      dataFamilies, dataFields,
      sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true).read
    .fromBytesWritable(TABLE_SCHEMA)
    .write(TextLine("saltTesting/ScanRangePlusSalt01"))

  // GET_LIST without salting, using prefixed keys.
  val hbase05bytes = new HBaseSource("_TEST.SALT.01", quorum, 'key,
      dataFamilies, dataFields,
      sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456")).read
    .fromBytesWritable(TABLE_SCHEMA)
    .write(TextLine("saltTesting/GetListNoSalt01"))

  // GET_LIST with salting, using unprefixed keys.
  val hbase06bytes = new HBaseSource("_TEST.SALT.01", quorum, 'key,
      dataFamilies, dataFields,
      sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read
    .fromBytesWritable(TABLE_SCHEMA)
    .write(TextLine("saltTesting/GetListPlusSalt01"))

  // Salted SCAN_RANGE over _TEST.SALT.03 with an explicit prefix list. The rows are
  // written as text, then converted back to bytes and copied into _TEST.SALT.04.
  val hbase07 =
    new HBaseSource("_TEST.SALT.03", quorum, 'key,
        dataFamilies, dataFields,
        sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455",
        useSalt = true, prefixList = prefix)
      .read
      .fromBytesWritable(TABLE_SCHEMA)
      .write(TextLine("saltTesting/ScanRangePlusSalt10"))
      .toBytesWritable(TABLE_SCHEMA)
      .write(new HBaseSource("_TEST.SALT.04", quorum, 'key,
        dataFamilies, dataFields,
        useSalt = true))

  // Disabled: salted range scan of _TEST.SALT.01 that reads all fields ('*).
//  val hbase08 = 
//      new HBaseSource( "_TEST.SALT.01", quorum, 'key,  
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, 
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
//          sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix )
//  .read
//  .fromBytesWritable('*)
//  .write(TextLine("saltTesting/ScanRangePlusSalt03"))

}