From cbf6c2903bfd0a5fe528c54382ea791c45637ded Mon Sep 17 00:00:00 2001 From: cra14 Date: Fri, 26 Apr 2013 12:47:12 +0100 Subject: First public release of Spy Glass code base --- .../parallelai/spyglass/hbase/HBaseSource.scala | 82 ++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala (limited to 'src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala') diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala new file mode 100644 index 0000000..e46ef50 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -0,0 +1,82 @@ +package parallelai.spyglass.hbase + +import java.io.IOException +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import com.twitter.scalding.AccessMode +import com.twitter.scalding.Hdfs +import com.twitter.scalding.Mode +import com.twitter.scalding.Read +import com.twitter.scalding.Source +import com.twitter.scalding.Write +import cascading.scheme.Scheme +import cascading.tap.SinkMode +import cascading.tap.Tap +import cascading.tuple.Fields +import org.apache.hadoop.mapred.RecordReader +import scala.compat.Platform +import org.apache.hadoop.mapred.OutputCollector +import org.apache.hadoop.mapred.JobConf +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +object Conversions { + implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) + implicit def bytesToLong(bytes: Array[Byte]): Long = augmentString(bytesToString(bytes)).toLong + implicit def ibwToString(ibw: ImmutableBytesWritable): String = bytesToString(ibw.get()) + implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s)) +} + +class HBaseSource( + tableName: String = null, + quorumNames: String = "localhost", + keyFields: Fields = null, + familyNames: Array[String] = null, + valueFields: Array[Fields] = null, + timestamp: Long = Platform.currentTime, + sourceMode: SourceMode = SourceMode.SCAN_ALL, + startKey: String = null, + stopKey: String = null, + keyList: List[String] = null + ) extends Source { + + override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) + + sourceMode match { + case SourceMode.SCAN_RANGE => { + hbt.setHBaseRangeParms(startKey, stopKey) + } + case SourceMode.SCAN_ALL => { + hbt.setHBaseScanAllParms() + } + case SourceMode.GET_LIST => { + if( keyList == null ) + throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) + + hbt.setHBaseListParms(keyList.toArray[String]) + } + case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) + } + + hbt.asInstanceOf[Tap[_,_,_]] + } + case Write => { + val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) + + hbt.asInstanceOf[Tap[_,_,_]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } +} -- cgit v1.2.3