package parallelai.spyglass.hbase

import java.io.IOException
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import com.twitter.scalding._
import parallelai.spyglass.hbase.HBaseConstants.SplitType
import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
import cascading.tap.Tap
import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import java.io.InputStream
import java.io.OutputStream
import java.util.Properties
import com.twitter.scalding.Hdfs
import com.twitter.scalding.Test

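/**
 * Implicit conversions between HBase's byte-oriented types and plain Scala
 * types. Importing these lets job code treat row keys and cell values held in
 * ImmutableBytesWritable / Array[Byte] as Strings or Longs.
 *
 * Note: bytesToLong parses the UTF-8 string form of the bytes, not the 8-byte
 * binary encoding produced by Bytes.toBytes(long: Long).
 */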
object Conversions {
  implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
  implicit def bytesToLong(bytes: Array[Byte]): Long = augmentString(bytesToString(bytes)).toLong
  implicit def ibwToString(ibw: ImmutableBytesWritable): String = bytesToString(ibw.get())
  implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s))
}

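/**
 * Scalding Source backed by an HBase table, readable and writable through an
 * HBaseTap on a Hadoop cluster and through an in-memory tap under JobTest.
 *
 * @param tableName      name of the HBase table
 * @param quorumNames    ZooKeeper quorum used to reach the cluster
 * @param keyFields      field(s) carrying the row key
 * @param familyNames    column families backing the value fields
 * @param valueFields    tuple fields mapped to HBase columns
 * @param timestamp      timestamp handed to the underlying HBaseScheme
 * @param sourceMode     read strategy: SCAN_ALL, SCAN_RANGE or GET_LIST
 * @param startKey       start row key, used with SCAN_RANGE
 * @param stopKey        stop row key, used with SCAN_RANGE
 * @param keyList        row keys to fetch, required with GET_LIST
 * @param versions       number of versions to read, used with GET_LIST
 * @param useSalt        whether row-key salting is applied
 * @param prefixList     prefix list used together with useSalt
 * @param sinkMode       Cascading SinkMode applied when writing
 * @param inputSplitType how input splits are generated (see SplitType)
 */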
case class HBaseSource(
    tableName: String = null,
    quorumNames: String = "localhost",
    keyFields: Fields = null,
    familyNames: List[String] = null,
    valueFields: List[Fields] = null,
    timestamp: Long = 0L,
    sourceMode: SourceMode = SourceMode.SCAN_ALL,
    startKey: String = null,
    stopKey: String = null,
    keyList: List[String] = null,
    versions: Int = 1,
    useSalt: Boolean = false,
    prefixList: String = null,
    sinkMode: SinkMode = SinkMode.UPDATE,
    inputSplitType: SplitType = SplitType.GRANULAR
  ) extends Source {
  
  val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
  internalScheme.setInputSplitTye(inputSplitType)

  val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]

  // To enable local mode testing
  val allFields = keyFields.append(valueFields.toArray)
  type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
  def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields)  

  
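  /**
   * Builds the Cascading Tap appropriate for the run mode: an HBaseTap when
   * running on Hdfs (configured from sourceMode for reads and sinkMode for
   * writes), and a MemoryTap over the registered JobTest buffers in Test
   * mode. Any other mode is rejected via createEmptyTap.
   */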
  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
    val hBaseScheme = hdfsScheme match {
      case hbase: HBaseScheme => hbase
      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
    } 
    mode match { 
      case Hdfs(_, _) => readOrWrite match {
        case Read => { 
          val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
           
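          // Configure the tap for the requested read strategy:
          //   SCAN_RANGE - scan the rows between startKey and stopKey
          //   SCAN_ALL   - full-table scan
          //   GET_LIST   - point reads of an explicit list of row keys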
          sourceMode match {
            case SourceMode.SCAN_RANGE => {
              
              hbt.setHBaseRangeParms(startKey, stopKey, useSalt, prefixList)
            }
            case SourceMode.SCAN_ALL => {
              hbt.setHBaseScanAllParms(useSalt, prefixList)
            }
            case SourceMode.GET_LIST => {
              if( keyList == null )  
                throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
              
              hbt.setHBaseListParms(keyList.toArray[String], versions, useSalt, prefixList)
            }
            case _ => throw new IOException("Unknown Source Mode (%s)".format(sourceMode))
          }

          hbt.setInputSplitType(inputSplitType)
          
          hbt.asInstanceOf[Tap[_,_,_]]
        }
        case Write => {
          val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
          
          hbt.setUseSaltInSink(useSalt)

          hbt.asInstanceOf[Tap[_,_,_]]
        }
      }
      /**case Test(buffers) => {
        /*
        * There MUST already be a registered sink or source in Test mode
        * to access this. You must explicitly name each of your test sources in your
        * JobTest.
        */
        val buffer =
          if (readOrWrite == Write) {
            val buf = buffers(this)
            //Make sure we wipe it out:
            buf.clear()
            buf
          } else {
            // if the source is also used as a sink, we don't want its contents to get modified
            buffers(this).clone()
          }
        // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
        new MemoryTap[InputStream, OutputStream](localScheme, buffer)
      }*/      
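      // In Test mode both reads and writes resolve to a MemoryTap over the
      // tuple buffer registered for this source in the JobTest.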
      case Test(buffer) => readOrWrite match {
        
        case Read => { 
          val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
          hbt.asInstanceOf[Tap[_,_,_]]
        }
        case Write => {
          val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
          hbt.asInstanceOf[Tap[_,_,_]]
        }
      }      
      case _ => createEmptyTap(readOrWrite)(mode)
    }
  }
  
  
  
  def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
    throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
  }
}
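
/*
 * Illustrative sketch: one way an HBaseSource might be wired into a scalding
 * job. The table, quorum, field and output names below are hypothetical.
 *
 *   class ExampleJob(args: Args) extends Job(args) {
 *     val source = HBaseSource(
 *       tableName   = "sample_table",
 *       quorumNames = "zk-host:2181",
 *       keyFields   = new Fields("rowKey"),
 *       familyNames = List("cf"),
 *       valueFields = List(new Fields("col1")),
 *       sourceMode  = SourceMode.SCAN_RANGE,
 *       startKey    = "2014-01-01",
 *       stopKey     = "2014-01-31")
 *
 *     source.read
 *       .write(Tsv(args("output")))
 *   }
 */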