aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
blob: 09ad19d09da00607a92ee4cd8dc5177e0d86b3ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package parallelai.spyglass.hbase

import java.io.IOException
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import com.twitter.scalding.AccessMode
import com.twitter.scalding.Hdfs
import com.twitter.scalding.Mode
import com.twitter.scalding.Read
import com.twitter.scalding.Write
import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
import cascading.tap.Tap
import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import com.twitter.scalding.Source
import com.twitter.scalding.TestMode
import com.twitter.scalding.Test

/**
 * Implicit conversions between HBase byte-oriented values and Scala/Java
 * value types. Import `Conversions._` to bring them into scope.
 */
object Conversions {

  /** Decodes a byte array to a String via HBase's `Bytes.toString`. */
  implicit def bytesToString(bytes: Array[Byte]): String =
    Bytes.toString(bytes)

  /**
   * Decodes the byte array to text and parses that text as a Long.
   * Note that this reads the bytes as a textual number, not as a
   * binary-encoded long.
   */
  implicit def bytesToLong(bytes: Array[Byte]): Long = {
    val text: String = Bytes.toString(bytes)
    augmentString(text).toLong
  }

  /** Decodes the bytes returned by `ibw.get()` to a String. */
  implicit def ibwToString(ibw: ImmutableBytesWritable): String =
    Bytes.toString(ibw.get())

  /** Wraps the byte encoding of `s` (via `Bytes.toBytes`) in an ImmutableBytesWritable. */
  implicit def stringToibw(s: String): ImmutableBytesWritable = {
    val raw: Array[Byte] = Bytes.toBytes(s)
    new ImmutableBytesWritable(raw)
  }
}

/**
 * Scalding `Source` that reads from and writes to an HBase table through
 * `HBaseTap` / `HBaseScheme`.
 *
 * @param tableName      table name handed to `HBaseTap`
 * @param quorumNames    quorum string handed to `HBaseTap`
 * @param keyFields      key fields handed to `HBaseScheme`; must not be null
 * @param familyNames    column family names handed to `HBaseScheme`; must not be null
 * @param valueFields    value fields handed to `HBaseScheme` (presumably one entry
 *                       per family -- confirm against `HBaseScheme`); must not be null
 * @param timestamp      timestamp handed to `HBaseScheme`
 * @param sourceMode     selects how a read tap is configured (scan all, scan range, get list)
 * @param startKey       start key, only used when `sourceMode` is `SCAN_RANGE`
 * @param stopKey        stop key, only used when `sourceMode` is `SCAN_RANGE`
 * @param keyList        keys to fetch, only used (and required) when `sourceMode` is `GET_LIST`
 * @param versions       versions argument, only used when `sourceMode` is `GET_LIST`
 * @param useSalt        salt flag forwarded to the read-mode setters and to `setUseSaltInSink`
 * @param prefixList     prefix list forwarded to the read-mode setters
 * @param sinkMode       sink mode used for write taps (read taps always use `SinkMode.KEEP`)
 * @param inputSplitType split type applied to the scheme and to read taps
 */
case class HBaseSource(
    tableName: String = null,
    quorumNames: String = "localhost",
    keyFields: Fields = null,
    familyNames: List[String] = null,
    valueFields: List[Fields] = null,
    timestamp: Long = 0L,
    sourceMode: SourceMode = SourceMode.SCAN_ALL,
    startKey: String = null,
    stopKey: String = null,
    keyList: List[String] = null,
    versions: Int = 1,
    useSalt: Boolean = false,
    prefixList: String = null,
    sinkMode: SinkMode = SinkMode.UPDATE,
    inputSplitType: SplitType = SplitType.GRANULAR
  ) extends Source {

  // These three are dereferenced immediately below; fail with a descriptive
  // message instead of an anonymous NullPointerException when a caller relies
  // on the (null) defaults.
  require(keyFields != null, "keyFields must be provided")
  require(familyNames != null, "familyNames must be provided")
  require(valueFields != null, "valueFields must be provided")

  val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
  internalScheme.setInputSplitTye(inputSplitType)

  val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]

  // To enable local mode testing
  val allFields = keyFields.append(valueFields.toArray)
  def localScheme = new NullScheme(allFields, allFields)

  /**
   * Builds the tap for the requested access mode.
   *
   * `Hdfs` and `Test` modes both produce a real `HBaseTap`; every other mode
   * is delegated to [[createEmptyTap]], which throws.
   *
   * @throws ClassCastException if `hdfsScheme` is not an `HBaseScheme`
   * @throws IOException        if the source mode is unsupported, or if it is
   *                            `GET_LIST` and `keyList` is null
   */
  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
    val hBaseScheme = hdfsScheme match {
      case hbase: HBaseScheme => hbase
      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
    }
    mode match {
      case Hdfs(_, _) | Test(_) => readOrWrite match {
        case Read => createReadTap(hBaseScheme)
        case Write => createWriteTap(hBaseScheme)
      }
      case _ => createEmptyTap(readOrWrite)(mode)
    }
  }

  /** Creates a read tap (always `SinkMode.KEEP`) configured according to `sourceMode`. */
  private def createReadTap(hBaseScheme: HBaseScheme): Tap[_, _, _] = {
    val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)

    sourceMode match {
      case SourceMode.SCAN_RANGE =>
        hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
      case SourceMode.SCAN_ALL =>
        hbt.setHBaseScanAllParms(useSalt, prefixList)
      case SourceMode.GET_LIST =>
        if (keyList == null)
          throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)

        hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
      // "%s" (not a bare "%"): a bare "%" is an invalid format specifier and
      // would raise UnknownFormatConversionException instead of this IOException.
      case _ => throw new IOException("Unknown Source Mode (%s)".format(sourceMode))
    }

    hbt.setInputSplitType(inputSplitType)

    hbt.asInstanceOf[Tap[_, _, _]]
  }

  /** Creates a write tap using the configured `sinkMode` and salt flag. */
  private def createWriteTap(hBaseScheme: HBaseScheme): Tap[_, _, _] = {
    val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)

    hbt.setUseSaltInSink(useSalt)

    hbt.asInstanceOf[Tap[_, _, _]]
  }

  /**
   * Fallback for modes this source does not support.
   *
   * @throws RuntimeException always, naming this source and the rejected mode
   */
  def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] =
    throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
}