diff options
| -rw-r--r-- | pom.xml | 56 | ||||
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java | 572 | ||||
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java | 622 | ||||
| -rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 166 | 
4 files changed, 716 insertions, 700 deletions
| @@ -29,34 +29,50 @@  		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  		<!-- Cloudera's Distribution Including Apache Hadoop version 4.2.0 --> -		<datafu.version>0.0.4-cdh4.3.0</datafu.version> -		<flume.version>1.3.0-cdh4.3.0</flume.version> -		<hadoop.version>2.0.0-cdh4.3.0</hadoop.version> -		<hbase.version>0.94.6-cdh4.3.0</hbase.version> -		<hive.version>0.10.0-cdh4.3.0</hive.version> -		<mahout.version>0.7-cdh4.3.0</mahout.version> -		<mapreduce.version>2.0.0-mr1-cdh4.3.0</mapreduce.version> -		<oozie.version>3.3.2-cdh4.3.0</oozie.version> -		<oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.2-cdh4.3.0</oozie-hadoop.version> -		<oozie-sharelib.version>3.3.2-cdh4.3.0</oozie-sharelib.version> -		<pig.version>0.11.0-cdh4.3.0</pig.version> -		<sqoop.version>1.4.3-cdh4.3.0</sqoop.version> -		<whirr.version>0.8.2-cdh4.3.0</whirr.version> -		<zookeeper.version>3.4.5-cdh4.3.0</zookeeper.version> +		<!--<datafu.version>0.0.4-cdh4.3.0</datafu.version>--> +		<!--<flume.version>1.3.0-cdh4.3.0</flume.version>--> +		<!--<hadoop.version>2.0.0-cdh4.3.0</hadoop.version>--> +		<!--<hbase.version>0.94.6-cdh4.3.0</hbase.version>--> +		<!--<hive.version>0.10.0-cdh4.3.0</hive.version>--> +		<!--<mahout.version>0.7-cdh4.3.0</mahout.version>--> +		<!--<mapreduce.version>2.0.0-mr1-cdh4.3.0</mapreduce.version>--> +		<!--<oozie.version>3.3.2-cdh4.3.0</oozie.version>--> +		<!--<oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.2-cdh4.3.0</oozie-hadoop.version>--> +		<!--<oozie-sharelib.version>3.3.2-cdh4.3.0</oozie-sharelib.version>--> +		<!--<pig.version>0.11.0-cdh4.3.0</pig.version>--> +		<!--<sqoop.version>1.4.3-cdh4.3.0</sqoop.version>--> +		<!--<whirr.version>0.8.2-cdh4.3.0</whirr.version>--> +		<!--<zookeeper.version>3.4.5-cdh4.3.0</zookeeper.version>--> +        <datafu.version>0.0.4-cdh4.2.0</datafu.version> +        <flume.version>1.3.0-cdh4.2.0</flume.version> +        <hadoop.version>2.0.0-cdh4.2.0</hadoop.version> +        <hbase.version>0.94.2-cdh4.2.0</hbase.version> +        <hive.version>0.10.0-cdh4.2.0</hive.version> +        <mahout.version>0.7-cdh4.2.0</mahout.version> +        <mapreduce.version>2.0.0-mr1-cdh4.2.0</mapreduce.version> +        <oozie.version>3.3.0-cdh4.2.0</oozie.version> +        <oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.0-cdh4.2.0</oozie-hadoop.version> +        <oozie-sharelib.version>3.3.0-cdh4.2.0</oozie-sharelib.version> +        <pig.version>0.10.0-cdh4.2.0</pig.version> +        <sqoop.version>1.4.2-cdh4.2.0</sqoop.version> +        <whirr.version>0.8.0-cdh4.2.0</whirr.version> +        <zookeeper.version>3.4.5-cdh4.2.0</zookeeper.version>  		<!-- Scala/Scalding/Cascading properties --> -		<scala.version>2.10.2</scala.version> -		<scalding.scala.version>2.10</scalding.scala.version> -		<scalding.version>0.8.5</scalding.version> -		<cascading.version>2.1.6</cascading.version> -		<scalding-commons.version>0.1.1</scalding-commons.version> +		<!--<scala.version>2.10.2</scala.version>--> +        <scala.version>2.9.3</scala.version> +		<!--<scalding.scala.version>2.10</scalding.scala.version>--> +        <scalding.scala.version>2.9.3</scalding.scala.version> +        <scalding.version>0.8.6</scalding.version> +		<cascading.version>2.1.0</cascading.version> +		<scalding-commons.version>0.2.0</scalding-commons.version>  		<scalatest.version>1.9.1</scalatest.version>  		<trove4j.version>3.0.3</trove4j.version>  		<maple.version>0.2.8</maple.version>  		<specs2.version>1.12.4.1</specs2.version>  		<typesafe.config.version>1.0.0</typesafe.config.version> -        <specs2.version>2.0</specs2.version> +        <!--<specs2.version>2.0</specs2.version>-->          <!-- Other libraries properties -->  		<junit.version>4.10</junit.version> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 32730d6..7dba40d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -1,286 +1,286 @@ -/* - * Copyright (c) 2009 Concurrent, Inc. - * - * This work has been released into the public domain - * by the copyright holder. This applies worldwide. - * - * In case this is not legally possible: - * The copyright holder grants any entity the right - * to use this work for any purpose, without any - * conditions, unless such conditions are required by law. - */ - -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; - -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableOutputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; -import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; - -import cascading.flow.FlowProcess; -import cascading.scheme.Scheme; -import cascading.scheme.SinkCall; -import cascading.scheme.SourceCall; -import cascading.tap.Tap; -import cascading.tuple.Fields; -import cascading.tuple.Tuple; -import cascading.tuple.TupleEntry; -import cascading.util.Util; - -/** - * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction - * with the {@HBaseRawTap} to allow for the reading and writing of data - * to and from a HBase cluster. - *  - * @see HBaseRawTap - */ -@SuppressWarnings({ "rawtypes", "deprecation" }) -public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { -	/** -	 *  -	 */ -	private static final long serialVersionUID = 6248976486883281356L; - -	/** Field LOG */ -	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); - -	public final Fields RowKeyField = new Fields("rowkey"); -	public final Fields RowField = new Fields("row"); - -	/** String familyNames */ -	private String[] familyNames; - -	private boolean writeNulls = true; - -	/** -	 * Constructor HBaseScheme creates a new HBaseScheme instance. -	 *  -	 * @param keyFields -	 *            of type Fields -	 * @param familyName -	 *            of type String -	 * @param valueFields -	 *            of type Fields -	 */ -	public HBaseRawScheme(String familyName) { -		this(new String[] { familyName }); -	} - -	public HBaseRawScheme(String[] familyNames) { -		this.familyNames = familyNames; -		setSourceFields(); -	} - -	public HBaseRawScheme(String familyName, boolean writeNulls) { -		this(new String[] { familyName }, writeNulls); -	} - -	public HBaseRawScheme(String[] familyNames, boolean writeNulls) { -		this.familyNames = familyNames; -		this.writeNulls = writeNulls; -		setSourceFields(); -	} - -	private void setSourceFields() { -		Fields sourceFields = Fields.join(RowKeyField, RowField); -		setSourceFields(sourceFields); -	} - -	/** -	 * Method getFamilyNames returns the set of familyNames of this HBaseScheme -	 * object. -	 *  -	 * @return the familyNames (type String[]) of this HBaseScheme object. -	 */ -	public String[] getFamilyNames() { -		HashSet<String> familyNameSet = new HashSet<String>(); -		if (familyNames != null) { -			for (String familyName : familyNames) { -				familyNameSet.add(familyName); -			} -		} -		return familyNameSet.toArray(new String[0]); -	} - -	@Override -	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; - -		sourceCall.setContext(pair); -	} - -	@Override -	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -		sourceCall.setContext(null); -	} - -	@SuppressWarnings("unchecked") -	@Override -	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) -			throws IOException { -		Tuple result = new Tuple(); - -		Object key = sourceCall.getContext()[0]; -		Object value = sourceCall.getContext()[1]; -		boolean hasNext = sourceCall.getInput().next(key, value); -		if (!hasNext) { -			return false; -		} - -		// Skip nulls -		if (key == null || value == null) { -			return true; -		} - -		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; -		Result row = (Result) value; -		result.add(keyWritable); -		result.add(row); -		sourceCall.getIncomingEntry().setTuple(result); -		return true; -	} - -	@SuppressWarnings("unchecked") -	@Override -	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { -		TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); -		OutputCollector outputCollector = sinkCall.getOutput(); -		Tuple key = tupleEntry.selectTuple(RowKeyField); -		Object okey = key.getObject(0); -		ImmutableBytesWritable keyBytes = getBytes(okey); -		Put put = new Put(keyBytes.get()); -		Fields outFields = tupleEntry.getFields().subtract(RowKeyField); -		if (null != outFields) { -			TupleEntry values = tupleEntry.selectEntry(outFields); -			for (int n = 0; n < values.getFields().size(); n++) { -				Object o = values.get(n); -				ImmutableBytesWritable valueBytes = getBytes(o); -				Comparable field = outFields.get(n); -				ColumnName cn = parseColumn((String) field); -				if (null == cn.family) { -					if (n >= familyNames.length) -						cn.family = familyNames[familyNames.length - 1]; -					else -						cn.family = familyNames[n]; -				} -				if (null != o || writeNulls) -					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); -			} -		} -		outputCollector.collect(null, put); -	} - -	private ImmutableBytesWritable getBytes(Object obj) { -		if (null == obj) -			return new ImmutableBytesWritable(new byte[0]); -		if (obj instanceof ImmutableBytesWritable) -			return (ImmutableBytesWritable) obj; -		else if (obj instanceof String) -			return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); -		else if (obj instanceof Long) -			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); -		else if (obj instanceof Integer) -			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); -		else if (obj instanceof Short) -			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); -		else if (obj instanceof Boolean) -			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); -		else if (obj instanceof Double) -			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); -		else -			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" -					+ obj.getClass().getName()); -	} - -	private ColumnName parseColumn(String column) { -		ColumnName ret = new ColumnName(); -		int pos = column.indexOf(":"); -		if (pos > 0) { -			ret.name = column.substring(pos + 1); -			ret.family = column.substring(0, pos); -		} else { -			ret.name = column; -		} -		return ret; -	} - -	private class ColumnName { -		String family; -		String name; - -		ColumnName() { -		} -	} - -	@Override -	public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { -		conf.setOutputFormat(TableOutputFormat.class); -		conf.setOutputKeyClass(ImmutableBytesWritable.class); -		conf.setOutputValueClass(Put.class); -	} - -	@Override -	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, -			JobConf conf) { -		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, -				ValueCopier.class); -		if (null != familyNames) { -			String columns = Util.join(this.familyNames, " "); -			LOG.debug("sourcing from column families: {}", columns); -			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); -		} -	} - -	@Override -	public boolean equals(Object object) { -		if (this == object) { -			return true; -		} -		if (object == null || getClass() != object.getClass()) { -			return false; -		} -		if (!super.equals(object)) { -			return false; -		} - -		HBaseRawScheme that = (HBaseRawScheme) object; - -		if (!Arrays.equals(familyNames, that.familyNames)) { -			return false; -		} -		return true; -	} - -	@Override -	public int hashCode() { -		int result = super.hashCode(); -		result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); -		return result; -	} - -	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { - -		public ValueCopier() { -		} - -		public void copyValue(Result oldValue, Result newValue) { -			if (null != oldValue && null != newValue) { -				oldValue.copyFrom(newValue); -			} -		} - -	} -} +///* +//* Copyright (c) 2009 Concurrent, Inc. +//* +//* This work has been released into the public domain +//* by the copyright holder. This applies worldwide. +//* +//* In case this is not legally possible: +//* The copyright holder grants any entity the right +//* to use this work for any purpose, without any +//* conditions, unless such conditions are required by law. +//*/ +// +//package parallelai.spyglass.hbase; +// +//import java.io.IOException; +//import java.util.Arrays; +//import java.util.HashSet; +// +//import org.apache.hadoop.hbase.client.Put; +//import org.apache.hadoop.hbase.client.Result; +//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +//import org.apache.hadoop.hbase.mapred.TableOutputFormat; +//import org.apache.hadoop.hbase.util.Bytes; +//import org.apache.hadoop.mapred.JobConf; +//import org.apache.hadoop.mapred.OutputCollector; +//import org.apache.hadoop.mapred.RecordReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; +// +//import cascading.flow.FlowProcess; +//import cascading.scheme.Scheme; +//import cascading.scheme.SinkCall; +//import cascading.scheme.SourceCall; +//import cascading.tap.Tap; +//import cascading.tuple.Fields; +//import cascading.tuple.Tuple; +//import cascading.tuple.TupleEntry; +//import cascading.util.Util; +// +///** +//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction +//* with the {@HBaseRawTap} to allow for the reading and writing of data +//* to and from a HBase cluster. +//* +//* @see HBaseRawTap +//*/ +//@SuppressWarnings({ "rawtypes", "deprecation" }) +//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +//	/** +//	 * +//	 */ +//	private static final long serialVersionUID = 6248976486883281356L; +// +//	/** Field LOG */ +//	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); +// +//	public final Fields RowKeyField = new Fields("rowkey"); +//	public final Fields RowField = new Fields("row"); +// +//	/** String familyNames */ +//	private String[] familyNames; +// +//	private boolean writeNulls = true; +// +//	/** +//	 * Constructor HBaseScheme creates a new HBaseScheme instance. +//	 * +//	 * @param keyFields +//	 *            of type Fields +//	 * @param familyName +//	 *            of type String +//	 * @param valueFields +//	 *            of type Fields +//	 */ +//	public HBaseRawScheme(String familyName) { +//		this(new String[] { familyName }); +//	} +// +//	public HBaseRawScheme(String[] familyNames) { +//		this.familyNames = familyNames; +//		setSourceFields(); +//	} +// +//	public HBaseRawScheme(String familyName, boolean writeNulls) { +//		this(new String[] { familyName }, writeNulls); +//	} +// +//	public HBaseRawScheme(String[] familyNames, boolean writeNulls) { +//		this.familyNames = familyNames; +//		this.writeNulls = writeNulls; +//		setSourceFields(); +//	} +// +//	private void setSourceFields() { +//		Fields sourceFields = Fields.join(RowKeyField, RowField); +//		setSourceFields(sourceFields); +//	} +// +//	/** +//	 * Method getFamilyNames returns the set of familyNames of this HBaseScheme +//	 * object. +//	 * +//	 * @return the familyNames (type String[]) of this HBaseScheme object. +//	 */ +//	public String[] getFamilyNames() { +//		HashSet<String> familyNameSet = new HashSet<String>(); +//		if (familyNames != null) { +//			for (String familyName : familyNames) { +//				familyNameSet.add(familyName); +//			} +//		} +//		return familyNameSet.toArray(new String[0]); +//	} +// +//	@Override +//	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +//		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; +// +//		sourceCall.setContext(pair); +//	} +// +//	@Override +//	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +//		sourceCall.setContext(null); +//	} +// +//	@SuppressWarnings("unchecked") +//	@Override +//	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) +//			throws IOException { +//		Tuple result = new Tuple(); +// +//		Object key = sourceCall.getContext()[0]; +//		Object value = sourceCall.getContext()[1]; +//		boolean hasNext = sourceCall.getInput().next(key, value); +//		if (!hasNext) { +//			return false; +//		} +// +//		// Skip nulls +//		if (key == null || value == null) { +//			return true; +//		} +// +//		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +//		Result row = (Result) value; +//		result.add(keyWritable); +//		result.add(row); +//		sourceCall.getIncomingEntry().setTuple(result); +//		return true; +//	} +// +//	@SuppressWarnings("unchecked") +//	@Override +//	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { +//		TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +//		OutputCollector outputCollector = sinkCall.getOutput(); +//		Tuple key = tupleEntry.selectTuple(RowKeyField); +//		Object okey = key.getObject(0); +//		ImmutableBytesWritable keyBytes = getBytes(okey); +//		Put put = new Put(keyBytes.get()); +//		Fields outFields = tupleEntry.getFields().subtract(RowKeyField); +//		if (null != outFields) { +//			TupleEntry values = tupleEntry.selectEntry(outFields); +//			for (int n = 0; n < values.getFields().size(); n++) { +//				Object o = values.get(n); +//				ImmutableBytesWritable valueBytes = getBytes(o); +//				Comparable field = outFields.get(n); +//				ColumnName cn = parseColumn((String) field); +//				if (null == cn.family) { +//					if (n >= familyNames.length) +//						cn.family = familyNames[familyNames.length - 1]; +//					else +//						cn.family = familyNames[n]; +//				} +//				if (null != o || writeNulls) +//					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); +//			} +//		} +//		outputCollector.collect(null, put); +//	} +// +//	private ImmutableBytesWritable getBytes(Object obj) { +//		if (null == obj) +//			return new ImmutableBytesWritable(new byte[0]); +//		if (obj instanceof ImmutableBytesWritable) +//			return (ImmutableBytesWritable) obj; +//		else if (obj instanceof String) +//			return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); +//		else if (obj instanceof Long) +//			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); +//		else if (obj instanceof Integer) +//			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); +//		else if (obj instanceof Short) +//			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); +//		else if (obj instanceof Boolean) +//			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); +//		else if (obj instanceof Double) +//			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); +//		else +//			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" +//					+ obj.getClass().getName()); +//	} +// +//	private ColumnName parseColumn(String column) { +//		ColumnName ret = new ColumnName(); +//		int pos = column.indexOf(":"); +//		if (pos > 0) { +//			ret.name = column.substring(pos + 1); +//			ret.family = column.substring(0, pos); +//		} else { +//			ret.name = column; +//		} +//		return ret; +//	} +// +//	private class ColumnName { +//		String family; +//		String name; +// +//		ColumnName() { +//		} +//	} +// +//	@Override +//	public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +//		conf.setOutputFormat(TableOutputFormat.class); +//		conf.setOutputKeyClass(ImmutableBytesWritable.class); +//		conf.setOutputValueClass(Put.class); +//	} +// +//	@Override +//	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, +//			JobConf conf) { +//		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, +//				ValueCopier.class); +//		if (null != familyNames) { +//			String columns = Util.join(this.familyNames, " "); +//			LOG.debug("sourcing from column families: {}", columns); +//			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); +//		} +//	} +// +//	@Override +//	public boolean equals(Object object) { +//		if (this == object) { +//			return true; +//		} +//		if (object == null || getClass() != object.getClass()) { +//			return false; +//		} +//		if (!super.equals(object)) { +//			return false; +//		} +// +//		HBaseRawScheme that = (HBaseRawScheme) object; +// +//		if (!Arrays.equals(familyNames, that.familyNames)) { +//			return false; +//		} +//		return true; +//	} +// +//	@Override +//	public int hashCode() { +//		int result = super.hashCode(); +//		result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +//		return result; +//	} +// +//	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { +// +//		public ValueCopier() { +//		} +// +//		public void copyValue(Result oldValue, Result newValue) { +//			if (null != oldValue && null != newValue) { +//				oldValue.copyFrom(newValue); +//			} +//		} +// +//	} +//} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 0421b6e..780d3fc 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -1,311 +1,311 @@ -/* - * Copyright (c) 2009 Concurrent, Inc. - * - * This work has been released into the public domain - * by the copyright holder. This applies worldwide. - * - * In case this is not legally possible: - * The copyright holder grants any entity the right - * to use this work for any purpose, without any - * conditions, unless such conditions are required by law. - */ - -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.util.UUID; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import cascading.flow.FlowProcess; -import cascading.tap.SinkMode; -import cascading.tap.Tap; -import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -import cascading.tuple.TupleEntryCollector; -import cascading.tuple.TupleEntryIterator; - -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; - -/** - * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with - * the {@HBaseRawScheme} to allow for the reading and writing - * of data to and from a HBase cluster. - */ -@SuppressWarnings({ "deprecation", "rawtypes" }) -public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { -	/** -	 *  -	 */ -	private static final long serialVersionUID = 8019189493428493323L; - -	/** Field LOG */ -	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); - -	private final String id = UUID.randomUUID().toString(); - -	/** Field SCHEME */ -	public static final String SCHEME = "hbase"; - -	/** Field hBaseAdmin */ -	private transient HBaseAdmin hBaseAdmin; - -	/** Field hostName */ -	private String quorumNames; -	/** Field tableName */ -	private String tableName; -	private String base64Scan; - -	/** -	 * Constructor HBaseTap creates a new HBaseTap instance. -	 *  -	 * @param tableName -	 *            of type String -	 * @param HBaseFullScheme -	 *            of type HBaseFullScheme -	 */ -	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { -		super(HBaseFullScheme, SinkMode.UPDATE); -		this.tableName = tableName; -	} - -	/** -	 * Constructor HBaseTap creates a new HBaseTap instance. -	 *  -	 * @param tableName -	 *            of type String -	 * @param HBaseFullScheme -	 *            of type HBaseFullScheme -	 * @param sinkMode -	 *            of type SinkMode -	 */ -	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -		super(HBaseFullScheme, sinkMode); -		this.tableName = tableName; -	} - -	/** -	 * Constructor HBaseTap creates a new HBaseTap instance. -	 *  -	 * @param tableName -	 *            of type String -	 * @param HBaseFullScheme -	 *            of type HBaseFullScheme -	 */ -	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { -		super(HBaseFullScheme, SinkMode.UPDATE); -		this.quorumNames = quorumNames; -		this.tableName = tableName; -	} - -	/** -	 * Constructor HBaseTap creates a new HBaseTap instance. -	 *  -	 * @param tableName -	 *            of type String -	 * @param HBaseFullScheme -	 *            of type HBaseFullScheme -	 * @param sinkMode -	 *            of type SinkMode -	 */ -	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -		super(HBaseFullScheme, sinkMode); -		this.quorumNames = quorumNames; -		this.tableName = tableName; -	} - -	/** -	 * Constructor HBaseTap creates a new HBaseTap instance. -	 *  -	 * @param quorumNames		HBase quorum -	 * @param tableName			The name of the HBase table to read -	 * @param HBaseFullScheme -	 * @param base64Scan		An optional base64 encoded scan object -	 * @param sinkMode			If REPLACE the output table will be deleted before writing to -	 */ -	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { -		super(HBaseFullScheme, sinkMode); -		this.quorumNames = quorumNames; -		this.tableName = tableName; -		this.base64Scan = base64Scan; -	} - -	/** -	 * Method getTableName returns the tableName of this HBaseTap object. -	 *  -	 * @return the tableName (type String) of this HBaseTap object. -	 */ -	public String getTableName() { -		return tableName; -	} - -	public Path getPath() { -		return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); -	} - -	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { -		if (hBaseAdmin == null) { -			Configuration hbaseConf = HBaseConfiguration.create(conf); -			hBaseAdmin = new HBaseAdmin(hbaseConf); -		} - -		return hBaseAdmin; -	} - -	@Override -	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { -		if (quorumNames != null) { -			conf.set("hbase.zookeeper.quorum", quorumNames); -		} - -		LOG.debug("sinking to table: {}", tableName); - -		if (isReplace() && conf.get("mapred.task.partition") == null) { -			try { -				deleteResource(conf); - -			} catch (IOException e) { -				throw new RuntimeException("could not delete resource: " + e); -			} -		} - -		else if (isUpdate() || isReplace()) { -			try { -				createResource(conf); -			} catch (IOException e) { -				throw new RuntimeException(tableName + " does not exist !", e); -			} - -		} - -		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); -		super.sinkConfInit(process, conf); -	} - -	@Override -	public String getIdentifier() { -		return id; -	} - -	@Override -	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) -			throws IOException { -		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); -	} - -	@Override -	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) -			throws IOException { -		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); -		hBaseCollector.prepare(); -		return hBaseCollector; -	} - -	@Override -	public boolean createResource(JobConf jobConf) throws IOException { -		HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); - -		if (hBaseAdmin.tableExists(tableName)) { -			return true; -		} - -		LOG.info("creating hbase table: {}", tableName); - -		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); - -		String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); - -		for (String familyName : familyNames) { -			tableDescriptor.addFamily(new HColumnDescriptor(familyName)); -		} - -		hBaseAdmin.createTable(tableDescriptor); - -		return true; -	} - -	@Override -	public boolean deleteResource(JobConf jobConf) throws IOException { -		if (getHBaseAdmin(jobConf).tableExists(tableName)) { -			if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) -				getHBaseAdmin(jobConf).disableTable(tableName); -			getHBaseAdmin(jobConf).deleteTable(tableName); -		} -		return true; -	} - -	@Override -	public boolean resourceExists(JobConf jobConf) throws IOException { -		return getHBaseAdmin(jobConf).tableExists(tableName); -	} - -	@Override -	public long getModifiedTime(JobConf jobConf) throws IOException { -		return System.currentTimeMillis(); // currently unable to find last mod -											// time -											// on a table -	} - -	@Override -	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { -		// a hack for MultiInputFormat to see that there is a child format -		FileInputFormat.setInputPaths(conf, getPath()); - -		if (quorumNames != null) { -			conf.set("hbase.zookeeper.quorum", quorumNames); -		} - -		LOG.debug("sourcing from table: {}", tableName); -		conf.set(TableInputFormat.INPUT_TABLE, tableName); -		if (null != base64Scan) -			conf.set(TableInputFormat.SCAN, base64Scan); -			 -		super.sourceConfInit(process, conf); -	} - -	@Override -	public boolean equals(Object object) { -		if (this == object) { -			return true; -		} -		if (object == null || getClass() != object.getClass()) { -			return false; -		} -		if (!super.equals(object)) { -			return false; -		} - -		HBaseRawTap hBaseTap = (HBaseRawTap) object; - -		if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { -			return false; -		} - -		if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { -			return false; -		} - -		return true; -	} - -	@Override -	public int hashCode() { -		int result = super.hashCode(); -		result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); -		return result; -	} -} +///* +//* Copyright (c) 2009 Concurrent, Inc. +//* +//* This work has been released into the public domain +//* by the copyright holder. This applies worldwide. +//* +//* In case this is not legally possible: +//* The copyright holder grants any entity the right +//* to use this work for any purpose, without any +//* conditions, unless such conditions are required by law. +//*/ +// +//package parallelai.spyglass.hbase; +// +//import java.io.IOException; +//import java.util.UUID; +// +//import org.apache.hadoop.conf.Configuration; +//import org.apache.hadoop.fs.Path; +//import org.apache.hadoop.hbase.HBaseConfiguration; +//import org.apache.hadoop.hbase.HColumnDescriptor; +//import org.apache.hadoop.hbase.HTableDescriptor; +//import org.apache.hadoop.hbase.MasterNotRunningException; +//import org.apache.hadoop.hbase.ZooKeeperConnectionException; +//import org.apache.hadoop.hbase.client.HBaseAdmin; +//import org.apache.hadoop.hbase.client.Scan; +//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +//import org.apache.hadoop.mapred.FileInputFormat; +//import org.apache.hadoop.mapred.JobConf; +//import org.apache.hadoop.mapred.OutputCollector; +//import org.apache.hadoop.mapred.RecordReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import cascading.flow.FlowProcess; +//import cascading.tap.SinkMode; +//import cascading.tap.Tap; +//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +//import cascading.tuple.TupleEntryCollector; +//import cascading.tuple.TupleEntryIterator; +// +//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +// +///** +//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with +//* the {@HBaseRawScheme} to allow for the reading and writing +//* of data to and from a HBase cluster. +//*/ +//@SuppressWarnings({ "deprecation", "rawtypes" }) +//public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { +//	/** +//	 * +//	 */ +//	private static final long serialVersionUID = 8019189493428493323L; +// +//	/** Field LOG */ +//	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); +// +//	private final String id = UUID.randomUUID().toString(); +// +//	/** Field SCHEME */ +//	public static final String SCHEME = "hbase"; +// +//	/** Field hBaseAdmin */ +//	private transient HBaseAdmin hBaseAdmin; +// +//	/** Field hostName */ +//	private String quorumNames; +//	/** Field tableName */ +//	private String tableName; +//	private String base64Scan; +// +//	/** +//	 * Constructor HBaseTap creates a new HBaseTap instance. +//	 * +//	 * @param tableName +//	 *            of type String +//	 * @param HBaseFullScheme +//	 *            of type HBaseFullScheme +//	 */ +//	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { +//		super(HBaseFullScheme, SinkMode.UPDATE); +//		this.tableName = tableName; +//	} +// +//	/** +//	 * Constructor HBaseTap creates a new HBaseTap instance. +//	 * +//	 * @param tableName +//	 *            of type String +//	 * @param HBaseFullScheme +//	 *            of type HBaseFullScheme +//	 * @param sinkMode +//	 *            of type SinkMode +//	 */ +//	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +//		super(HBaseFullScheme, sinkMode); +//		this.tableName = tableName; +//	} +// +//	/** +//	 * Constructor HBaseTap creates a new HBaseTap instance. +//	 * +//	 * @param tableName +//	 *            of type String +//	 * @param HBaseFullScheme +//	 *            of type HBaseFullScheme +//	 */ +//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { +//		super(HBaseFullScheme, SinkMode.UPDATE); +//		this.quorumNames = quorumNames; +//		this.tableName = tableName; +//	} +// +//	/** +//	 * Constructor HBaseTap creates a new HBaseTap instance. +//	 * +//	 * @param tableName +//	 *            of type String +//	 * @param HBaseFullScheme +//	 *            of type HBaseFullScheme +//	 * @param sinkMode +//	 *            of type SinkMode +//	 */ +//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +//		super(HBaseFullScheme, sinkMode); +//		this.quorumNames = quorumNames; +//		this.tableName = tableName; +//	} +// +//	/** +//	 * Constructor HBaseTap creates a new HBaseTap instance. +//	 * +//	 * @param quorumNames		HBase quorum +//	 * @param tableName			The name of the HBase table to read +//	 * @param HBaseFullScheme +//	 * @param base64Scan		An optional base64 encoded scan object +//	 * @param sinkMode			If REPLACE the output table will be deleted before writing to +//	 */ +//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { +//		super(HBaseFullScheme, sinkMode); +//		this.quorumNames = quorumNames; +//		this.tableName = tableName; +//		this.base64Scan = base64Scan; +//	} +// +//	/** +//	 * Method getTableName returns the tableName of this HBaseTap object. +//	 * +//	 * @return the tableName (type String) of this HBaseTap object. +//	 */ +//	public String getTableName() { +//		return tableName; +//	} +// +//	public Path getPath() { +//		return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); +//	} +// +//	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { +//		if (hBaseAdmin == null) { +//			Configuration hbaseConf = HBaseConfiguration.create(conf); +//			hBaseAdmin = new HBaseAdmin(hbaseConf); +//		} +// +//		return hBaseAdmin; +//	} +// +//	@Override +//	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { +//		if (quorumNames != null) { +//			conf.set("hbase.zookeeper.quorum", quorumNames); +//		} +// +//		LOG.debug("sinking to table: {}", tableName); +// +//		if (isReplace() && conf.get("mapred.task.partition") == null) { +//			try { +//				deleteResource(conf); +// +//			} catch (IOException e) { +//				throw new RuntimeException("could not delete resource: " + e); +//			} +//		} +// +//		else if (isUpdate() || isReplace()) { +//			try { +//				createResource(conf); +//			} catch (IOException e) { +//				throw new RuntimeException(tableName + " does not exist !", e); +//			} +// +//		} +// +//		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +//		super.sinkConfInit(process, conf); +//	} +// +//	@Override +//	public String getIdentifier() { +//		return id; +//	} +// +//	@Override +//	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) +//			throws IOException { +//		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); +//	} +// +//	@Override +//	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) +//			throws IOException { +//		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); +//		hBaseCollector.prepare(); +//		return hBaseCollector; +//	} +// +//	@Override +//	public boolean createResource(JobConf jobConf) throws IOException { +//		HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); +// +//		if (hBaseAdmin.tableExists(tableName)) { +//			return true; +//		} +// +//		LOG.info("creating hbase table: {}", tableName); +// +//		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); +// +//		String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); +// +//		for (String familyName : familyNames) { +//			tableDescriptor.addFamily(new HColumnDescriptor(familyName)); +//		} +// +//		hBaseAdmin.createTable(tableDescriptor); +// +//		return true; +//	} +// +//	@Override +//	public boolean deleteResource(JobConf jobConf) throws IOException { +//		if (getHBaseAdmin(jobConf).tableExists(tableName)) { +//			if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) +//				getHBaseAdmin(jobConf).disableTable(tableName); +//			getHBaseAdmin(jobConf).deleteTable(tableName); +//		} +//		return true; +//	} +// +//	@Override +//	public boolean resourceExists(JobConf jobConf) throws IOException { +//		return getHBaseAdmin(jobConf).tableExists(tableName); +//	} +// +//	@Override +//	public long getModifiedTime(JobConf jobConf) throws IOException { +//		return System.currentTimeMillis(); // currently unable to find last mod +//											// time +//											// on a table +//	} +// +//	@Override +//	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { +//		// a hack for MultiInputFormat to see that there is a child format +//		FileInputFormat.setInputPaths(conf, getPath()); +// +//		if (quorumNames != null) { +//			conf.set("hbase.zookeeper.quorum", quorumNames); +//		} +// +//		LOG.debug("sourcing from table: {}", tableName); +//		conf.set(TableInputFormat.INPUT_TABLE, tableName); +//		if (null != base64Scan) +//			conf.set(TableInputFormat.SCAN, base64Scan); +// +//		super.sourceConfInit(process, conf); +//	} +// +//	@Override +//	public boolean equals(Object object) { +//		if (this == object) { +//			return true; +//		} +//		if (object == null || getClass() != object.getClass()) { +//			return false; +//		} +//		if (!super.equals(object)) { +//			return false; +//		} +// +//		HBaseRawTap hBaseTap = (HBaseRawTap) object; +// +//		if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { +//			return false; +//		} +// +//		if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { +//			return false; +//		} +// +//		return true; +//	} +// +//	@Override +//	public int hashCode() { +//		int result = super.hashCode(); +//		result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); +//		return result; +//	} +//} diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index bbc205b..6216695 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -1,83 +1,83 @@ -package parallelai.spyglass.hbase - -import cascading.pipe.Pipe -import cascading.pipe.assembly.Coerce -import cascading.scheme.Scheme -import cascading.tap.{ Tap, SinkMode } -import cascading.tuple.Fields -import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } -import org.apache.hadoop.hbase.util.Bytes -import scala.collection.JavaConversions._ -import scala.collection.mutable.WrappedArray -import com.twitter.scalding._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil -import org.apache.hadoop.hbase.util.Base64 -import java.io.ByteArrayOutputStream -import java.io.DataOutputStream - -object HBaseRawSource { -	/** -	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource -	 * @param scan -	 * @return base64 string representation -	 */ -	def convertScanToString(scan: Scan) = { -		val out = new ByteArrayOutputStream(); -		val dos = new DataOutputStream(out); -		scan.write(dos); -		Base64.encodeBytes(out.toByteArray()); -	} -} - - -/** - * @author Rotem Hermon - * - * HBaseRawSource is a scalding source that passes the original row (Result) object to the  - * mapper for customized processing. - *  - * @param	tableName	The name of the HBase table to read - * @param	quorumNames	HBase quorum - * @param	familyNames	Column families to get (source, if null will get all) or update to (sink) - * @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written - * @param	base64Scan	An optional base64 encoded scan object  - * @param	sinkMode	If REPLACE the output table will be deleted before writing to	 - *  - */ -class HBaseRawSource( -	tableName: String, -	quorumNames: String = "localhost", -	familyNames: Array[String], -	writeNulls: Boolean = true, -	base64Scan: String = null, -	sinkMode: SinkMode = null) extends Source { - -	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) -		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] - -	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { -		val hBaseScheme = hdfsScheme match { -			case hbase: HBaseRawScheme => hbase -			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") -		} -		mode match { -			case hdfsMode @ Hdfs(_, _) => readOrWrite match { -				case Read => { -					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -						case null => SinkMode.KEEP -						case _ => sinkMode -					}).asInstanceOf[Tap[_, _, _]] -				} -				case Write => { -					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -						case null => SinkMode.UPDATE -						case _ => sinkMode -					}).asInstanceOf[Tap[_, _, _]] -				} -			} -			case _ => super.createTap(readOrWrite)(mode) -		} -	} -} +//package parallelai.spyglass.hbase +// +//import cascading.pipe.Pipe +//import cascading.pipe.assembly.Coerce +//import cascading.scheme.Scheme +//import cascading.tap.{ Tap, SinkMode } +//import cascading.tuple.Fields +//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +//import org.apache.hadoop.hbase.util.Bytes +//import scala.collection.JavaConversions._ +//import scala.collection.mutable.WrappedArray +//import com.twitter.scalding._ +//import org.apache.hadoop.hbase.io.ImmutableBytesWritable +//import org.apache.hadoop.hbase.client.Scan +//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +//import org.apache.hadoop.hbase.util.Base64 +//import java.io.ByteArrayOutputStream +//import java.io.DataOutputStream +// +//object HBaseRawSource { +//	/** +//	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource +//	 * @param scan +//	 * @return base64 string representation +//	 */ +//	def convertScanToString(scan: Scan) = { +//		val out = new ByteArrayOutputStream(); +//		val dos = new DataOutputStream(out); +//		scan.write(dos); +//		Base64.encodeBytes(out.toByteArray()); +//	} +//} +// +// +///** +// * @author Rotem Hermon +// * +// * HBaseRawSource is a scalding source that passes the original row (Result) object to the +// * mapper for customized processing. +// * +// * @param	tableName	The name of the HBase table to read +// * @param	quorumNames	HBase quorum +// * @param	familyNames	Column families to get (source, if null will get all) or update to (sink) +// * @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written +// * @param	base64Scan	An optional base64 encoded scan object +// * @param	sinkMode	If REPLACE the output table will be deleted before writing to +// * +// */ +//class HBaseRawSource( +//	tableName: String, +//	quorumNames: String = "localhost", +//	familyNames: Array[String], +//	writeNulls: Boolean = true, +//	base64Scan: String = null, +//	sinkMode: SinkMode = null) extends Source { +// +//	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) +//		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] +// +//	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +//		val hBaseScheme = hdfsScheme match { +//			case hbase: HBaseRawScheme => hbase +//			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") +//		} +//		mode match { +//			case hdfsMode @ Hdfs(_, _) => readOrWrite match { +//				case Read => { +//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +//						case null => SinkMode.KEEP +//						case _ => sinkMode +//					}).asInstanceOf[Tap[_, _, _]] +//				} +//				case Write => { +//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +//						case null => SinkMode.UPDATE +//						case _ => sinkMode +//					}).asInstanceOf[Tap[_, _, _]] +//				} +//			} +//			case _ => super.createTap(readOrWrite)(mode) +//		} +//	} +//} | 
