aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
blob: a4e2d7af97831e63ae1782b53782890043f9b3fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package parallelai.spyglass.hbase.testing

import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode

import com.twitter.scalding.{IterableSource, Args, TextLine}
import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
import cascading.tuple.Fields
import org.apache.log4j.{Logger, Level}
import cascading.tap.SinkMode
import cascading.pipe.Pipe
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

/**
 * Scalding job that generates the fixture rows for the salt tests.
 *
 * It builds 100,000 `(key, salted, unsalted)` tuples, dumps them as text to
 * `saltTesting/Inputs`, and writes them through an [[HBaseSource]] for table
 * `_TEST.SALT.01` using `SinkMode.UPDATE`.
 *
 * Arguments:
 *  - `quorum` (required): forwarded to [[HBaseSource]]; presumably the ZooKeeper quorum.
 *  - `debug` (optional, defaults to "false"): when true, the root log4j logger is set to DEBUG.
 */
class HBaseSaltTestSetup (args: Args) extends JobBase(args) with HBasePipeConversions {

  val isDebug = args.getOrElse("debug", "false").toBoolean

  if (isDebug) { Logger.getRootLogger.setLevel(Level.DEBUG) }

  // Field layout of every generated tuple: the row key followed by the two value fields.
  val TABLE_SCHEMA = List('key, 'salted, 'unsalted)

  // Not referenced inside this job; kept because it is a public member of the class.
  val prefix = "0123456789"

  val quorum = args("quorum")

  // One tuple per integer in [0, 99999]. The unsalted value is the number zero-padded to
  // five digits; the key and the salted value are both "<last digit>_<unsalted>".
  // The range starts at a plain `0`: the former `00000` was an octal-style literal, which
  // is deprecated in Scala 2.10 and rejected by later compilers.
  val inVals = (0 to 99999).toList.map { x =>
    val unsalted = "%05d".format(x)
    val salted = "" + (x % 10) + "_" + unsalted
    (salted, salted, unsalted)
  }

  /**
   * Converts the String value of every field in `f` to an `ImmutableBytesWritable`,
   * overwriting each field in place. A null value stays null.
   *
   * @param pipe the pipe whose fields are converted
   * @param f    the fields to convert
   * @return the pipe with one map step appended per field
   */
  def toIBW(pipe: Pipe, f: Fields): Pipe =
    asList(f).foldLeft(pipe) { (acc, field) =>
      val name = field.toString
      acc.map(name -> name) { from: String =>
        Option(from).map(s => new ImmutableBytesWritable(Bytes.toBytes(s))).orNull
      }
    }


  // Plain-text dump of the generated tuples.
  val input = IterableSource(inVals, TABLE_SCHEMA)
    .read
    .write(TextLine("saltTesting/Inputs"))

  // Second read of the same tuples, converted to bytes and written to HBase. Each non-key
  // field is paired with the string "data" (presumably the column family name).
  val maker = toIBW(IterableSource(inVals, TABLE_SCHEMA).read, TABLE_SCHEMA)
    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
    TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.UPDATE ))
}

/**
 * Scalding job that reads table `_TEST.SALT.01` through [[HBaseSource]] and writes the
 * result as text under `saltTesting/`.
 *
 * Only the "scan all with salt" scenario (`hbase02`) is active; the other scenarios are
 * commented out below.
 *
 * Arguments:
 *  - `quorum` (required): forwarded to [[HBaseSource]]; presumably the ZooKeeper quorum.
 *  - `debug` (optional, defaults to "false"): when true, the root log4j logger is set to DEBUG.
 */
class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {

  val isDebug = args.getOrElse("debug", "false").toBoolean

  if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)

  // Row key followed by the two value fields.
  val TABLE_SCHEMA = List('key, 'salted, 'unsalted)

  // Only referenced by the disabled scenarios below.
  val prefix = "0123456789"

  val quorum = args("quorum")

  // Range bounds and key lists for the disabled scenarios. The `P`-suffixed variants carry
  // the "<digit>_" prefix; the others are the bare five-digit keys.
  val sttKey = "01728"
  val stpKey = "01831"
  val sttKeyP = "8_01728"
  val stpKeyP = "1_01831"
  val listKey = List("01681", "01456")
  val listKeyP = List("1_01681", "6_01456")
  
//  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          sourceMode = SourceMode.SCAN_ALL ).read
//          .fromBytesWritable( TABLE_SCHEMA )
//  .write(TextLine("saltTesting/ScanAllNoSalt01"))

  // Active scenario: SCAN_ALL with useSalt = true, written straight to text.
  val hbase02 = {
    // Each non-key field is paired with the string "data" and wrapped in its own Fields.
    val families = TABLE_SCHEMA.tail.map(_ => "data")
    val valueFields = TABLE_SCHEMA.tail.map(sym => new Fields(sym.name))
    val source = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
      families,
      valueFields,
      sourceMode = SourceMode.SCAN_ALL, useSalt = true )

    source.read
//        .fromBytesWritable( TABLE_SCHEMA )
      .write(TextLine("saltTesting/ScanAllPlusSalt01"))
  }

