diff options
| author | rotem <rotem.hermon@gmail.com> | 2013-06-16 12:09:42 +0300 | 
|---|---|---|
| committer | rotem <rotem.hermon@gmail.com> | 2013-06-16 12:09:42 +0300 | 
| commit | b72c234dd35c3eb807e8050385adf697dcf97fad (patch) | |
| tree | 6a2372996840544af156bac37f837448deaa8792 | |
| parent | 388714f40b45e19985ab2ac68a23e96d71d857ad (diff) | |
| download | SpyGlass-b72c234dd35c3eb807e8050385adf697dcf97fad.tar.gz SpyGlass-b72c234dd35c3eb807e8050385adf697dcf97fad.zip | |
Adding HBase raw taps and source, bumping needed dependency versions, bumping scala version
| -rw-r--r-- | pom.xml | 49 | ||||
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java | 286 | ||||
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java | 311 | ||||
| -rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 62 | 
4 files changed, 689 insertions, 19 deletions
| @@ -12,7 +12,7 @@  	<name>Cascading and Scalding wrapper for HBase with advanced features</name>  	<groupId>parallelai</groupId>  	<artifactId>parallelai.spyglass</artifactId> -	<version>2.0.3</version> +	<version>2.0.4-SNAPSHOT</version>  	<packaging>jar</packaging>  	<properties> @@ -29,25 +29,26 @@  		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  		<!-- Cloudera's Distribution Including Apache Hadoop version 4.2.0 --> -		<datafu.version>0.0.4-cdh4.2.0</datafu.version> -		<flume.version>1.3.0-cdh4.2.0</flume.version> -		<hadoop.version>2.0.0-cdh4.2.0</hadoop.version> -		<hbase.version>0.94.2-cdh4.2.0</hbase.version> -		<hive.version>0.10.0-cdh4.2.0</hive.version> -		<mahout.version>0.7-cdh4.2.0</mahout.version> -		<mapreduce.version>2.0.0-mr1-cdh4.2.0</mapreduce.version> -		<oozie.version>3.3.0-cdh4.2.0</oozie.version> -		<oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.0-cdh4.2.0</oozie-hadoop.version> -		<oozie-sharelib.version>3.3.0-cdh4.2.0</oozie-sharelib.version> -		<pig.version>0.10.0-cdh4.2.0</pig.version> -		<sqoop.version>1.4.2-cdh4.2.0</sqoop.version> -		<whirr.version>0.8.0-cdh4.2.0</whirr.version> -		<zookeeper.version>3.4.5-cdh4.2.0</zookeeper.version> +		<datafu.version>0.0.4-cdh4.3.0</datafu.version> +		<flume.version>1.3.0-cdh4.3.0</flume.version> +		<hadoop.version>2.0.0-cdh4.3.0</hadoop.version> +		<hbase.version>0.94.6-cdh4.3.0</hbase.version> +		<hive.version>0.10.0-cdh4.3.0</hive.version> +		<mahout.version>0.7-cdh4.3.0</mahout.version> +		<mapreduce.version>2.0.0-mr1-cdh4.3.0</mapreduce.version> +		<oozie.version>3.3.2-cdh4.3.0</oozie.version> +		<oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.2-cdh4.3.0</oozie-hadoop.version> +		<oozie-sharelib.version>3.3.2-cdh4.3.0</oozie-sharelib.version> +		<pig.version>0.11.0-cdh4.3.0</pig.version> +		<sqoop.version>1.4.3-cdh4.3.0</sqoop.version> +		<whirr.version>0.8.2-cdh4.3.0</whirr.version> +		<zookeeper.version>3.4.5-cdh4.3.0</zookeeper.version>  		<!-- Scala/Scalding/Cascading properties --> -		<scala.version>2.9.2</scala.version> -		<scalding.version>0.8.3</scalding.version> -		<cascading.version>2.1.0</cascading.version> +		<scala.version>2.10.2</scala.version> +		<scalding.scala.version>2.10</scalding.scala.version> +		<scalding.version>0.8.5</scalding.version> +		<cascading.version>2.1.6</cascading.version>  		<scalding-commons.version>0.1.1</scalding-commons.version>  		<scalatest.version>1.7.1</scalatest.version> @@ -162,7 +163,7 @@  		</dependency>  		<dependency>  			<groupId>com.twitter</groupId> -			<artifactId>scalding_${scala.version}</artifactId> +			<artifactId>scalding-core_${scalding.scala.version}</artifactId>  			<version>${scalding.version}</version>  			<exclusions>  				<exclusion>  <!-- Declare exclusion, in order to use custom build --> @@ -189,6 +190,16 @@  			<version>${typesafe.config.version}</version>  		</dependency> +		<dependency> +			<groupId>com.twitter.elephantbird</groupId> +			<artifactId>elephant-bird-core</artifactId> +			<version>4.1</version> +		</dependency> +		<dependency> +			<groupId>com.twitter.elephantbird</groupId> +			<artifactId>elephant-bird-hadoop-compat</artifactId> +			<version>4.1</version> +		</dependency>  	</dependencies>  	<!-- From https://wiki.scala-lang.org/display/SIW/ScalaEclipseMaven --> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java new file mode 100644 index 0000000..32730d6 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -0,0 +1,286 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; + +/** + * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction + * with the {@HBaseRawTap} to allow for the reading and writing of data + * to and from a HBase cluster. + *  + * @see HBaseRawTap + */ +@SuppressWarnings({ "rawtypes", "deprecation" }) +public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +	/** +	 *  +	 */ +	private static final long serialVersionUID = 6248976486883281356L; + +	/** Field LOG */ +	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); + +	public final Fields RowKeyField = new Fields("rowkey"); +	public final Fields RowField = new Fields("row"); + +	/** String familyNames */ +	private String[] familyNames; + +	private boolean writeNulls = true; + +	/** +	 * Constructor HBaseScheme creates a new HBaseScheme instance. +	 *  +	 * @param keyFields +	 *            of type Fields +	 * @param familyName +	 *            of type String +	 * @param valueFields +	 *            of type Fields +	 */ +	public HBaseRawScheme(String familyName) { +		this(new String[] { familyName }); +	} + +	public HBaseRawScheme(String[] familyNames) { +		this.familyNames = familyNames; +		setSourceFields(); +	} + +	public HBaseRawScheme(String familyName, boolean writeNulls) { +		this(new String[] { familyName }, writeNulls); +	} + +	public HBaseRawScheme(String[] familyNames, boolean writeNulls) { +		this.familyNames = familyNames; +		this.writeNulls = writeNulls; +		setSourceFields(); +	} + +	private void setSourceFields() { +		Fields sourceFields = Fields.join(RowKeyField, RowField); +		setSourceFields(sourceFields); +	} + +	/** +	 * Method getFamilyNames returns the set of familyNames of this HBaseScheme +	 * object. +	 *  +	 * @return the familyNames (type String[]) of this HBaseScheme object. +	 */ +	public String[] getFamilyNames() { +		HashSet<String> familyNameSet = new HashSet<String>(); +		if (familyNames != null) { +			for (String familyName : familyNames) { +				familyNameSet.add(familyName); +			} +		} +		return familyNameSet.toArray(new String[0]); +	} + +	@Override +	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; + +		sourceCall.setContext(pair); +	} + +	@Override +	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +		sourceCall.setContext(null); +	} + +	@SuppressWarnings("unchecked") +	@Override +	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) +			throws IOException { +		Tuple result = new Tuple(); + +		Object key = sourceCall.getContext()[0]; +		Object value = sourceCall.getContext()[1]; +		boolean hasNext = sourceCall.getInput().next(key, value); +		if (!hasNext) { +			return false; +		} + +		// Skip nulls +		if (key == null || value == null) { +			return true; +		} + +		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +		Result row = (Result) value; +		result.add(keyWritable); +		result.add(row); +		sourceCall.getIncomingEntry().setTuple(result); +		return true; +	} + +	@SuppressWarnings("unchecked") +	@Override +	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { +		TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +		OutputCollector outputCollector = sinkCall.getOutput(); +		Tuple key = tupleEntry.selectTuple(RowKeyField); +		Object okey = key.getObject(0); +		ImmutableBytesWritable keyBytes = getBytes(okey); +		Put put = new Put(keyBytes.get()); +		Fields outFields = tupleEntry.getFields().subtract(RowKeyField); +		if (null != outFields) { +			TupleEntry values = tupleEntry.selectEntry(outFields); +			for (int n = 0; n < values.getFields().size(); n++) { +				Object o = values.get(n); +				ImmutableBytesWritable valueBytes = getBytes(o); +				Comparable field = outFields.get(n); +				ColumnName cn = parseColumn((String) field); +				if (null == cn.family) { +					if (n >= familyNames.length) +						cn.family = familyNames[familyNames.length - 1]; +					else +						cn.family = familyNames[n]; +				} +				if (null != o || writeNulls) +					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); +			} +		} +		outputCollector.collect(null, put); +	} + +	private ImmutableBytesWritable getBytes(Object obj) { +		if (null == obj) +			return new ImmutableBytesWritable(new byte[0]); +		if (obj instanceof ImmutableBytesWritable) +			return (ImmutableBytesWritable) obj; +		else if (obj instanceof String) +			return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); +		else if (obj instanceof Long) +			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); +		else if (obj instanceof Integer) +			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); +		else if (obj instanceof Short) +			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); +		else if (obj instanceof Boolean) +			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); +		else if (obj instanceof Double) +			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); +		else +			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" +					+ obj.getClass().getName()); +	} + +	private ColumnName parseColumn(String column) { +		ColumnName ret = new ColumnName(); +		int pos = column.indexOf(":"); +		if (pos > 0) { +			ret.name = column.substring(pos + 1); +			ret.family = column.substring(0, pos); +		} else { +			ret.name = column; +		} +		return ret; +	} + +	private class ColumnName { +		String family; +		String name; + +		ColumnName() { +		} +	} + +	@Override +	public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +		conf.setOutputFormat(TableOutputFormat.class); +		conf.setOutputKeyClass(ImmutableBytesWritable.class); +		conf.setOutputValueClass(Put.class); +	} + +	@Override +	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, +			JobConf conf) { +		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, +				ValueCopier.class); +		if (null != familyNames) { +			String columns = Util.join(this.familyNames, " "); +			LOG.debug("sourcing from column families: {}", columns); +			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); +		} +	} + +	@Override +	public boolean equals(Object object) { +		if (this == object) { +			return true; +		} +		if (object == null || getClass() != object.getClass()) { +			return false; +		} +		if (!super.equals(object)) { +			return false; +		} + +		HBaseRawScheme that = (HBaseRawScheme) object; + +		if (!Arrays.equals(familyNames, that.familyNames)) { +			return false; +		} +		return true; +	} + +	@Override +	public int hashCode() { +		int result = super.hashCode(); +		result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +		return result; +	} + +	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { + +		public ValueCopier() { +		} + +		public void copyValue(Result oldValue, Result newValue) { +			if (null != oldValue && null != newValue) { +				oldValue.copyFrom(newValue); +			} +		} + +	} +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java new file mode 100644 index 0000000..738fd51 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -0,0 +1,311 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; + +/** + * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with + * the {@HBaseRawScheme} to allow for the reading and writing + * of data to and from a HBase cluster. + */ +@SuppressWarnings({ "deprecation", "rawtypes" }) +public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { +	/** +	 *  +	 */ +	private static final long serialVersionUID = 8019189493428493323L; + +	/** Field LOG */ +	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); + +	private final String id = UUID.randomUUID().toString(); + +	/** Field SCHEME */ +	public static final String SCHEME = "hbase"; + +	/** Field hBaseAdmin */ +	private transient HBaseAdmin hBaseAdmin; + +	/** Field hostName */ +	private String quorumNames; +	/** Field tableName */ +	private String tableName; +	private String base64Scan; + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 *  +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 */ +	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { +		super(HBaseFullScheme, SinkMode.UPDATE); +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 *  +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 * @param sinkMode +	 *            of type SinkMode +	 */ +	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +		super(HBaseFullScheme, sinkMode); +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 *  +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 */ +	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { +		super(HBaseFullScheme, SinkMode.UPDATE); +		this.quorumNames = quorumNames; +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 *  +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 * @param sinkMode +	 *            of type SinkMode +	 */ +	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +		super(HBaseFullScheme, sinkMode); +		this.quorumNames = quorumNames; +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 *  +	 * @param quorumNames +	 * @param tableName +	 * @param HBaseFullScheme +	 * @param base64Scan +	 * @param sinkMode +	 */ +	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { +		super(HBaseFullScheme, sinkMode); +		this.quorumNames = quorumNames; +		this.tableName = tableName; +		this.base64Scan = base64Scan; +	} + +	/** +	 * Method getTableName returns the tableName of this HBaseTap object. +	 *  +	 * @return the tableName (type String) of this HBaseTap object. +	 */ +	public String getTableName() { +		return tableName; +	} + +	public Path getPath() { +		return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); +	} + +	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { +		if (hBaseAdmin == null) { +			Configuration hbaseConf = HBaseConfiguration.create(conf); +			hBaseAdmin = new HBaseAdmin(hbaseConf); +		} + +		return hBaseAdmin; +	} + +	@Override +	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { +		if (quorumNames != null) { +			conf.set("hbase.zookeeper.quorum", quorumNames); +		} + +		LOG.debug("sinking to table: {}", tableName); + +		if (isReplace() && conf.get("mapred.task.partition") == null) { +			try { +				deleteResource(conf); + +			} catch (IOException e) { +				throw new RuntimeException("could not delete resource: " + e); +			} +		} + +		else if (isUpdate() || isReplace()) { +			try { +				createResource(conf); +			} catch (IOException e) { +				throw new RuntimeException(tableName + " does not exist !", e); +			} + +		} + +		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +		super.sinkConfInit(process, conf); +	} + +	@Override +	public String getIdentifier() { +		return id; +	} + +	@Override +	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) +			throws IOException { +		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); +	} + +	@Override +	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) +			throws IOException { +		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); +		hBaseCollector.prepare(); +		return hBaseCollector; +	} + +	@Override +	public boolean createResource(JobConf jobConf) throws IOException { +		HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + +		if (hBaseAdmin.tableExists(tableName)) { +			return true; +		} + +		LOG.info("creating hbase table: {}", tableName); + +		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + +		String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); + +		for (String familyName : familyNames) { +			tableDescriptor.addFamily(new HColumnDescriptor(familyName)); +		} + +		hBaseAdmin.createTable(tableDescriptor); + +		return true; +	} + +	@Override +	public boolean deleteResource(JobConf jobConf) throws IOException { +		if (getHBaseAdmin(jobConf).tableExists(tableName)) { +			if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) +				getHBaseAdmin(jobConf).disableTable(tableName); +			getHBaseAdmin(jobConf).deleteTable(tableName); +		} +		return true; +	} + +	@Override +	public boolean resourceExists(JobConf jobConf) throws IOException { +		return getHBaseAdmin(jobConf).tableExists(tableName); +	} + +	@Override +	public long getModifiedTime(JobConf jobConf) throws IOException { +		return System.currentTimeMillis(); // currently unable to find last mod +											// time +											// on a table +	} + +	@Override +	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { +		// a hack for MultiInputFormat to see that there is a child format +		FileInputFormat.setInputPaths(conf, getPath()); + +		if (quorumNames != null) { +			conf.set("hbase.zookeeper.quorum", quorumNames); +		} + +		LOG.debug("sourcing from table: {}", tableName); +		conf.set(TableInputFormat.INPUT_TABLE, tableName); +		if (null != base64Scan) +			conf.set(TableInputFormat.SCAN, base64Scan); +			 +		super.sourceConfInit(process, conf); +	} + +	@Override +	public boolean equals(Object object) { +		if (this == object) { +			return true; +		} +		if (object == null || getClass() != object.getClass()) { +			return false; +		} +		if (!super.equals(object)) { +			return false; +		} + +		HBaseRawTap hBaseTap = (HBaseRawTap) object; + +		if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { +			return false; +		} + +		if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { +			return false; +		} + +		return true; +	} + +	@Override +	public int hashCode() { +		int result = super.hashCode(); +		result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); +		return result; +	} +} diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala new file mode 100644 index 0000000..1fc6f7d --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -0,0 +1,62 @@ +package parallelai.spyglass.hbase + +import cascading.pipe.Pipe +import cascading.pipe.assembly.Coerce +import cascading.scheme.Scheme +import cascading.tap.{ Tap, SinkMode } +import cascading.tuple.Fields +import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.JavaConversions._ +import scala.collection.mutable.WrappedArray +import com.twitter.scalding._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.util.Base64 +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream + +object HBaseRawSource { +  def convertScanToString(scan: Scan) = { +    val out = new ByteArrayOutputStream(); +    val dos = new DataOutputStream(out); +    scan.write(dos); +    Base64.encodeBytes(out.toByteArray()); +  } +} +class HBaseRawSource( +  tableName: String, +  quorumNames: String = "localhost", +  familyNames: Array[String], +  writeNulls: Boolean = true, +  base64Scan:String = null, +  sinkMode: SinkMode = null) extends Source { + +  override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) +    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + +  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +    val hBaseScheme = hdfsScheme match { +      case hbase: HBaseRawScheme => hbase +      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") +    } +    mode match { +      case hdfsMode @ Hdfs(_, _) => readOrWrite match { +        case Read => { +          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +            case null => SinkMode.KEEP +            case _ => sinkMode +          }).asInstanceOf[Tap[_,_,_]] +        } +        case Write => { +          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +            case null => SinkMode.UPDATE +            case _ => sinkMode +          }).asInstanceOf[Tap[_,_,_]] +        } +      } +      case _ => super.createTap(readOrWrite)(mode) +    } +  } +} | 
