Diffstat (limited to 'src/main/scala')
-rw-r--r--	src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala	166
1 file changed, 83 insertions(+), 83 deletions(-)
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
index 6216695..450a57d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
@@ -1,83 +1,83 @@
-//package parallelai.spyglass.hbase
-//
-//import cascading.pipe.Pipe
-//import cascading.pipe.assembly.Coerce
-//import cascading.scheme.Scheme
-//import cascading.tap.{ Tap, SinkMode }
-//import cascading.tuple.Fields
-//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
-//import org.apache.hadoop.hbase.util.Bytes
-//import scala.collection.JavaConversions._
-//import scala.collection.mutable.WrappedArray
-//import com.twitter.scalding._
-//import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-//import org.apache.hadoop.hbase.client.Scan
-//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
-//import org.apache.hadoop.hbase.util.Base64
-//import java.io.ByteArrayOutputStream
-//import java.io.DataOutputStream
-//
-//object HBaseRawSource {
-//	/**
-//	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource
-//	 * @param scan
-//	 * @return base64 string representation
-//	 */
-//	def convertScanToString(scan: Scan) = {
-//		val out = new ByteArrayOutputStream();
-//		val dos = new DataOutputStream(out);
-//		scan.write(dos);
-//		Base64.encodeBytes(out.toByteArray());
-//	}
-//}
-//
-//
-///**
-// * @author Rotem Hermon
-// *
-// * HBaseRawSource is a scalding source that passes the original row (Result) object to the
-// * mapper for customized processing.
-// *
-// * @param	tableName	The name of the HBase table to read
-// * @param	quorumNames	HBase quorum
-// * @param	familyNames	Column families to get (source, if null will get all) or update to (sink)
-// * @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written
-// * @param	base64Scan	An optional base64 encoded scan object
-// * @param	sinkMode	If REPLACE the output table will be deleted before writing to
-// *
-// */
-//class HBaseRawSource(
-//	tableName: String,
-//	quorumNames: String = "localhost",
-//	familyNames: Array[String],
-//	writeNulls: Boolean = true,
-//	base64Scan: String = null,
-//	sinkMode: SinkMode = null) extends Source {
-//
-//	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
-//		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
-//
-//	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
-//		val hBaseScheme = hdfsScheme match {
-//			case hbase: HBaseRawScheme => hbase
-//			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
-//		}
-//		mode match {
-//			case hdfsMode @ Hdfs(_, _) => readOrWrite match {
-//				case Read => {
-//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-//						case null => SinkMode.KEEP
-//						case _ => sinkMode
-//					}).asInstanceOf[Tap[_, _, _]]
-//				}
-//				case Write => {
-//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-//						case null => SinkMode.UPDATE
-//						case _ => sinkMode
-//					}).asInstanceOf[Tap[_, _, _]]
-//				}
-//			}
-//			case _ => super.createTap(readOrWrite)(mode)
-//		}
-//	}
-//}
+package parallelai.spyglass.hbase
+
+import cascading.pipe.Pipe
+import cascading.pipe.assembly.Coerce
+import cascading.scheme.Scheme
+import cascading.tap.{ Tap, SinkMode }
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
+import org.apache.hadoop.hbase.util.Bytes
+import scala.collection.JavaConversions._
+import scala.collection.mutable.WrappedArray
+import com.twitter.scalding._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.hadoop.hbase.util.Base64
+import java.io.ByteArrayOutputStream
+import java.io.DataOutputStream
+
+object HBaseRawSource {
+	/**
+	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource
+	 * @param scan
+	 * @return base64 string representation
+	 */
+	def convertScanToString(scan: Scan) = {
+		val out = new ByteArrayOutputStream();
+		val dos = new DataOutputStream(out);
+		scan.write(dos);
+		Base64.encodeBytes(out.toByteArray());
+	}
+}
+
+
+/**
+* @author Rotem Hermon
+*
+* HBaseRawSource is a scalding source that passes the original row (Result) object to the
+* mapper for customized processing.
+*
+* @param	tableName	The name of the HBase table to read
+* @param	quorumNames	HBase quorum
+* @param	familyNames	Column families to get (source, if null will get all) or update to (sink)
+* @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written
+* @param	base64Scan	An optional base64 encoded scan object
+* @param	sinkMode	If REPLACE the output table will be deleted before writing to
+*
+*/
+class HBaseRawSource(
+	tableName: String,
+	quorumNames: String = "localhost",
+	familyNames: Array[String],
+	writeNulls: Boolean = true,
+	base64Scan: String = null,
+	sinkMode: SinkMode = null) extends Source {
+
+	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
+		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+		val hBaseScheme = hdfsScheme match {
+			case hbase: HBaseRawScheme => hbase
+			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
+		}
+		mode match {
+			case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+				case Read => {
+					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+						case null => SinkMode.KEEP
+						case _ => sinkMode
+					}).asInstanceOf[Tap[_, _, _]]
+				}
+				case Write => {
+					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+						case null => SinkMode.UPDATE
+						case _ => sinkMode
+					}).asInstanceOf[Tap[_, _, _]]
+				}
+			}
+			case _ => super.createTap(readOrWrite)(mode)
+		}
+	}
+}
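
For context (not part of the commit), a minimal sketch of how the re-enabled source might be used from a Scalding job. The job name, table name, quorum address, column family, and row-key bounds below are placeholder assumptions, not values from this repository:

	import com.twitter.scalding._
	import org.apache.hadoop.hbase.client.Scan
	import org.apache.hadoop.hbase.util.Bytes
	import parallelai.spyglass.hbase.HBaseRawSource

	// Hypothetical job: "my_table", "zk-host:2181", "cf" and the row-key
	// range are placeholders.
	class HBaseRawReadJob(args: Args) extends Job(args) {

		// Narrow the read with a Scan, then serialize it for the source.
		val scan = new Scan()
		scan.setStartRow(Bytes.toBytes("row-000"))
		scan.setStopRow(Bytes.toBytes("row-999"))

		val source = new HBaseRawSource(
			tableName = "my_table",
			quorumNames = "zk-host:2181",
			familyNames = Array("cf"), // null would fetch all families
			base64Scan = HBaseRawSource.convertScanToString(scan))

		// Each tuple carries the raw HBase Result object, so downstream
		// steps can unpack columns however they need before writing out.
		source.read
			.write(Tsv(args("output")))
	}

Per createTap above, when no explicit sinkMode is passed the tap defaults to SinkMode.KEEP on read and SinkMode.UPDATE on write.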