//  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read
//        .fromBytesWritable(TABLE_SCHEMA )
//  .write(TextLine("saltTesting/ScanRangeNoSalt01"))
//
//  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read
//        .fromBytesWritable(TABLE_SCHEMA )
//  .write(TextLine("saltTesting/ScanRangePlusSalt01"))
//
//  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read
//          .fromBytesWritable(TABLE_SCHEMA )
//  .write(TextLine("saltTesting/GetListNoSalt01"))
//
//  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read
//          .fromBytesWritable(TABLE_SCHEMA )
//  .write(TextLine("saltTesting/GetListPlusSalt01"))
//
//  val hbase07 =
//      new HBaseSource( "_TEST.SALT.03", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
//  .read
//  .fromBytesWritable( TABLE_SCHEMA )
//  .write(TextLine("saltTesting/ScanRangePlusSalt10"))
//  .toBytesWritable( TABLE_SCHEMA )
//  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          useSalt = true ))
//
//  val hbase08 =
//      new HBaseSource( "_TEST.SALT.01", quorum, 'key,
//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
//  .read
//  .fromBytesWritable('*)
//  .write(TextLine("saltTesting/ScanRangePlusSalt03"))
}

/**
 * Scalding job that rebuilds the same 100,000 `(key, salted, unsalted)` tuples as the setup
 * job and writes them through an [[HBaseSource]] for table `_TEST.SALT.01` using
 * `SinkMode.REPLACE`.
 *
 * NOTE(review): judging by the class and `eraser` names this is the cleanup step of the salt
 * tests; what `SinkMode.REPLACE` does to the existing table is decided by [[HBaseSource]]
 * and is not visible here. Confirm before relying on it.
 *
 * Arguments:
 *  - `quorum` (required): forwarded to [[HBaseSource]]; presumably the ZooKeeper quorum.
 *  - `debug` (optional, defaults to "false"): when true, the root log4j logger is set to DEBUG.
 */
class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions {

  val isDebug = args.getOrElse("debug", "false").toBoolean

  if (isDebug) { Logger.getRootLogger.setLevel(Level.DEBUG) }

  // Field layout of every generated tuple: the row key followed by the two value fields.
  val TABLE_SCHEMA = List('key, 'salted, 'unsalted)

  // Not referenced inside this job; kept because it is a public member of the class.
  val prefix = "0123456789"

  val quorum = args("quorum")

  // One tuple per integer in [0, 99999]. The unsalted value is the number zero-padded to
  // five digits; the key and the salted value are both "<last digit>_<unsalted>".
  // The range starts at a plain `0`: the former `00000` was an octal-style literal, which
  // is deprecated in Scala 2.10 and rejected by later compilers.
  val inVals = (0 to 99999).toList.map { x =>
    val unsalted = "%05d".format(x)
    val salted = "" + (x % 10) + "_" + unsalted
    (salted, salted, unsalted)
  }

  /**
   * Converts the String value of every field in `f` to an `ImmutableBytesWritable`,
   * overwriting each field in place. A null value stays null.
   *
   * @param pipe the pipe whose fields are converted
   * @param f    the fields to convert
   * @return the pipe with one map step appended per field
   */
  def toIBW(pipe: Pipe, f: Fields): Pipe =
    asList(f).foldLeft(pipe) { (acc, field) =>
      val name = field.toString
      acc.map(name -> name) { from: String =>
        Option(from).map(s => new ImmutableBytesWritable(Bytes.toBytes(s))).orNull
      }
    }

  val input = IterableSource(inVals, TABLE_SCHEMA).read

  // Each non-key field is paired with the string "data" (presumably the column family name).
  val eraser = toIBW(input, TABLE_SCHEMA)
    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
      TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
}