diff options
| author | Chandan Rajah <crajah@parallelai.com> | 2013-06-17 08:58:04 -0700 | 
|---|---|---|
| committer | Chandan Rajah <crajah@parallelai.com> | 2013-06-17 08:58:04 -0700 | 
| commit | 62e276a5b80613ed56a82aedaf5dd923250b8d48 (patch) | |
| tree | 1656978997c837f662ddf1e7bc1da7b5a7a0da10 /src | |
| parent | b72c234dd35c3eb807e8050385adf697dcf97fad (diff) | |
| parent | 7566bbcc71e157444185432f2b359b6e3b857dfe (diff) | |
| download | SpyGlass-62e276a5b80613ed56a82aedaf5dd923250b8d48.tar.gz SpyGlass-62e276a5b80613ed56a82aedaf5dd923250b8d48.zip | |
Merge pull request #2 from rore/master
Adding readme about HBaseRawSource
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java | 8 | ||||
| -rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 95 | 
2 files changed, 62 insertions, 41 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 738fd51..0421b6e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -131,11 +131,11 @@ public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {  	/**  	 * Constructor HBaseTap creates a new HBaseTap instance.  	 *  -	 * @param quorumNames -	 * @param tableName +	 * @param quorumNames		HBase quorum +	 * @param tableName			The name of the HBase table to read  	 * @param HBaseFullScheme -	 * @param base64Scan -	 * @param sinkMode +	 * @param base64Scan		An optional base64 encoded scan object +	 * @param sinkMode			If REPLACE the output table will be deleted before writing to  	 */  	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {  		super(HBaseFullScheme, sinkMode); diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 1fc6f7d..bbc205b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -18,45 +18,66 @@ import java.io.ByteArrayOutputStream  import java.io.DataOutputStream  object HBaseRawSource { -  def convertScanToString(scan: Scan) = { -    val out = new ByteArrayOutputStream(); -    val dos = new DataOutputStream(out); -    scan.write(dos); -    Base64.encodeBytes(out.toByteArray()); -  } +	/** +	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource +	 * @param scan +	 * @return base64 string representation +	 */ +	def convertScanToString(scan: Scan) = { +		val out = new ByteArrayOutputStream(); +		val dos = new DataOutputStream(out); +		scan.write(dos); +		Base64.encodeBytes(out.toByteArray()); +	}  } + + +/** + * @author Rotem Hermon + * + * HBaseRawSource is a scalding source that passes the original row (Result) object to the  + * mapper for customized processing. + *  + * @param	tableName	The name of the HBase table to read + * @param	quorumNames	HBase quorum + * @param	familyNames	Column families to get (source, if null will get all) or update to (sink) + * @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written + * @param	base64Scan	An optional base64 encoded scan object  + * @param	sinkMode	If REPLACE the output table will be deleted before writing to	 + *  + */  class HBaseRawSource( -  tableName: String, -  quorumNames: String = "localhost", -  familyNames: Array[String], -  writeNulls: Boolean = true, -  base64Scan:String = null, -  sinkMode: SinkMode = null) extends Source { +	tableName: String, +	quorumNames: String = "localhost", +	familyNames: Array[String], +	writeNulls: Boolean = true, +	base64Scan: String = null, +	sinkMode: SinkMode = null) extends Source { -  override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) -    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] +	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) +		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] -  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { -    val hBaseScheme = hdfsScheme match { -      case hbase: HBaseRawScheme => hbase -      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") -    } -    mode match { -      case hdfsMode @ Hdfs(_, _) => readOrWrite match { -        case Read => { -          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -            case null => SinkMode.KEEP -            case _ => sinkMode -          }).asInstanceOf[Tap[_,_,_]] -        } -        case Write => { -          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -            case null => SinkMode.UPDATE -            case _ => sinkMode -          }).asInstanceOf[Tap[_,_,_]] -        } -      } -      case _ => super.createTap(readOrWrite)(mode) -    } -  } +	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +		val hBaseScheme = hdfsScheme match { +			case hbase: HBaseRawScheme => hbase +			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") +		} +		mode match { +			case hdfsMode @ Hdfs(_, _) => readOrWrite match { +				case Read => { +					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +						case null => SinkMode.KEEP +						case _ => sinkMode +					}).asInstanceOf[Tap[_, _, _]] +				} +				case Write => { +					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +						case null => SinkMode.UPDATE +						case _ => sinkMode +					}).asInstanceOf[Tap[_, _, _]] +				} +			} +			case _ => super.createTap(readOrWrite)(mode) +		} +	}  } | 
