aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala82
1 files changed, 82 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
new file mode 100644
index 0000000..e46ef50
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -0,0 +1,82 @@
+package parallelai.spyglass.hbase
+
+import java.io.IOException
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import com.twitter.scalding.AccessMode
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Mode
+import com.twitter.scalding.Read
+import com.twitter.scalding.Source
+import com.twitter.scalding.Write
+import cascading.scheme.Scheme
+import cascading.tap.SinkMode
+import cascading.tap.Tap
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.RecordReader
+import scala.compat.Platform
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.JobConf
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+object Conversions {
+ implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
+ implicit def bytesToLong(bytes: Array[Byte]): Long = augmentString(bytesToString(bytes)).toLong
+ implicit def ibwToString(ibw: ImmutableBytesWritable): String = bytesToString(ibw.get())
+ implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s))
+}
+
+class HBaseSource(
+ tableName: String = null,
+ quorumNames: String = "localhost",
+ keyFields: Fields = null,
+ familyNames: Array[String] = null,
+ valueFields: Array[Fields] = null,
+ timestamp: Long = Platform.currentTime,
+ sourceMode: SourceMode = SourceMode.SCAN_ALL,
+ startKey: String = null,
+ stopKey: String = null,
+ keyList: List[String] = null
+ ) extends Source {
+
+ override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields)
+ .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+ val hBaseScheme = hdfsScheme match {
+ case hbase: HBaseScheme => hbase
+ case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
+ }
+ mode match {
+ case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Read => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
+
+ sourceMode match {
+ case SourceMode.SCAN_RANGE => {
+ hbt.setHBaseRangeParms(startKey, stopKey)
+ }
+ case SourceMode.SCAN_ALL => {
+ hbt.setHBaseScanAllParms()
+ }
+ case SourceMode.GET_LIST => {
+ if( keyList == null )
+ throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
+
+ hbt.setHBaseListParms(keyList.toArray[String])
+ }
+ case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
+ }
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ }
+ case _ => super.createTap(readOrWrite)(mode)
+ }
+ }
+}