-rw-r--r-- | pom.xml                                                       |  57
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java   | 572
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java      | 622
-rw-r--r-- | src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java | 734
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 166
5 files changed, 1097 insertions(+), 1054 deletions(-)
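Summary before the hunks: this commit bumps the build to CDH 4.3.0 / Scala 2.10.2 / Cascading 2.1.6 and re-enables HBaseRawScheme and HBaseRawTap, which were previously checked in fully commented out. The following is a minimal sketch of how the re-enabled classes might be wired together, using only the constructors visible in the diff below; the quorum, table, and column-family names are illustrative placeholders, not values from the commit.

import cascading.tap.SinkMode;
import parallelai.spyglass.hbase.HBaseRawScheme;
import parallelai.spyglass.hbase.HBaseRawTap;

public class HBaseRawTapSketch {
    public static void main(String[] args) {
        // Source tuples carry two fields, "rowkey" (ImmutableBytesWritable) and
        // "row" (Result), as defined by HBaseRawScheme.RowKeyField / RowField.
        HBaseRawScheme scheme = new HBaseRawScheme(new String[] { "cf1", "cf2" });

        // Quorum, table name, and sink mode follow the four-argument constructor
        // declared in HBaseRawTap.java; SinkMode.UPDATE keeps an existing table in place.
        HBaseRawTap tap = new HBaseRawTap("zk1,zk2,zk3", "my_table", scheme, SinkMode.UPDATE);
    }
}

In a Scalding job these classes would normally be reached through HBaseRawSource.scala, which this commit also updates.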
| @@ -8,13 +8,6 @@  		<url>http://www.parallelai.com</url>  	</organization> - -	<name>Cascading and Scalding wrapper for HBase with advanced features</name> -	<groupId>parallelai</groupId> -	<artifactId>parallelai.spyglass</artifactId> -	<version>2.9.3_3.0.0</version> -	<packaging>jar</packaging> -  	<properties>  		<!-- Java compilation level -->  		<maven.compiler.source>1.6</maven.compiler.source> @@ -28,32 +21,32 @@  		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> -        <datafu.version>0.0.4-cdh4.2.0</datafu.version> -        <flume.version>1.3.0-cdh4.2.0</flume.version> -        <hadoop.version>2.0.0-cdh4.2.0</hadoop.version> -        <hbase.version>0.94.2-cdh4.2.0</hbase.version> -        <hive.version>0.10.0-cdh4.2.0</hive.version> -        <mahout.version>0.7-cdh4.2.0</mahout.version> -        <mapreduce.version>2.0.0-mr1-cdh4.2.0</mapreduce.version> -        <oozie.version>3.3.0-cdh4.2.0</oozie.version> -        <oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.0-cdh4.2.0</oozie-hadoop.version> -        <oozie-sharelib.version>3.3.0-cdh4.2.0</oozie-sharelib.version> -        <pig.version>0.10.0-cdh4.2.0</pig.version> -        <sqoop.version>1.4.2-cdh4.2.0</sqoop.version> -        <whirr.version>0.8.0-cdh4.2.0</whirr.version> -        <zookeeper.version>3.4.5-cdh4.2.0</zookeeper.version> +        <datafu.version>0.0.4-cdh4.3.0</datafu.version> +        <flume.version>1.3.0-cdh4.3.0</flume.version> +        <hadoop.version>2.0.0-cdh4.3.0</hadoop.version> +        <hbase.version>0.94.6-cdh4.3.0</hbase.version> +        <hive.version>0.10.0-cdh4.3.0</hive.version> +        <mahout.version>0.7-cdh4.3.0</mahout.version> +        <mapreduce.version>2.0.0-mr1-cdh4.3.0</mapreduce.version> +        <oozie.version>3.3.2-cdh4.3.0</oozie.version> +        <oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.2-cdh4.3.0</oozie-hadoop.version> +        <oozie-sharelib.version>3.3.2-cdh4.3.0</oozie-sharelib.version> +        <pig.version>0.11.0-cdh4.3.0</pig.version> +        <sqoop.version>1.4.3-cdh4.3.0</sqoop.version> +        <whirr.version>0.8.2-cdh4.3.0</whirr.version> +        <zookeeper.version>3.4.5-cdh4.3.0</zookeeper.version>  		<!-- Scala/Scalding/Cascading properties --> -        <scala.version>2.9.3</scala.version> -        <scalding.scala.version>2.9.3</scalding.scala.version> +        <scala.version>2.10.2</scala.version> +        <scalding.scala.version>2.10</scalding.scala.version>          <scalding.version>0.8.6</scalding.version> -		<cascading.version>2.1.0</cascading.version> +		<cascading.version>2.1.6</cascading.version>  		<scalding-commons.version>0.2.0</scalding-commons.version>  		<scalatest.version>1.9.1</scalatest.version>  		<trove4j.version>3.0.3</trove4j.version>  		<maple.version>0.2.8</maple.version> -		<specs2.version>1.12.4.1</specs2.version> +		<specs2.version>2.1</specs2.version>  		<typesafe.config.version>1.0.0</typesafe.config.version>          <!-- Other libraries properties --> @@ -66,7 +59,13 @@  	</properties> -	<distributionManagement> +    <name>Cascading and Scalding wrapper for HBase with advanced features</name> +    <groupId>parallelai</groupId> +    <artifactId>parallelai.spyglass</artifactId> +    <version>${scala.version}_2.4.0</version> +    <packaging>jar</packaging> + +    <distributionManagement>  	    <repository>  	        <id>repo</id>  	        <url>https://github.com/ParallelAI/mvn-repo/raw/master/releases</url> @@ -90,6 +89,10 @@  			
<name>Con Jars</name>  			<url>http://conjars.org/repo</url>  		</repository> +        <repository> +            <id>mvnrepository</id> +            <url>http://repo1.maven.org/maven2</url> +        </repository>  	</repositories>  	<!-- Profiles --> @@ -237,7 +240,7 @@  					</includes>  				</configuration>  			</plugin> -			<!-- This plugin is not supported by Eclipse, so maybe we shouldn't be  +			<!-- This plugin is not supported by Eclipse, so maybe we shouldn't be  				using it -->  			<plugin>  				<groupId>org.scala-tools</groupId> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 7dba40d..7b62c88 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -1,286 +1,286 @@ -///* -//* Copyright (c) 2009 Concurrent, Inc. -//* -//* This work has been released into the public domain -//* by the copyright holder. This applies worldwide. -//* -//* In case this is not legally possible: -//* The copyright holder grants any entity the right -//* to use this work for any purpose, without any -//* conditions, unless such conditions are required by law. -//*/ -// -//package parallelai.spyglass.hbase; -// -//import java.io.IOException; -//import java.util.Arrays; -//import java.util.HashSet; -// -//import org.apache.hadoop.hbase.client.Put; -//import org.apache.hadoop.hbase.client.Result; -//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -//import org.apache.hadoop.hbase.mapred.TableOutputFormat; -//import org.apache.hadoop.hbase.util.Bytes; -//import org.apache.hadoop.mapred.JobConf; -//import org.apache.hadoop.mapred.OutputCollector; -//import org.apache.hadoop.mapred.RecordReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; -//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; -// -//import cascading.flow.FlowProcess; -//import cascading.scheme.Scheme; -//import cascading.scheme.SinkCall; -//import cascading.scheme.SourceCall; -//import cascading.tap.Tap; -//import cascading.tuple.Fields; -//import cascading.tuple.Tuple; -//import cascading.tuple.TupleEntry; -//import cascading.util.Util; -// -///** -//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction -//* with the {@HBaseRawTap} to allow for the reading and writing of data -//* to and from a HBase cluster. -//* -//* @see HBaseRawTap -//*/ -//@SuppressWarnings({ "rawtypes", "deprecation" }) -//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { -//	/** -//	 * -//	 */ -//	private static final long serialVersionUID = 6248976486883281356L; -// -//	/** Field LOG */ -//	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); -// -//	public final Fields RowKeyField = new Fields("rowkey"); -//	public final Fields RowField = new Fields("row"); -// -//	/** String familyNames */ -//	private String[] familyNames; -// -//	private boolean writeNulls = true; -// -//	/** -//	 * Constructor HBaseScheme creates a new HBaseScheme instance. 
-//	 * -//	 * @param keyFields -//	 *            of type Fields -//	 * @param familyName -//	 *            of type String -//	 * @param valueFields -//	 *            of type Fields -//	 */ -//	public HBaseRawScheme(String familyName) { -//		this(new String[] { familyName }); -//	} -// -//	public HBaseRawScheme(String[] familyNames) { -//		this.familyNames = familyNames; -//		setSourceFields(); -//	} -// -//	public HBaseRawScheme(String familyName, boolean writeNulls) { -//		this(new String[] { familyName }, writeNulls); -//	} -// -//	public HBaseRawScheme(String[] familyNames, boolean writeNulls) { -//		this.familyNames = familyNames; -//		this.writeNulls = writeNulls; -//		setSourceFields(); -//	} -// -//	private void setSourceFields() { -//		Fields sourceFields = Fields.join(RowKeyField, RowField); -//		setSourceFields(sourceFields); -//	} -// -//	/** -//	 * Method getFamilyNames returns the set of familyNames of this HBaseScheme -//	 * object. -//	 * -//	 * @return the familyNames (type String[]) of this HBaseScheme object. -//	 */ -//	public String[] getFamilyNames() { -//		HashSet<String> familyNameSet = new HashSet<String>(); -//		if (familyNames != null) { -//			for (String familyName : familyNames) { -//				familyNameSet.add(familyName); -//			} -//		} -//		return familyNameSet.toArray(new String[0]); -//	} -// -//	@Override -//	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -//		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; -// -//		sourceCall.setContext(pair); -//	} -// -//	@Override -//	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -//		sourceCall.setContext(null); -//	} -// -//	@SuppressWarnings("unchecked") -//	@Override -//	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) -//			throws IOException { -//		Tuple result = new Tuple(); -// -//		Object key = sourceCall.getContext()[0]; -//		Object value = sourceCall.getContext()[1]; -//		boolean hasNext = sourceCall.getInput().next(key, value); -//		if (!hasNext) { -//			return false; -//		} -// -//		// Skip nulls -//		if (key == null || value == null) { -//			return true; -//		} -// -//		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; -//		Result row = (Result) value; -//		result.add(keyWritable); -//		result.add(row); -//		sourceCall.getIncomingEntry().setTuple(result); -//		return true; -//	} -// -//	@SuppressWarnings("unchecked") -//	@Override -//	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { -//		TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); -//		OutputCollector outputCollector = sinkCall.getOutput(); -//		Tuple key = tupleEntry.selectTuple(RowKeyField); -//		Object okey = key.getObject(0); -//		ImmutableBytesWritable keyBytes = getBytes(okey); -//		Put put = new Put(keyBytes.get()); -//		Fields outFields = tupleEntry.getFields().subtract(RowKeyField); -//		if (null != outFields) { -//			TupleEntry values = tupleEntry.selectEntry(outFields); -//			for (int n = 0; n < values.getFields().size(); n++) { -//				Object o = values.get(n); -//				ImmutableBytesWritable valueBytes = getBytes(o); -//				Comparable field = outFields.get(n); -//				ColumnName cn = parseColumn((String) field); -//				if (null == cn.family) { -//					if (n >= familyNames.length) -//						cn.family = 
familyNames[familyNames.length - 1]; -//					else -//						cn.family = familyNames[n]; -//				} -//				if (null != o || writeNulls) -//					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); -//			} -//		} -//		outputCollector.collect(null, put); -//	} -// -//	private ImmutableBytesWritable getBytes(Object obj) { -//		if (null == obj) -//			return new ImmutableBytesWritable(new byte[0]); -//		if (obj instanceof ImmutableBytesWritable) -//			return (ImmutableBytesWritable) obj; -//		else if (obj instanceof String) -//			return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); -//		else if (obj instanceof Long) -//			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); -//		else if (obj instanceof Integer) -//			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); -//		else if (obj instanceof Short) -//			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); -//		else if (obj instanceof Boolean) -//			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); -//		else if (obj instanceof Double) -//			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); -//		else -//			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" -//					+ obj.getClass().getName()); -//	} -// -//	private ColumnName parseColumn(String column) { -//		ColumnName ret = new ColumnName(); -//		int pos = column.indexOf(":"); -//		if (pos > 0) { -//			ret.name = column.substring(pos + 1); -//			ret.family = column.substring(0, pos); -//		} else { -//			ret.name = column; -//		} -//		return ret; -//	} -// -//	private class ColumnName { -//		String family; -//		String name; -// -//		ColumnName() { -//		} -//	} -// -//	@Override -//	public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { -//		conf.setOutputFormat(TableOutputFormat.class); -//		conf.setOutputKeyClass(ImmutableBytesWritable.class); -//		conf.setOutputValueClass(Put.class); -//	} -// -//	@Override -//	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, -//			JobConf conf) { -//		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, -//				ValueCopier.class); -//		if (null != familyNames) { -//			String columns = Util.join(this.familyNames, " "); -//			LOG.debug("sourcing from column families: {}", columns); -//			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); -//		} -//	} -// -//	@Override -//	public boolean equals(Object object) { -//		if (this == object) { -//			return true; -//		} -//		if (object == null || getClass() != object.getClass()) { -//			return false; -//		} -//		if (!super.equals(object)) { -//			return false; -//		} -// -//		HBaseRawScheme that = (HBaseRawScheme) object; -// -//		if (!Arrays.equals(familyNames, that.familyNames)) { -//			return false; -//		} -//		return true; -//	} -// -//	@Override -//	public int hashCode() { -//		int result = super.hashCode(); -//		result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); -//		return result; -//	} -// -//	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { -// -//		public ValueCopier() { -//		} -// -//		public void copyValue(Result oldValue, Result newValue) { -//			if (null != oldValue && null != newValue) { -//				oldValue.copyFrom(newValue); -//			} -//		} -// -//	} -//} +/* +* Copyright (c) 2009 Concurrent, Inc. 
+* +* This work has been released into the public domain +* by the copyright holder. This applies worldwide. +* +* In case this is not legally possible: +* The copyright holder grants any entity the right +* to use this work for any purpose, without any +* conditions, unless such conditions are required by law. +*/ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; + +/** +* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction +* with the {@HBaseRawTap} to allow for the reading and writing of data +* to and from a HBase cluster. +* +* @see HBaseRawTap +*/ +@SuppressWarnings({ "rawtypes", "deprecation" }) +public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +	/** +	 * +	 */ +	private static final long serialVersionUID = 6248976486883281356L; + +	/** Field LOG */ +	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); + +	public final Fields RowKeyField = new Fields("rowkey"); +	public final Fields RowField = new Fields("row"); + +	/** String familyNames */ +	private String[] familyNames; + +	private boolean writeNulls = true; + +	/** +	 * Constructor HBaseScheme creates a new HBaseScheme instance. +	 * +	 * @param keyFields +	 *            of type Fields +	 * @param familyName +	 *            of type String +	 * @param valueFields +	 *            of type Fields +	 */ +	public HBaseRawScheme(String familyName) { +		this(new String[] { familyName }); +	} + +	public HBaseRawScheme(String[] familyNames) { +		this.familyNames = familyNames; +		setSourceFields(); +	} + +	public HBaseRawScheme(String familyName, boolean writeNulls) { +		this(new String[] { familyName }, writeNulls); +	} + +	public HBaseRawScheme(String[] familyNames, boolean writeNulls) { +		this.familyNames = familyNames; +		this.writeNulls = writeNulls; +		setSourceFields(); +	} + +	private void setSourceFields() { +		Fields sourceFields = Fields.join(RowKeyField, RowField); +		setSourceFields(sourceFields); +	} + +	/** +	 * Method getFamilyNames returns the set of familyNames of this HBaseScheme +	 * object. +	 * +	 * @return the familyNames (type String[]) of this HBaseScheme object. 
+	 */ +	public String[] getFamilyNames() { +		HashSet<String> familyNameSet = new HashSet<String>(); +		if (familyNames != null) { +			for (String familyName : familyNames) { +				familyNameSet.add(familyName); +			} +		} +		return familyNameSet.toArray(new String[0]); +	} + +	@Override +	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; + +		sourceCall.setContext(pair); +	} + +	@Override +	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +		sourceCall.setContext(null); +	} + +	@SuppressWarnings("unchecked") +	@Override +	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) +			throws IOException { +		Tuple result = new Tuple(); + +		Object key = sourceCall.getContext()[0]; +		Object value = sourceCall.getContext()[1]; +		boolean hasNext = sourceCall.getInput().next(key, value); +		if (!hasNext) { +			return false; +		} + +		// Skip nulls +		if (key == null || value == null) { +			return true; +		} + +		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +		Result row = (Result) value; +		result.add(keyWritable); +		result.add(row); +		sourceCall.getIncomingEntry().setTuple(result); +		return true; +	} + +	@SuppressWarnings("unchecked") +	@Override +	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { +		TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +		OutputCollector outputCollector = sinkCall.getOutput(); +		Tuple key = tupleEntry.selectTuple(RowKeyField); +		Object okey = key.getObject(0); +		ImmutableBytesWritable keyBytes = getBytes(okey); +		Put put = new Put(keyBytes.get()); +		Fields outFields = tupleEntry.getFields().subtract(RowKeyField); +		if (null != outFields) { +			TupleEntry values = tupleEntry.selectEntry(outFields); +			for (int n = 0; n < values.getFields().size(); n++) { +				Object o = values.get(n); +				ImmutableBytesWritable valueBytes = getBytes(o); +				Comparable field = outFields.get(n); +				ColumnName cn = parseColumn((String) field); +				if (null == cn.family) { +					if (n >= familyNames.length) +						cn.family = familyNames[familyNames.length - 1]; +					else +						cn.family = familyNames[n]; +				} +				if (null != o || writeNulls) +					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); +			} +		} +		outputCollector.collect(null, put); +	} + +	private ImmutableBytesWritable getBytes(Object obj) { +		if (null == obj) +			return new ImmutableBytesWritable(new byte[0]); +		if (obj instanceof ImmutableBytesWritable) +			return (ImmutableBytesWritable) obj; +		else if (obj instanceof String) +			return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); +		else if (obj instanceof Long) +			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); +		else if (obj instanceof Integer) +			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); +		else if (obj instanceof Short) +			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); +		else if (obj instanceof Boolean) +			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); +		else if (obj instanceof Double) +			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); +		else +			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" +					+ 
obj.getClass().getName()); +	} + +	private ColumnName parseColumn(String column) { +		ColumnName ret = new ColumnName(); +		int pos = column.indexOf(":"); +		if (pos > 0) { +			ret.name = column.substring(pos + 1); +			ret.family = column.substring(0, pos); +		} else { +			ret.name = column; +		} +		return ret; +	} + +	private class ColumnName { +		String family; +		String name; + +		ColumnName() { +		} +	} + +	@Override +	public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +		conf.setOutputFormat(TableOutputFormat.class); +		conf.setOutputKeyClass(ImmutableBytesWritable.class); +		conf.setOutputValueClass(Put.class); +	} + +	@Override +	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, +			JobConf conf) { +		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, +				ValueCopier.class); +		if (null != familyNames) { +			String columns = Util.join(this.familyNames, " "); +			LOG.debug("sourcing from column families: {}", columns); +			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); +		} +	} + +	@Override +	public boolean equals(Object object) { +		if (this == object) { +			return true; +		} +		if (object == null || getClass() != object.getClass()) { +			return false; +		} +		if (!super.equals(object)) { +			return false; +		} + +		HBaseRawScheme that = (HBaseRawScheme) object; + +		if (!Arrays.equals(familyNames, that.familyNames)) { +			return false; +		} +		return true; +	} + +	@Override +	public int hashCode() { +		int result = super.hashCode(); +		result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +		return result; +	} + +	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { + +		public ValueCopier() { +		} + +		public void copyValue(Result oldValue, Result newValue) { +			if (null != oldValue && null != newValue) { +				oldValue.copyFrom(newValue); +			} +		} + +	} +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 780d3fc..5dcd57d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -1,311 +1,311 @@ -///* -//* Copyright (c) 2009 Concurrent, Inc. -//* -//* This work has been released into the public domain -//* by the copyright holder. This applies worldwide. -//* -//* In case this is not legally possible: -//* The copyright holder grants any entity the right -//* to use this work for any purpose, without any -//* conditions, unless such conditions are required by law. 
-//*/ -// -//package parallelai.spyglass.hbase; -// -//import java.io.IOException; -//import java.util.UUID; -// -//import org.apache.hadoop.conf.Configuration; -//import org.apache.hadoop.fs.Path; -//import org.apache.hadoop.hbase.HBaseConfiguration; -//import org.apache.hadoop.hbase.HColumnDescriptor; -//import org.apache.hadoop.hbase.HTableDescriptor; -//import org.apache.hadoop.hbase.MasterNotRunningException; -//import org.apache.hadoop.hbase.ZooKeeperConnectionException; -//import org.apache.hadoop.hbase.client.HBaseAdmin; -//import org.apache.hadoop.hbase.client.Scan; -//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -//import org.apache.hadoop.mapred.FileInputFormat; -//import org.apache.hadoop.mapred.JobConf; -//import org.apache.hadoop.mapred.OutputCollector; -//import org.apache.hadoop.mapred.RecordReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import cascading.flow.FlowProcess; -//import cascading.tap.SinkMode; -//import cascading.tap.Tap; -//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -//import cascading.tuple.TupleEntryCollector; -//import cascading.tuple.TupleEntryIterator; -// -//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -// -///** -//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with -//* the {@HBaseRawScheme} to allow for the reading and writing -//* of data to and from a HBase cluster. -//*/ -//@SuppressWarnings({ "deprecation", "rawtypes" }) -//public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { -//	/** -//	 * -//	 */ -//	private static final long serialVersionUID = 8019189493428493323L; -// -//	/** Field LOG */ -//	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); -// -//	private final String id = UUID.randomUUID().toString(); -// -//	/** Field SCHEME */ -//	public static final String SCHEME = "hbase"; -// -//	/** Field hBaseAdmin */ -//	private transient HBaseAdmin hBaseAdmin; -// -//	/** Field hostName */ -//	private String quorumNames; -//	/** Field tableName */ -//	private String tableName; -//	private String base64Scan; -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 */ -//	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { -//		super(HBaseFullScheme, SinkMode.UPDATE); -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 * @param sinkMode -//	 *            of type SinkMode -//	 */ -//	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -//		super(HBaseFullScheme, sinkMode); -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 */ -//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { -//		super(HBaseFullScheme, SinkMode.UPDATE); -//		this.quorumNames = quorumNames; -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. 
-//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 * @param sinkMode -//	 *            of type SinkMode -//	 */ -//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -//		super(HBaseFullScheme, sinkMode); -//		this.quorumNames = quorumNames; -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param quorumNames		HBase quorum -//	 * @param tableName			The name of the HBase table to read -//	 * @param HBaseFullScheme -//	 * @param base64Scan		An optional base64 encoded scan object -//	 * @param sinkMode			If REPLACE the output table will be deleted before writing to -//	 */ -//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { -//		super(HBaseFullScheme, sinkMode); -//		this.quorumNames = quorumNames; -//		this.tableName = tableName; -//		this.base64Scan = base64Scan; -//	} -// -//	/** -//	 * Method getTableName returns the tableName of this HBaseTap object. -//	 * -//	 * @return the tableName (type String) of this HBaseTap object. -//	 */ -//	public String getTableName() { -//		return tableName; -//	} -// -//	public Path getPath() { -//		return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); -//	} -// -//	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { -//		if (hBaseAdmin == null) { -//			Configuration hbaseConf = HBaseConfiguration.create(conf); -//			hBaseAdmin = new HBaseAdmin(hbaseConf); -//		} -// -//		return hBaseAdmin; -//	} -// -//	@Override -//	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { -//		if (quorumNames != null) { -//			conf.set("hbase.zookeeper.quorum", quorumNames); -//		} -// -//		LOG.debug("sinking to table: {}", tableName); -// -//		if (isReplace() && conf.get("mapred.task.partition") == null) { -//			try { -//				deleteResource(conf); -// -//			} catch (IOException e) { -//				throw new RuntimeException("could not delete resource: " + e); -//			} -//		} -// -//		else if (isUpdate() || isReplace()) { -//			try { -//				createResource(conf); -//			} catch (IOException e) { -//				throw new RuntimeException(tableName + " does not exist !", e); -//			} -// -//		} -// -//		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); -//		super.sinkConfInit(process, conf); -//	} -// -//	@Override -//	public String getIdentifier() { -//		return id; -//	} -// -//	@Override -//	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) -//			throws IOException { -//		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); -//	} -// -//	@Override -//	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) -//			throws IOException { -//		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); -//		hBaseCollector.prepare(); -//		return hBaseCollector; -//	} -// -//	@Override -//	public boolean createResource(JobConf jobConf) throws IOException { -//		HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); -// -//		if (hBaseAdmin.tableExists(tableName)) { -//			return true; -//		} -// -//		LOG.info("creating hbase table: {}", tableName); -// -//		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); -// -//		String[] familyNames = 
((HBaseRawScheme) getScheme()).getFamilyNames(); -// -//		for (String familyName : familyNames) { -//			tableDescriptor.addFamily(new HColumnDescriptor(familyName)); -//		} -// -//		hBaseAdmin.createTable(tableDescriptor); -// -//		return true; -//	} -// -//	@Override -//	public boolean deleteResource(JobConf jobConf) throws IOException { -//		if (getHBaseAdmin(jobConf).tableExists(tableName)) { -//			if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) -//				getHBaseAdmin(jobConf).disableTable(tableName); -//			getHBaseAdmin(jobConf).deleteTable(tableName); -//		} -//		return true; -//	} -// -//	@Override -//	public boolean resourceExists(JobConf jobConf) throws IOException { -//		return getHBaseAdmin(jobConf).tableExists(tableName); -//	} -// -//	@Override -//	public long getModifiedTime(JobConf jobConf) throws IOException { -//		return System.currentTimeMillis(); // currently unable to find last mod -//											// time -//											// on a table -//	} -// -//	@Override -//	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { -//		// a hack for MultiInputFormat to see that there is a child format -//		FileInputFormat.setInputPaths(conf, getPath()); -// -//		if (quorumNames != null) { -//			conf.set("hbase.zookeeper.quorum", quorumNames); -//		} -// -//		LOG.debug("sourcing from table: {}", tableName); -//		conf.set(TableInputFormat.INPUT_TABLE, tableName); -//		if (null != base64Scan) -//			conf.set(TableInputFormat.SCAN, base64Scan); -// -//		super.sourceConfInit(process, conf); -//	} -// -//	@Override -//	public boolean equals(Object object) { -//		if (this == object) { -//			return true; -//		} -//		if (object == null || getClass() != object.getClass()) { -//			return false; -//		} -//		if (!super.equals(object)) { -//			return false; -//		} -// -//		HBaseRawTap hBaseTap = (HBaseRawTap) object; -// -//		if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { -//			return false; -//		} -// -//		if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { -//			return false; -//		} -// -//		return true; -//	} -// -//	@Override -//	public int hashCode() { -//		int result = super.hashCode(); -//		result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); -//		return result; -//	} -//} +/* +* Copyright (c) 2009 Concurrent, Inc. +* +* This work has been released into the public domain +* by the copyright holder. This applies worldwide. +* +* In case this is not legally possible: +* The copyright holder grants any entity the right +* to use this work for any purpose, without any +* conditions, unless such conditions are required by law. 
+*/ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; + +/** +* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with +* the {@HBaseRawScheme} to allow for the reading and writing +* of data to and from a HBase cluster. +*/ +@SuppressWarnings({ "deprecation", "rawtypes" }) +public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { +	/** +	 * +	 */ +	private static final long serialVersionUID = 8019189493428493323L; + +	/** Field LOG */ +	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); + +	private final String id = UUID.randomUUID().toString(); + +	/** Field SCHEME */ +	public static final String SCHEME = "hbase"; + +	/** Field hBaseAdmin */ +	private transient HBaseAdmin hBaseAdmin; + +	/** Field hostName */ +	private String quorumNames; +	/** Field tableName */ +	private String tableName; +	private String base64Scan; + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 * +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 */ +	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { +		super(HBaseFullScheme, SinkMode.UPDATE); +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 * +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 * @param sinkMode +	 *            of type SinkMode +	 */ +	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +		super(HBaseFullScheme, sinkMode); +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 * +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 */ +	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { +		super(HBaseFullScheme, SinkMode.UPDATE); +		this.quorumNames = quorumNames; +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. 
+	 * +	 * @param tableName +	 *            of type String +	 * @param HBaseFullScheme +	 *            of type HBaseFullScheme +	 * @param sinkMode +	 *            of type SinkMode +	 */ +	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +		super(HBaseFullScheme, sinkMode); +		this.quorumNames = quorumNames; +		this.tableName = tableName; +	} + +	/** +	 * Constructor HBaseTap creates a new HBaseTap instance. +	 * +	 * @param quorumNames		HBase quorum +	 * @param tableName			The name of the HBase table to read +	 * @param HBaseFullScheme +	 * @param base64Scan		An optional base64 encoded scan object +	 * @param sinkMode			If REPLACE the output table will be deleted before writing to +	 */ +	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { +		super(HBaseFullScheme, sinkMode); +		this.quorumNames = quorumNames; +		this.tableName = tableName; +		this.base64Scan = base64Scan; +	} + +	/** +	 * Method getTableName returns the tableName of this HBaseTap object. +	 * +	 * @return the tableName (type String) of this HBaseTap object. +	 */ +	public String getTableName() { +		return tableName; +	} + +	public Path getPath() { +		return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); +	} + +	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { +		if (hBaseAdmin == null) { +			Configuration hbaseConf = HBaseConfiguration.create(conf); +			hBaseAdmin = new HBaseAdmin(hbaseConf); +		} + +		return hBaseAdmin; +	} + +	@Override +	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { +		if (quorumNames != null) { +			conf.set("hbase.zookeeper.quorum", quorumNames); +		} + +		LOG.debug("sinking to table: {}", tableName); + +		if (isReplace() && conf.get("mapred.task.partition") == null) { +			try { +				deleteResource(conf); + +			} catch (IOException e) { +				throw new RuntimeException("could not delete resource: " + e); +			} +		} + +		else if (isUpdate() || isReplace()) { +			try { +				createResource(conf); +			} catch (IOException e) { +				throw new RuntimeException(tableName + " does not exist !", e); +			} + +		} + +		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +		super.sinkConfInit(process, conf); +	} + +	@Override +	public String getIdentifier() { +		return id; +	} + +	@Override +	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) +			throws IOException { +		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); +	} + +	@Override +	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) +			throws IOException { +		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); +		hBaseCollector.prepare(); +		return hBaseCollector; +	} + +	@Override +	public boolean createResource(JobConf jobConf) throws IOException { +		HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + +		if (hBaseAdmin.tableExists(tableName)) { +			return true; +		} + +		LOG.info("creating hbase table: {}", tableName); + +		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + +		String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); + +		for (String familyName : familyNames) { +			tableDescriptor.addFamily(new HColumnDescriptor(familyName)); +		} + +		hBaseAdmin.createTable(tableDescriptor); + +		return 
true; +	} + +	@Override +	public boolean deleteResource(JobConf jobConf) throws IOException { +		if (getHBaseAdmin(jobConf).tableExists(tableName)) { +			if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) +				getHBaseAdmin(jobConf).disableTable(tableName); +			getHBaseAdmin(jobConf).deleteTable(tableName); +		} +		return true; +	} + +	@Override +	public boolean resourceExists(JobConf jobConf) throws IOException { +		return getHBaseAdmin(jobConf).tableExists(tableName); +	} + +	@Override +	public long getModifiedTime(JobConf jobConf) throws IOException { +		return System.currentTimeMillis(); // currently unable to find last mod +											// time +											// on a table +	} + +	@Override +	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { +		// a hack for MultiInputFormat to see that there is a child format +		FileInputFormat.setInputPaths(conf, getPath()); + +		if (quorumNames != null) { +			conf.set("hbase.zookeeper.quorum", quorumNames); +		} + +		LOG.debug("sourcing from table: {}", tableName); +		conf.set(TableInputFormat.INPUT_TABLE, tableName); +		if (null != base64Scan) +			conf.set(TableInputFormat.SCAN, base64Scan); + +		super.sourceConfInit(process, conf); +	} + +	@Override +	public boolean equals(Object object) { +		if (this == object) { +			return true; +		} +		if (object == null || getClass() != object.getClass()) { +			return false; +		} +		if (!super.equals(object)) { +			return false; +		} + +		HBaseRawTap hBaseTap = (HBaseRawTap) object; + +		if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { +			return false; +		} + +		if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { +			return false; +		} + +		return true; +	} + +	@Override +	public int hashCode() { +		int result = super.hashCode(); +		result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); +		return result; +	} +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java index 1166970..3f10a04 100644 --- a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java @@ -29,6 +29,14 @@  package parallelai.spyglass.jdbc.db; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +  import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.fs.FileSystem; @@ -39,353 +47,385 @@ import org.apache.hadoop.mapred.Reporter;  import org.apache.hadoop.util.Progressable;  import org.apache.hadoop.util.StringUtils; -import com.jcraft.jsch.Logger; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -  /** - * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts - * <key,value> pairs, where key has a type extending DBWritable. Returned {@link RecordWriter} - * writes <b>only the key</b> to the database with a batch SQL query. + * A OutputFormat that sends the reduce output to a SQL table. + * <p/> + * {@link DBOutputFormat} accepts <key,value> pairs, where key has a type + * extending DBWritable. 
Returned {@link RecordWriter} writes <b>only the + * key</b> to the database with a batch SQL query.   */ -public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> { -    private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); - -    /** A RecordWriter that writes the reduce output to a SQL table */ -    protected class DBRecordWriter implements RecordWriter<K, V> { -        private Connection connection; -        private PreparedStatement insertStatement; -        private PreparedStatement updateStatement; -        private final int statementsBeforeExecute; - -        private long statementsAdded = 0; -        private long insertStatementsCurrent = 0; -        private long updateStatementsCurrent = 0; - -        protected DBRecordWriter(Connection connection, PreparedStatement insertStatement, -            PreparedStatement updateStatement, int statementsBeforeExecute) { -            this.connection = connection; -            this.insertStatement = insertStatement; -            this.updateStatement = updateStatement; -            this.statementsBeforeExecute = statementsBeforeExecute; -        } - -        /** {@inheritDoc} */ -        public void close(Reporter reporter) throws IOException { -            executeBatch(); - -            try { -                if (insertStatement != null) { insertStatement.close(); } - -                if (updateStatement != null) { updateStatement.close(); } - -                connection.commit(); -            } catch (SQLException exception) { -                rollBack(); - -                createThrowMessage("unable to commit batch", 0, exception); -            } finally { -                try { -                    connection.close(); -                } catch (SQLException exception) { -                    throw new IOException("unable to close connection", exception); -                } -            } -        } - -        private void executeBatch() throws IOException { -            try { -                if (insertStatementsCurrent != 0) { -                    LOG.info( -                        "executing insert batch " + createBatchMessage(insertStatementsCurrent)); - -                    insertStatement.executeBatch(); -                } - -                insertStatementsCurrent = 0; -            } catch (SQLException exception) { -                rollBack(); - -                createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception); -            } - -            try { -                if (updateStatementsCurrent != 0) { -                    LOG.info( -                        "executing update batch " + createBatchMessage(updateStatementsCurrent)); - -                    int[] result = updateStatement.executeBatch(); - -                    int count = 0; - -                    for (int value : result) { count += value; } - -                    if (count != updateStatementsCurrent) { -                        throw new IOException( -                            "update did not update same number of statements executed in batch, batch: " -                            + updateStatementsCurrent + " updated: " + count); -                    } -                } - -                updateStatementsCurrent = 0; -            } catch (SQLException exception) { -            	 -            	String message = exception.getMessage(); -            	if (message.indexOf("Duplicate Key") >= 0) { -            		LOG.warn("In exception block. 
Bypass exception becuase of Insert/Update."); -            	} else { -                    rollBack(); - -                    createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception); -            	} -            } -        } - -        private void rollBack() { -            try { -                connection.rollback(); -            } catch (SQLException sqlException) { -                LOG.warn(StringUtils.stringifyException(sqlException)); -            } -        } - -        private String createBatchMessage(long currentStatements) { -            return String -                .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute); -        } - -        private void createThrowMessage(String stateMessage, long currentStatements, -            SQLException exception) throws IOException { -            String message = exception.getMessage(); - -            message = message.substring(0, Math.min(75, message.length())); - -            int messageLength = exception.getMessage().length(); -            String batchMessage = createBatchMessage(currentStatements); -            String template = "%s [msglength: %d]%s %s"; -            String errorMessage = -                String.format(template, stateMessage, messageLength, batchMessage, message); - -            LOG.error(errorMessage, exception.getNextException()); - -            throw new IOException(errorMessage, exception.getNextException()); -        } - -        /** {@inheritDoc} */ -        public synchronized void write(K key, V value) throws IOException { -            try { -                if (value == null) { -                    key.write(insertStatement); -                    insertStatement.addBatch(); -                    insertStatementsCurrent++; -                } else { -                    key.write(updateStatement); -                    updateStatement.addBatch(); -                    updateStatementsCurrent++; -                } -            } catch (SQLException exception) { -                throw new IOException("unable to add batch statement", exception); -            } - -            statementsAdded++; - -            if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); } -        } -    } - -    /** -     * Constructs the query used as the prepared statement to insert data. -     * -     * @param table      the table to insert into -     * @param fieldNames the fields to insert into. If field names are unknown, supply an array of -     *                   nulls. 
-     */ -    protected String constructInsertQuery(String table, String[] fieldNames) { -        if (fieldNames == null) { -            throw new IllegalArgumentException("Field names may not be null"); -        } - -        StringBuilder query = new StringBuilder(); - -        query.append("INSERT INTO ").append(table); - -        if (fieldNames.length > 0 && fieldNames[0] != null) { -            query.append(" ("); - -            for (int i = 0; i < fieldNames.length; i++) { -                query.append(fieldNames[i]); - -                if (i != fieldNames.length - 1) { query.append(","); } -            } - -            query.append(")"); - -        } - -        query.append(" VALUES ("); - -        for (int i = 0; i < fieldNames.length; i++) { -            query.append("?"); - -            if (i != fieldNames.length - 1) { query.append(","); } -        } - -        query.append(")"); -         -        boolean test = true; -        if (test) { -        	query.append(" ON DUPLICATE KEY UPDATE "); -        	 -        	 -            for (int i = 1; i < fieldNames.length; i++) { - -                 -                if ( (i != 1) ) { query.append(","); } -                //if (i != fieldNames.length - 1) { query.append(","); } -                //&& (i != fieldNames.length - 1) -                query.append(fieldNames[i]); -                query.append(" = ?"); -                 -                 -            } -        } -         -        query.append(";"); -         -        LOG.info(" ===================== " + query.toString()); -        return query.toString(); -    } - -    protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) { -        if (fieldNames == null) { -            throw new IllegalArgumentException("field names may not be null"); -        } - -        Set<String> updateNamesSet = new HashSet<String>(); -        Collections.addAll(updateNamesSet, updateNames); - -        StringBuilder query = new StringBuilder(); - -        query.append("UPDATE ").append(table); - -        query.append(" SET "); - -        if (fieldNames.length > 0 && fieldNames[0] != null) { -            int count = 0; - -            for (int i = 0; i < fieldNames.length; i++) { -                if (updateNamesSet.contains(fieldNames[i])) { continue; } - -                if (count != 0) { query.append(","); } - -                query.append(fieldNames[i]); -                query.append(" = ?"); - -                count++; -            } -        } - -        query.append(" WHERE "); - -        if (updateNames.length > 0 && updateNames[0] != null) { -            for (int i = 0; i < updateNames.length; i++) { -                query.append(updateNames[i]); -                query.append(" = ?"); - -                if (i != updateNames.length - 1) { query.append(" and "); } -            } -        } - -        query.append(";"); -        System.out.println("Update Query => " + query.toString()); -        return query.toString(); -    } - -    /** {@inheritDoc} */ -    public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException { -    } - -    /** {@inheritDoc} */ -    public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name, -        Progressable progress) throws IOException { -        DBConfiguration dbConf = new DBConfiguration(job); - -        String tableName = dbConf.getOutputTableName(); -        String[] fieldNames = dbConf.getOutputFieldNames(); -        String[] updateNames = 
dbConf.getOutputUpdateFieldNames(); -        int batchStatements = dbConf.getBatchStatementsNum(); - -        Connection connection = dbConf.getConnection(); - -        configureConnection(connection); - -        String sqlInsert = constructInsertQuery(tableName, fieldNames); -        PreparedStatement insertPreparedStatement; - -        try { -            insertPreparedStatement = connection.prepareStatement(sqlInsert); -            insertPreparedStatement.setEscapeProcessing(true); // should be on by default -        } catch (SQLException exception) { -            throw new IOException("unable to create statement for: " + sqlInsert, exception); -        } - -        String sqlUpdate = -            updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null; -        PreparedStatement updatePreparedStatement = null; - -        try { -            updatePreparedStatement = -                sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null; -        } catch (SQLException exception) { -            throw new IOException("unable to create statement for: " + sqlUpdate, exception); -        } - -        return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements); -    } - -    protected void configureConnection(Connection connection) { -        setAutoCommit(connection); -    } - -    protected void setAutoCommit(Connection connection) { -        try { -            connection.setAutoCommit(false); -        } catch (Exception exception) { -            throw new RuntimeException("unable to set auto commit", exception); -        } -    } - -    /** -     * Initializes the reduce-part of the job with the appropriate output settings -     * -     * @param job                 The job -     * @param dbOutputFormatClass -     * @param tableName           The table to insert data into -     * @param fieldNames          The field names in the table. If unknown, supply the appropriate -     */ -    public static void setOutput(JobConf job, Class<? 
extends DBOutputFormat> dbOutputFormatClass, -        String tableName, String[] fieldNames, String[] updateFields, int batchSize) { -        if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else { -            job.setOutputFormat(dbOutputFormatClass); -        } - -        // writing doesn't always happen in reduce -        job.setReduceSpeculativeExecution(false); -        job.setMapSpeculativeExecution(false); - -        DBConfiguration dbConf = new DBConfiguration(job); - -        dbConf.setOutputTableName(tableName); -        dbConf.setOutputFieldNames(fieldNames); - -        if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); } - -        if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); } -    } +public class DBOutputFormat<K extends DBWritable, V> implements +		OutputFormat<K, V> { +	private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); + +	/** A RecordWriter that writes the reduce output to a SQL table */ +	protected class DBRecordWriter implements RecordWriter<K, V> { +		private Connection connection; +		private PreparedStatement insertStatement; +		private PreparedStatement updateStatement; +		private final int statementsBeforeExecute; + +		private long statementsAdded = 0; +		private long insertStatementsCurrent = 0; +		private long updateStatementsCurrent = 0; + +		protected DBRecordWriter(Connection connection, +				PreparedStatement insertStatement, +				PreparedStatement updateStatement, int statementsBeforeExecute) { +			this.connection = connection; +			this.insertStatement = insertStatement; +			this.updateStatement = updateStatement; +			this.statementsBeforeExecute = statementsBeforeExecute; +		} + +		/** {@inheritDoc} */ +		public void close(Reporter reporter) throws IOException { +			executeBatch(); + +			try { +				if (insertStatement != null) { +					insertStatement.close(); +				} + +				if (updateStatement != null) { +					updateStatement.close(); +				} + +				connection.commit(); +			} catch (SQLException exception) { +				rollBack(); + +				createThrowMessage("unable to commit batch", 0, exception); +			} finally { +				try { +					connection.close(); +				} catch (SQLException exception) { +					throw new IOException("unable to close connection", exception); +				} +			} +		} + +		private void executeBatch() throws IOException { +			try { +				if (insertStatementsCurrent != 0) { +					LOG.info("executing insert batch " +							+ createBatchMessage(insertStatementsCurrent)); + +					insertStatement.executeBatch(); +				} + +				insertStatementsCurrent = 0; +			} catch (SQLException exception) { +				rollBack(); + +				createThrowMessage("unable to execute insert batch", +						insertStatementsCurrent, exception); +			} + +			try { +				if (updateStatementsCurrent != 0) { +					LOG.info("executing update batch " +							+ createBatchMessage(updateStatementsCurrent)); + +					int[] result = updateStatement.executeBatch(); + +					int count = 0; + +					for (int value : result) { +						count += value; +					} + +					if (count != updateStatementsCurrent) { +						throw new IOException( +								"update did not update same number of statements executed in batch, batch: " +										+ updateStatementsCurrent + " updated: " + count); +					} +				} + +				updateStatementsCurrent = 0; +			} catch (SQLException exception) { + +				String message = exception.getMessage(); +				if (message.indexOf("Duplicate Key") >= 0) { +					LOG.warn("In exception block. 
Bypass exception becuase of Insert/Update."); +				} else { +					rollBack(); + +					createThrowMessage("unable to execute update batch", +							updateStatementsCurrent, exception); +				} +			} +		} + +		private void rollBack() { +			try { +				connection.rollback(); +			} catch (SQLException sqlException) { +				LOG.warn(StringUtils.stringifyException(sqlException)); +			} +		} + +		private String createBatchMessage(long currentStatements) { +			return String.format("[totstmts: %d][crntstmts: %d][batch: %d]", +					statementsAdded, currentStatements, statementsBeforeExecute); +		} + +		private void createThrowMessage(String stateMessage, +				long currentStatements, SQLException exception) throws IOException { +			String message = exception.getMessage(); + +			// message = message.substring(0, Math.min(75, message.length())); + +			int messageLength = exception.getMessage().length(); +			String batchMessage = createBatchMessage(currentStatements); +			String template = "%s [msglength: %d]%s %s"; +			String errorMessage = String.format(template, stateMessage, +					messageLength, batchMessage, message); + +			LOG.error(errorMessage, exception.getNextException()); + +			throw new IOException(errorMessage, exception.getNextException()); +		} + +		/** {@inheritDoc} */ +		public synchronized void write(K key, V value) throws IOException { +			try { +				if (value == null) { +					key.write(insertStatement); +					insertStatement.addBatch(); +					insertStatementsCurrent++; +				} else { +					key.write(updateStatement); +					updateStatement.addBatch(); +					updateStatementsCurrent++; +				} +			} catch (SQLException exception) { +				throw new IOException("unable to add batch statement", exception); +			} + +			statementsAdded++; + +			if (statementsAdded % statementsBeforeExecute == 0) { +				executeBatch(); +			} +		} +	} + +	/** +	 * Constructs the query used as the prepared statement to insert data. +	 *  +	 * @param table +	 *           the table to insert into +	 * @param fieldNames +	 *           the fields to insert into. If field names are unknown, supply an +	 *           array of nulls. 
+	 */ +	protected String constructInsertQuery(String table, String[] fieldNames) { +		if (fieldNames == null) { +			throw new IllegalArgumentException("Field names may not be null"); +		} + +		StringBuilder query = new StringBuilder(); + +		query.append("INSERT INTO ").append(table); + +		if (fieldNames.length > 0 && fieldNames[0] != null) { +			query.append(" ("); + +			for (int i = 0; i < fieldNames.length; i++) { +				query.append(fieldNames[i]); + +				if (i != fieldNames.length - 1) { +					query.append(","); +				} +			} + +			query.append(")"); + +		} + +		query.append(" VALUES ("); + +		for (int i = 0; i < fieldNames.length; i++) { +			query.append("?"); + +			if (i != fieldNames.length - 1) { +				query.append(","); +			} +		} + +		query.append(")"); + +		boolean test = true; +		if (test) { +			query.append(" ON DUPLICATE KEY UPDATE "); + +			for (int i = 1; i < fieldNames.length; i++) { + +				if ((i != 1)) { +					query.append(","); +				} +				// if (i != fieldNames.length - 1) { query.append(","); } +				// && (i != fieldNames.length - 1) +				query.append(fieldNames[i]); +				query.append(" = ?"); + +			} +		} + +		query.append(";"); + +		LOG.info(" ===================== " + query.toString()); +		return query.toString(); +	} + +	protected String constructUpdateQuery(String table, String[] fieldNames, +			String[] updateNames) { +		if (fieldNames == null) { +			throw new IllegalArgumentException("field names may not be null"); +		} + +		Set<String> updateNamesSet = new HashSet<String>(); +		Collections.addAll(updateNamesSet, updateNames); + +		StringBuilder query = new StringBuilder(); + +		query.append("UPDATE ").append(table); + +		query.append(" SET "); + +		if (fieldNames.length > 0 && fieldNames[0] != null) { +			int count = 0; + +			for (int i = 0; i < fieldNames.length; i++) { +				if (updateNamesSet.contains(fieldNames[i])) { +					continue; +				} + +				if (count != 0) { +					query.append(","); +				} + +				query.append(fieldNames[i]); +				query.append(" = ?"); + +				count++; +			} +		} + +		query.append(" WHERE "); + +		if (updateNames.length > 0 && updateNames[0] != null) { +			for (int i = 0; i < updateNames.length; i++) { +				query.append(updateNames[i]); +				query.append(" = ?"); + +				if (i != updateNames.length - 1) { +					query.append(" and "); +				} +			} +		} + +		query.append(";"); +		System.out.println("Update Query => " + query.toString()); +		return query.toString(); +	} + +	/** {@inheritDoc} */ +	public void checkOutputSpecs(FileSystem filesystem, JobConf job) +			throws IOException { +	} + +	/** {@inheritDoc} */ +	public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, +			JobConf job, String name, Progressable progress) throws IOException { +		DBConfiguration dbConf = new DBConfiguration(job); + +		String tableName = dbConf.getOutputTableName(); +		String[] fieldNames = dbConf.getOutputFieldNames(); +		String[] updateNames = dbConf.getOutputUpdateFieldNames(); +		int batchStatements = dbConf.getBatchStatementsNum(); + +		Connection connection = dbConf.getConnection(); + +		configureConnection(connection); + +		String sqlInsert = constructInsertQuery(tableName, fieldNames); +		PreparedStatement insertPreparedStatement; + +		try { +			insertPreparedStatement = connection.prepareStatement(sqlInsert); +			insertPreparedStatement.setEscapeProcessing(true); // should be on by +																				// default +		} catch (SQLException exception) { +			throw new IOException("unable to create statement for: " + sqlInsert, +					exception); +		
} + +		String sqlUpdate = updateNames != null ? constructUpdateQuery(tableName, +				fieldNames, updateNames) : null; +		PreparedStatement updatePreparedStatement = null; + +		try { +			updatePreparedStatement = sqlUpdate != null ? connection +					.prepareStatement(sqlUpdate) : null; +		} catch (SQLException exception) { +			throw new IOException("unable to create statement for: " + sqlUpdate, +					exception); +		} + +		return new DBRecordWriter(connection, insertPreparedStatement, +				updatePreparedStatement, batchStatements); +	} + +	protected void configureConnection(Connection connection) { +		setAutoCommit(connection); +	} + +	protected void setAutoCommit(Connection connection) { +		try { +			connection.setAutoCommit(false); +		} catch (Exception exception) { +			throw new RuntimeException("unable to set auto commit", exception); +		} +	} + +	/** +	 * Initializes the reduce-part of the job with the appropriate output +	 * settings +	 *  +	 * @param job +	 *           The job +	 * @param dbOutputFormatClass +	 * @param tableName +	 *           The table to insert data into +	 * @param fieldNames +	 *           The field names in the table. If unknown, supply the appropriate +	 */ +	public static void setOutput(JobConf job, +			Class<? extends DBOutputFormat> dbOutputFormatClass, String tableName, +			String[] fieldNames, String[] updateFields, int batchSize) { +		if (dbOutputFormatClass == null) { +			job.setOutputFormat(DBOutputFormat.class); +		} else { +			job.setOutputFormat(dbOutputFormatClass); +		} + +		// writing doesn't always happen in reduce +		job.setReduceSpeculativeExecution(false); +		job.setMapSpeculativeExecution(false); + +		DBConfiguration dbConf = new DBConfiguration(job); + +		dbConf.setOutputTableName(tableName); +		dbConf.setOutputFieldNames(fieldNames); + +		if (updateFields != null) { +			dbConf.setOutputUpdateFieldNames(updateFields); +		} + +		if (batchSize != -1) { +			dbConf.setBatchStatementsNum(batchSize); +		} +	}  } diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 6216695..450a57d 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -1,83 +1,83 @@ -//package parallelai.spyglass.hbase -// -//import cascading.pipe.Pipe -//import cascading.pipe.assembly.Coerce -//import cascading.scheme.Scheme -//import cascading.tap.{ Tap, SinkMode } -//import cascading.tuple.Fields -//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } -//import org.apache.hadoop.hbase.util.Bytes -//import scala.collection.JavaConversions._ -//import scala.collection.mutable.WrappedArray -//import com.twitter.scalding._ -//import org.apache.hadoop.hbase.io.ImmutableBytesWritable -//import org.apache.hadoop.hbase.client.Scan -//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil -//import org.apache.hadoop.hbase.util.Base64 -//import java.io.ByteArrayOutputStream -//import java.io.DataOutputStream -// -//object HBaseRawSource { -//	/** -//	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource -//	 * @param scan -//	 * @return base64 string representation -//	 */ -//	def convertScanToString(scan: Scan) = { -//		val out = new ByteArrayOutputStream(); -//		val dos = new DataOutputStream(out); -//		scan.write(dos); -//		Base64.encodeBytes(out.toByteArray()); -//	} -//} -// -// -///** -// * @author Rotem Hermon -// * -// * HBaseRawSource is a 
scalding source that passes the original row (Result) object to the -// * mapper for customized processing. -// * -// * @param	tableName	The name of the HBase table to read -// * @param	quorumNames	HBase quorum -// * @param	familyNames	Column families to get (source, if null will get all) or update to (sink) -// * @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written -// * @param	base64Scan	An optional base64 encoded scan object -// * @param	sinkMode	If REPLACE the output table will be deleted before writing to -// * -// */ -//class HBaseRawSource( -//	tableName: String, -//	quorumNames: String = "localhost", -//	familyNames: Array[String], -//	writeNulls: Boolean = true, -//	base64Scan: String = null, -//	sinkMode: SinkMode = null) extends Source { -// -//	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) -//		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] -// -//	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { -//		val hBaseScheme = hdfsScheme match { -//			case hbase: HBaseRawScheme => hbase -//			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") -//		} -//		mode match { -//			case hdfsMode @ Hdfs(_, _) => readOrWrite match { -//				case Read => { -//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -//						case null => SinkMode.KEEP -//						case _ => sinkMode -//					}).asInstanceOf[Tap[_, _, _]] -//				} -//				case Write => { -//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -//						case null => SinkMode.UPDATE -//						case _ => sinkMode -//					}).asInstanceOf[Tap[_, _, _]] -//				} -//			} -//			case _ => super.createTap(readOrWrite)(mode) -//		} -//	} -//} +package parallelai.spyglass.hbase + +import cascading.pipe.Pipe +import cascading.pipe.assembly.Coerce +import cascading.scheme.Scheme +import cascading.tap.{ Tap, SinkMode } +import cascading.tuple.Fields +import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.JavaConversions._ +import scala.collection.mutable.WrappedArray +import com.twitter.scalding._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.util.Base64 +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream + +object HBaseRawSource { +	/** +	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource +	 * @param scan +	 * @return base64 string representation +	 */ +	def convertScanToString(scan: Scan) = { +		val out = new ByteArrayOutputStream(); +		val dos = new DataOutputStream(out); +		scan.write(dos); +		Base64.encodeBytes(out.toByteArray()); +	} +} + + +/** +* @author Rotem Hermon +* +* HBaseRawSource is a scalding source that passes the original row (Result) object to the +* mapper for customized processing. +* +* @param	tableName	The name of the HBase table to read +* @param	quorumNames	HBase quorum +* @param	familyNames	Column families to get (source, if null will get all) or update to (sink) +* @param	writeNulls	Should the sink write null values. default = true. 
If false, null columns will not be written +* @param	base64Scan	An optional base64 encoded scan object +* @param	sinkMode	If REPLACE the output table will be deleted before writing to +* +*/ +class HBaseRawSource( +	tableName: String, +	quorumNames: String = "localhost", +	familyNames: Array[String], +	writeNulls: Boolean = true, +	base64Scan: String = null, +	sinkMode: SinkMode = null) extends Source { + +	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) +		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + +	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +		val hBaseScheme = hdfsScheme match { +			case hbase: HBaseRawScheme => hbase +			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") +		} +		mode match { +			case hdfsMode @ Hdfs(_, _) => readOrWrite match { +				case Read => { +					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +						case null => SinkMode.KEEP +						case _ => sinkMode +					}).asInstanceOf[Tap[_, _, _]] +				} +				case Write => { +					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +						case null => SinkMode.UPDATE +						case _ => sinkMode +					}).asInstanceOf[Tap[_, _, _]] +				} +			} +			case _ => super.createTap(readOrWrite)(mode) +		} +	} +} | 
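For reference, a minimal sketch in Scala of wiring the patched DBOutputFormat.setOutput above into a job configuration. The table name, column list and batch size are placeholders, and the JDBC driver, connection URL and credentials must still be registered on the same JobConf through DBConfiguration before the writer can open a connection.

    import org.apache.hadoop.mapred.JobConf
    import parallelai.spyglass.jdbc.db.DBOutputFormat

    object DBOutputSetup {
      // Stores the table layout in the JobConf and disables speculative
      // execution for map and reduce tasks. Passing null as the format class
      // makes setOutput fall back to DBOutputFormat itself.
      def configure(job: JobConf): Unit =
        DBOutputFormat.setOutput(
          job,
          null,                            // use the default DBOutputFormat
          "events",                        // placeholder table name
          Array("id", "name", "count"),    // column order must match the written tuples
          null,                            // no separate update field list
          1000)                            // flush the JDBC batch every 1000 statements
    }

With this column list the reworked constructInsertQuery builds roughly: INSERT INTO events (id,name,count) VALUES (?,?,?) ON DUPLICATE KEY UPDATE name = ?,count = ?; and DBRecordWriter adds each written key to the insert prepared statement's batch, executing it every 1000 statements. Supplying an updateFields array instead of null would additionally prepare the UPDATE statement used whenever a non-null value is written.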

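A similarly hedged sketch of driving the re-enabled HBaseRawSource from a fields-based Scalding job. The table name, quorum, column family and the 'row field name are assumptions for illustration (the actual tuple fields are defined by HBaseRawScheme); convertScanToString serializes a pre-configured Scan so the row filtering happens server-side.

    import com.twitter.scalding._
    import org.apache.hadoop.hbase.client.{ Result, Scan }
    import org.apache.hadoop.hbase.util.Bytes
    import parallelai.spyglass.hbase.HBaseRawSource

    class RawRowKeyJob(args: Args) extends Job(args) {
      // Narrow the scan before it is serialized into the source.
      val scan = new Scan()
      scan.addFamily(Bytes.toBytes("cf"))          // placeholder column family
      scan.setCaching(1000)

      new HBaseRawSource(
          tableName = "sample_table",              // placeholder table
          quorumNames = "zk-host1,zk-host2",       // placeholder quorum
          familyNames = Array("cf"),
          base64Scan = HBaseRawSource.convertScanToString(scan))
        .read
        // 'row is assumed to be the field carrying the raw Result object
        .mapTo('row -> 'key) { result: Result => Bytes.toString(result.getRow) }
        .write(Tsv(args("output")))
    }

Because sinkMode is left at its default, the tap falls back to SinkMode.KEEP for reads and SinkMode.UPDATE for writes, as in the createTap match above.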