diff options
| author | Chandan Rajah <crajah@parallelai.com> | 2013-08-07 12:24:34 +0100 | 
|---|---|---|
| committer | Chandan Rajah <crajah@parallelai.com> | 2013-08-07 12:24:34 +0100 | 
| commit | b9d987c0d9946f8f778fbec2856305c0f20fd3f8 (patch) | |
| tree | 6b6ea20beebf556aaa47a14df03194f2bcb19a55 /src | |
| parent | 522984bfad7df3f4cd6d2c33e0dfe3e14655896f (diff) | |
| download | SpyGlass-b9d987c0d9946f8f778fbec2856305c0f20fd3f8.tar.gz SpyGlass-b9d987c0d9946f8f778fbec2856305c0f20fd3f8.zip | |
Updated CDH version to 4.3.0 and uncommented Raw Tap and Scheme
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java | 572 | ||||
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java | 622 | 
2 files changed, 597 insertions, 597 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 7dba40d..7b62c88 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -1,286 +1,286 @@ -///* -//* Copyright (c) 2009 Concurrent, Inc. -//* -//* This work has been released into the public domain -//* by the copyright holder. This applies worldwide. -//* -//* In case this is not legally possible: -//* The copyright holder grants any entity the right -//* to use this work for any purpose, without any -//* conditions, unless such conditions are required by law. -//*/ -// -//package parallelai.spyglass.hbase; -// -//import java.io.IOException; -//import java.util.Arrays; -//import java.util.HashSet; -// -//import org.apache.hadoop.hbase.client.Put; -//import org.apache.hadoop.hbase.client.Result; -//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -//import org.apache.hadoop.hbase.mapred.TableOutputFormat; -//import org.apache.hadoop.hbase.util.Bytes; -//import org.apache.hadoop.mapred.JobConf; -//import org.apache.hadoop.mapred.OutputCollector; -//import org.apache.hadoop.mapred.RecordReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; -//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; -// -//import cascading.flow.FlowProcess; -//import cascading.scheme.Scheme; -//import cascading.scheme.SinkCall; -//import cascading.scheme.SourceCall; -//import cascading.tap.Tap; -//import cascading.tuple.Fields; -//import cascading.tuple.Tuple; -//import cascading.tuple.TupleEntry; -//import cascading.util.Util; -// -///** -//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction -//* with the {@HBaseRawTap} to allow for the reading and writing of data -//* to and from a HBase cluster. 
-//* -//* @see HBaseRawTap -//*/ -//@SuppressWarnings({ "rawtypes", "deprecation" }) -//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { -//	/** -//	 * -//	 */ -//	private static final long serialVersionUID = 6248976486883281356L; -// -//	/** Field LOG */ -//	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); -// -//	public final Fields RowKeyField = new Fields("rowkey"); -//	public final Fields RowField = new Fields("row"); -// -//	/** String familyNames */ -//	private String[] familyNames; -// -//	private boolean writeNulls = true; -// -//	/** -//	 * Constructor HBaseScheme creates a new HBaseScheme instance. -//	 * -//	 * @param keyFields -//	 *            of type Fields -//	 * @param familyName -//	 *            of type String -//	 * @param valueFields -//	 *            of type Fields -//	 */ -//	public HBaseRawScheme(String familyName) { -//		this(new String[] { familyName }); -//	} -// -//	public HBaseRawScheme(String[] familyNames) { -//		this.familyNames = familyNames; -//		setSourceFields(); -//	} -// -//	public HBaseRawScheme(String familyName, boolean writeNulls) { -//		this(new String[] { familyName }, writeNulls); -//	} -// -//	public HBaseRawScheme(String[] familyNames, boolean writeNulls) { -//		this.familyNames = familyNames; -//		this.writeNulls = writeNulls; -//		setSourceFields(); -//	} -// -//	private void setSourceFields() { -//		Fields sourceFields = Fields.join(RowKeyField, RowField); -//		setSourceFields(sourceFields); -//	} -// -//	/** -//	 * Method getFamilyNames returns the set of familyNames of this HBaseScheme -//	 * object. -//	 * -//	 * @return the familyNames (type String[]) of this HBaseScheme object. 
-//	 */ -//	public String[] getFamilyNames() { -//		HashSet<String> familyNameSet = new HashSet<String>(); -//		if (familyNames != null) { -//			for (String familyName : familyNames) { -//				familyNameSet.add(familyName); -//			} -//		} -//		return familyNameSet.toArray(new String[0]); -//	} -// -//	@Override -//	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -//		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; -// -//		sourceCall.setContext(pair); -//	} -// -//	@Override -//	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -//		sourceCall.setContext(null); -//	} -// -//	@SuppressWarnings("unchecked") -//	@Override -//	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) -//			throws IOException { -//		Tuple result = new Tuple(); -// -//		Object key = sourceCall.getContext()[0]; -//		Object value = sourceCall.getContext()[1]; -//		boolean hasNext = sourceCall.getInput().next(key, value); -//		if (!hasNext) { -//			return false; -//		} -// -//		// Skip nulls -//		if (key == null || value == null) { -//			return true; -//		} -// -//		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; -//		Result row = (Result) value; -//		result.add(keyWritable); -//		result.add(row); -//		sourceCall.getIncomingEntry().setTuple(result); -//		return true; -//	} -// -//	@SuppressWarnings("unchecked") -//	@Override -//	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { -//		TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); -//		OutputCollector outputCollector = sinkCall.getOutput(); -//		Tuple key = tupleEntry.selectTuple(RowKeyField); -//		Object okey = key.getObject(0); -//		ImmutableBytesWritable keyBytes = getBytes(okey); -//		Put put = new Put(keyBytes.get()); -//		Fields 
outFields = tupleEntry.getFields().subtract(RowKeyField); -//		if (null != outFields) { -//			TupleEntry values = tupleEntry.selectEntry(outFields); -//			for (int n = 0; n < values.getFields().size(); n++) { -//				Object o = values.get(n); -//				ImmutableBytesWritable valueBytes = getBytes(o); -//				Comparable field = outFields.get(n); -//				ColumnName cn = parseColumn((String) field); -//				if (null == cn.family) { -//					if (n >= familyNames.length) -//						cn.family = familyNames[familyNames.length - 1]; -//					else -//						cn.family = familyNames[n]; -//				} -//				if (null != o || writeNulls) -//					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); -//			} -//		} -//		outputCollector.collect(null, put); -//	} -// -//	private ImmutableBytesWritable getBytes(Object obj) { -//		if (null == obj) -//			return new ImmutableBytesWritable(new byte[0]); -//		if (obj instanceof ImmutableBytesWritable) -//			return (ImmutableBytesWritable) obj; -//		else if (obj instanceof String) -//			return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); -//		else if (obj instanceof Long) -//			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); -//		else if (obj instanceof Integer) -//			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); -//		else if (obj instanceof Short) -//			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); -//		else if (obj instanceof Boolean) -//			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); -//		else if (obj instanceof Double) -//			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); -//		else -//			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" -//					+ obj.getClass().getName()); -//	} -// -//	private ColumnName parseColumn(String column) { -//		ColumnName ret = new ColumnName(); -//		int pos = column.indexOf(":"); -//		if (pos > 0) { -//			ret.name = column.substring(pos + 1); -//			ret.family = 
column.substring(0, pos); -//		} else { -//			ret.name = column; -//		} -//		return ret; -//	} -// -//	private class ColumnName { -//		String family; -//		String name; -// -//		ColumnName() { -//		} -//	} -// -//	@Override -//	public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { -//		conf.setOutputFormat(TableOutputFormat.class); -//		conf.setOutputKeyClass(ImmutableBytesWritable.class); -//		conf.setOutputValueClass(Put.class); -//	} -// -//	@Override -//	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, -//			JobConf conf) { -//		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, -//				ValueCopier.class); -//		if (null != familyNames) { -//			String columns = Util.join(this.familyNames, " "); -//			LOG.debug("sourcing from column families: {}", columns); -//			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); -//		} -//	} -// -//	@Override -//	public boolean equals(Object object) { -//		if (this == object) { -//			return true; -//		} -//		if (object == null || getClass() != object.getClass()) { -//			return false; -//		} -//		if (!super.equals(object)) { -//			return false; -//		} -// -//		HBaseRawScheme that = (HBaseRawScheme) object; -// -//		if (!Arrays.equals(familyNames, that.familyNames)) { -//			return false; -//		} -//		return true; -//	} -// -//	@Override -//	public int hashCode() { -//		int result = super.hashCode(); -//		result = 31 * result + (familyNames != null ? 
Arrays.hashCode(familyNames) : 0); -//		return result; -//	} -// -//	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { -// -//		public ValueCopier() { -//		} -// -//		public void copyValue(Result oldValue, Result newValue) { -//			if (null != oldValue && null != newValue) { -//				oldValue.copyFrom(newValue); -//			} -//		} -// -//	} -//} +/* +* Copyright (c) 2009 Concurrent, Inc. +* +* This work has been released into the public domain +* by the copyright holder. This applies worldwide. +* +* In case this is not legally possible: +* The copyright holder grants any entity the right +* to use this work for any purpose, without any +* conditions, unless such conditions are required by law. +*/ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; + +/** +* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction +* with the {@HBaseRawTap} to allow for the reading and writing of data +* to and from a HBase cluster. 
+* +* @see HBaseRawTap +*/ +@SuppressWarnings({ "rawtypes", "deprecation" }) +public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +	/** +	 * +	 */ +	private static final long serialVersionUID = 6248976486883281356L; + +	/** Field LOG */ +	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); + +	public final Fields RowKeyField = new Fields("rowkey"); +	public final Fields RowField = new Fields("row"); + +	/** String familyNames */ +	private String[] familyNames; + +	private boolean writeNulls = true; + +	/** +	 * Constructor HBaseScheme creates a new HBaseScheme instance. +	 * +	 * @param keyFields +	 *            of type Fields +	 * @param familyName +	 *            of type String +	 * @param valueFields +	 *            of type Fields +	 */ +	public HBaseRawScheme(String familyName) { +		this(new String[] { familyName }); +	} + +	public HBaseRawScheme(String[] familyNames) { +		this.familyNames = familyNames; +		setSourceFields(); +	} + +	public HBaseRawScheme(String familyName, boolean writeNulls) { +		this(new String[] { familyName }, writeNulls); +	} + +	public HBaseRawScheme(String[] familyNames, boolean writeNulls) { +		this.familyNames = familyNames; +		this.writeNulls = writeNulls; +		setSourceFields(); +	} + +	private void setSourceFields() { +		Fields sourceFields = Fields.join(RowKeyField, RowField); +		setSourceFields(sourceFields); +	} + +	/** +	 * Method getFamilyNames returns the set of familyNames of this HBaseScheme +	 * object. +	 * +	 * @return the familyNames (type String[]) of this HBaseScheme object. 
+	 */ +	public String[] getFamilyNames() { +		HashSet<String> familyNameSet = new HashSet<String>(); +		if (familyNames != null) { +			for (String familyName : familyNames) { +				familyNameSet.add(familyName); +			} +		} +		return familyNameSet.toArray(new String[0]); +	} + +	@Override +	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; + +		sourceCall.setContext(pair); +	} + +	@Override +	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +		sourceCall.setContext(null); +	} + +	@SuppressWarnings("unchecked") +	@Override +	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) +			throws IOException { +		Tuple result = new Tuple(); + +		Object key = sourceCall.getContext()[0]; +		Object value = sourceCall.getContext()[1]; +		boolean hasNext = sourceCall.getInput().next(key, value); +		if (!hasNext) { +			return false; +		} + +		// Skip nulls +		if (key == null || value == null) { +			return true; +		} + +		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +		Result row = (Result) value; +		result.add(keyWritable); +		result.add(row); +		sourceCall.getIncomingEntry().setTuple(result); +		return true; +	} + +	@SuppressWarnings("unchecked") +	@Override +	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { +		TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +		OutputCollector outputCollector = sinkCall.getOutput(); +		Tuple key = tupleEntry.selectTuple(RowKeyField); +		Object okey = key.getObject(0); +		ImmutableBytesWritable keyBytes = getBytes(okey); +		Put put = new Put(keyBytes.get()); +		Fields outFields = tupleEntry.getFields().subtract(RowKeyField); +		if (null != outFields) { +			TupleEntry values = 
tupleEntry.selectEntry(outFields); +			for (int n = 0; n < values.getFields().size(); n++) { +				Object o = values.get(n); +				ImmutableBytesWritable valueBytes = getBytes(o); +				Comparable field = outFields.get(n); +				ColumnName cn = parseColumn((String) field); +				if (null == cn.family) { +					if (n >= familyNames.length) +						cn.family = familyNames[familyNames.length - 1]; +					else +						cn.family = familyNames[n]; +				} +				if (null != o || writeNulls) +					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); +			} +		} +		outputCollector.collect(null, put); +	} + +	private ImmutableBytesWritable getBytes(Object obj) { +		if (null == obj) +			return new ImmutableBytesWritable(new byte[0]); +		if (obj instanceof ImmutableBytesWritable) +			return (ImmutableBytesWritable) obj; +		else if (obj instanceof String) +			return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); +		else if (obj instanceof Long) +			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); +		else if (obj instanceof Integer) +			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); +		else if (obj instanceof Short) +			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); +		else if (obj instanceof Boolean) +			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); +		else if (obj instanceof Double) +			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); +		else +			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" +					+ obj.getClass().getName()); +	} + +	private ColumnName parseColumn(String column) { +		ColumnName ret = new ColumnName(); +		int pos = column.indexOf(":"); +		if (pos > 0) { +			ret.name = column.substring(pos + 1); +			ret.family = column.substring(0, pos); +		} else { +			ret.name = column; +		} +		return ret; +	} + +	private class ColumnName { +		String family; +		String name; + +		ColumnName() { +		} +	} + +	@Override +	public void 
sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +		conf.setOutputFormat(TableOutputFormat.class); +		conf.setOutputKeyClass(ImmutableBytesWritable.class); +		conf.setOutputValueClass(Put.class); +	} + +	@Override +	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, +			JobConf conf) { +		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, +				ValueCopier.class); +		if (null != familyNames) { +			String columns = Util.join(this.familyNames, " "); +			LOG.debug("sourcing from column families: {}", columns); +			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); +		} +	} + +	@Override +	public boolean equals(Object object) { +		if (this == object) { +			return true; +		} +		if (object == null || getClass() != object.getClass()) { +			return false; +		} +		if (!super.equals(object)) { +			return false; +		} + +		HBaseRawScheme that = (HBaseRawScheme) object; + +		if (!Arrays.equals(familyNames, that.familyNames)) { +			return false; +		} +		return true; +	} + +	@Override +	public int hashCode() { +		int result = super.hashCode(); +		result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +		return result; +	} + +	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { + +		public ValueCopier() { +		} + +		public void copyValue(Result oldValue, Result newValue) { +			if (null != oldValue && null != newValue) { +				oldValue.copyFrom(newValue); +			} +		} + +	} +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 780d3fc..5dcd57d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -1,311 +1,311 @@ -///* -//* Copyright (c) 2009 Concurrent, Inc. 
-//* -//* This work has been released into the public domain -//* by the copyright holder. This applies worldwide. -//* -//* In case this is not legally possible: -//* The copyright holder grants any entity the right -//* to use this work for any purpose, without any -//* conditions, unless such conditions are required by law. -//*/ -// -//package parallelai.spyglass.hbase; -// -//import java.io.IOException; -//import java.util.UUID; -// -//import org.apache.hadoop.conf.Configuration; -//import org.apache.hadoop.fs.Path; -//import org.apache.hadoop.hbase.HBaseConfiguration; -//import org.apache.hadoop.hbase.HColumnDescriptor; -//import org.apache.hadoop.hbase.HTableDescriptor; -//import org.apache.hadoop.hbase.MasterNotRunningException; -//import org.apache.hadoop.hbase.ZooKeeperConnectionException; -//import org.apache.hadoop.hbase.client.HBaseAdmin; -//import org.apache.hadoop.hbase.client.Scan; -//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -//import org.apache.hadoop.mapred.FileInputFormat; -//import org.apache.hadoop.mapred.JobConf; -//import org.apache.hadoop.mapred.OutputCollector; -//import org.apache.hadoop.mapred.RecordReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import cascading.flow.FlowProcess; -//import cascading.tap.SinkMode; -//import cascading.tap.Tap; -//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -//import cascading.tuple.TupleEntryCollector; -//import cascading.tuple.TupleEntryIterator; -// -//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -// -///** -//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with -//* the {@HBaseRawScheme} to allow for the reading and writing -//* of data to and from a HBase cluster. 
-//*/ -//@SuppressWarnings({ "deprecation", "rawtypes" }) -//public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { -//	/** -//	 * -//	 */ -//	private static final long serialVersionUID = 8019189493428493323L; -// -//	/** Field LOG */ -//	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); -// -//	private final String id = UUID.randomUUID().toString(); -// -//	/** Field SCHEME */ -//	public static final String SCHEME = "hbase"; -// -//	/** Field hBaseAdmin */ -//	private transient HBaseAdmin hBaseAdmin; -// -//	/** Field hostName */ -//	private String quorumNames; -//	/** Field tableName */ -//	private String tableName; -//	private String base64Scan; -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 */ -//	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { -//		super(HBaseFullScheme, SinkMode.UPDATE); -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 * @param sinkMode -//	 *            of type SinkMode -//	 */ -//	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -//		super(HBaseFullScheme, sinkMode); -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. 
-//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 */ -//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { -//		super(HBaseFullScheme, SinkMode.UPDATE); -//		this.quorumNames = quorumNames; -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param tableName -//	 *            of type String -//	 * @param HBaseFullScheme -//	 *            of type HBaseFullScheme -//	 * @param sinkMode -//	 *            of type SinkMode -//	 */ -//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -//		super(HBaseFullScheme, sinkMode); -//		this.quorumNames = quorumNames; -//		this.tableName = tableName; -//	} -// -//	/** -//	 * Constructor HBaseTap creates a new HBaseTap instance. -//	 * -//	 * @param quorumNames		HBase quorum -//	 * @param tableName			The name of the HBase table to read -//	 * @param HBaseFullScheme -//	 * @param base64Scan		An optional base64 encoded scan object -//	 * @param sinkMode			If REPLACE the output table will be deleted before writing to -//	 */ -//	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { -//		super(HBaseFullScheme, sinkMode); -//		this.quorumNames = quorumNames; -//		this.tableName = tableName; -//		this.base64Scan = base64Scan; -//	} -// -//	/** -//	 * Method getTableName returns the tableName of this HBaseTap object. -//	 * -//	 * @return the tableName (type String) of this HBaseTap object. 
-//	 */ -//	public String getTableName() { -//		return tableName; -//	} -// -//	public Path getPath() { -//		return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); -//	} -// -//	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { -//		if (hBaseAdmin == null) { -//			Configuration hbaseConf = HBaseConfiguration.create(conf); -//			hBaseAdmin = new HBaseAdmin(hbaseConf); -//		} -// -//		return hBaseAdmin; -//	} -// -//	@Override -//	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { -//		if (quorumNames != null) { -//			conf.set("hbase.zookeeper.quorum", quorumNames); -//		} -// -//		LOG.debug("sinking to table: {}", tableName); -// -//		if (isReplace() && conf.get("mapred.task.partition") == null) { -//			try { -//				deleteResource(conf); -// -//			} catch (IOException e) { -//				throw new RuntimeException("could not delete resource: " + e); -//			} -//		} -// -//		else if (isUpdate() || isReplace()) { -//			try { -//				createResource(conf); -//			} catch (IOException e) { -//				throw new RuntimeException(tableName + " does not exist !", e); -//			} -// -//		} -// -//		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); -//		super.sinkConfInit(process, conf); -//	} -// -//	@Override -//	public String getIdentifier() { -//		return id; -//	} -// -//	@Override -//	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) -//			throws IOException { -//		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); -//	} -// -//	@Override -//	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) -//			throws IOException { -//		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); -//		hBaseCollector.prepare(); -//		return hBaseCollector; -//	} -// -//	@Override -//	public boolean createResource(JobConf jobConf) throws 
IOException { -//		HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); -// -//		if (hBaseAdmin.tableExists(tableName)) { -//			return true; -//		} -// -//		LOG.info("creating hbase table: {}", tableName); -// -//		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); -// -//		String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); -// -//		for (String familyName : familyNames) { -//			tableDescriptor.addFamily(new HColumnDescriptor(familyName)); -//		} -// -//		hBaseAdmin.createTable(tableDescriptor); -// -//		return true; -//	} -// -//	@Override -//	public boolean deleteResource(JobConf jobConf) throws IOException { -//		if (getHBaseAdmin(jobConf).tableExists(tableName)) { -//			if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) -//				getHBaseAdmin(jobConf).disableTable(tableName); -//			getHBaseAdmin(jobConf).deleteTable(tableName); -//		} -//		return true; -//	} -// -//	@Override -//	public boolean resourceExists(JobConf jobConf) throws IOException { -//		return getHBaseAdmin(jobConf).tableExists(tableName); -//	} -// -//	@Override -//	public long getModifiedTime(JobConf jobConf) throws IOException { -//		return System.currentTimeMillis(); // currently unable to find last mod -//											// time -//											// on a table -//	} -// -//	@Override -//	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { -//		// a hack for MultiInputFormat to see that there is a child format -//		FileInputFormat.setInputPaths(conf, getPath()); -// -//		if (quorumNames != null) { -//			conf.set("hbase.zookeeper.quorum", quorumNames); -//		} -// -//		LOG.debug("sourcing from table: {}", tableName); -//		conf.set(TableInputFormat.INPUT_TABLE, tableName); -//		if (null != base64Scan) -//			conf.set(TableInputFormat.SCAN, base64Scan); -// -//		super.sourceConfInit(process, conf); -//	} -// -//	@Override -//	public boolean equals(Object object) { -//		if (this == object) { -//			return true; -//		} -//		if (object == null || getClass() 
!= object.getClass()) { -//			return false; -//		} -//		if (!super.equals(object)) { -//			return false; -//		} -// -//		HBaseRawTap hBaseTap = (HBaseRawTap) object; -// -//		if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { -//			return false; -//		} -// -//		if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { -//			return false; -//		} -// -//		return true; -//	} -// -//	@Override -//	public int hashCode() { -//		int result = super.hashCode(); -//		result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); -//		return result; -//	} -//} +/* +* Copyright (c) 2009 Concurrent, Inc. +* +* This work has been released into the public domain +* by the copyright holder. This applies worldwide. +* +* In case this is not legally possible: +* The copyright holder grants any entity the right +* to use this work for any purpose, without any +* conditions, unless such conditions are required by law. 
*/

package parallelai.spyglass.hbase;

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

/**
 * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction
 * with the {@link HBaseRawScheme} to allow for the reading and writing of data
 * to and from an HBase cluster.
 */
@SuppressWarnings({ "deprecation", "rawtypes" })
public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {

	private static final long serialVersionUID = 8019189493428493323L;

	/** Field LOG */
	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class);

	/** Random identifier generated once per instance; returned by {@link #getIdentifier()}. */
	private final String id = UUID.randomUUID().toString();

	/** Field SCHEME: scheme prefix used when building the path in {@link #getPath()}. */
	public static final String SCHEME = "hbase";

	/** Field hBaseAdmin: created on first use by {@link #getHBaseAdmin(JobConf)}; not serialized. */
	private transient HBaseAdmin hBaseAdmin;

	/** Field quorumNames: value written to "hbase.zookeeper.quorum" when not null. */
	private String quorumNames;
	/** Field tableName */
	private String tableName;
	/** Field base64Scan: optional value written to {@link TableInputFormat#SCAN} when sourcing. */
	private String base64Scan;

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance with
	 * {@link SinkMode#UPDATE} and no explicit quorum.
	 *
	 * @param tableName
	 *            of type String
	 * @param HBaseFullScheme
	 *            of type HBaseRawScheme
	 */
	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) {
		this(null, tableName, HBaseFullScheme, null, SinkMode.UPDATE);
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance with no
	 * explicit quorum.
	 *
	 * @param tableName
	 *            of type String
	 * @param HBaseFullScheme
	 *            of type HBaseRawScheme
	 * @param sinkMode
	 *            of type SinkMode
	 */
	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
		this(null, tableName, HBaseFullScheme, null, sinkMode);
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance with
	 * {@link SinkMode#UPDATE}.
	 *
	 * @param quorumNames
	 *            of type String
	 * @param tableName
	 *            of type String
	 * @param HBaseFullScheme
	 *            of type HBaseRawScheme
	 */
	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) {
		this(quorumNames, tableName, HBaseFullScheme, null, SinkMode.UPDATE);
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance.
	 *
	 * @param quorumNames
	 *            of type String
	 * @param tableName
	 *            of type String
	 * @param HBaseFullScheme
	 *            of type HBaseRawScheme
	 * @param sinkMode
	 *            of type SinkMode
	 */
	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
		this(quorumNames, tableName, HBaseFullScheme, null, sinkMode);
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance. All other
	 * constructors delegate here.
	 *
	 * @param quorumNames		HBase quorum
	 * @param tableName			The name of the HBase table to read
	 * @param HBaseFullScheme	The scheme handed to the {@link Tap} superclass
	 * @param base64Scan		An optional base64 encoded scan object
	 * @param sinkMode			If REPLACE the output table will be deleted before writing to
	 */
	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {
		super(HBaseFullScheme, sinkMode);
		this.quorumNames = quorumNames;
		this.tableName = tableName;
		this.base64Scan = base64Scan;
	}

	/**
	 * Method getTableName returns the tableName of this HBaseRawTap object.
	 *
	 * @return the tableName (type String) of this HBaseRawTap object.
	 */
	public String getTableName() {
		return tableName;
	}

	/**
	 * Builds a path of the form {@code hbase:/<table>} in which every ':' of
	 * the table name is replaced by '_'.
	 *
	 * @return the path representing this tap's table
	 */
	public Path getPath() {
		// Literal character replacement; no regular expression is needed.
		return new Path(SCHEME + ":/" + tableName.replace(':', '_'));
	}

	/**
	 * Returns the cached {@link HBaseAdmin}, creating it from the given job
	 * configuration on first use.
	 *
	 * @param conf job configuration used to build the HBase configuration
	 * @return the admin client held by this tap
	 * @throws MasterNotRunningException propagated from the HBaseAdmin constructor
	 * @throws ZooKeeperConnectionException propagated from the HBaseAdmin constructor
	 */
	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
		if (hBaseAdmin == null) {
			Configuration hbaseConf = HBaseConfiguration.create(conf);
			hBaseAdmin = new HBaseAdmin(hbaseConf);
		}

		return hBaseAdmin;
	}

	/**
	 * Configures the job to write to this tap's table. In REPLACE mode with no
	 * "mapred.task.partition" set the table is deleted; otherwise, in UPDATE
	 * or REPLACE mode, the table is created if missing.
	 *
	 * @throws RuntimeException wrapping the {@link IOException} if the table
	 *             cannot be deleted or created
	 */
	@Override
	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
		if (quorumNames != null) {
			conf.set("hbase.zookeeper.quorum", quorumNames);
		}

		LOG.debug("sinking to table: {}", tableName);

		if (isReplace() && conf.get("mapred.task.partition") == null) {
			try {
				deleteResource(conf);
			} catch (IOException e) {
				// Pass the exception as the cause so its stack trace is not lost.
				throw new RuntimeException("could not delete resource: " + e, e);
			}
		} else if (isUpdate() || isReplace()) {
			try {
				createResource(conf);
			} catch (IOException e) {
				throw new RuntimeException("could not create resource: " + tableName, e);
			}
		}

		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
		super.sinkConfInit(process, conf);
	}

	/**
	 * @return the random identifier generated when this instance was created
	 */
	@Override
	public String getIdentifier() {
		return id;
	}

	/**
	 * Wraps the given record reader in a {@link HadoopTupleEntrySchemeIterator}.
	 */
	@Override
	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader)
			throws IOException {
		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
	}

	/**
	 * Creates and prepares an {@link HBaseTapCollector} for this tap. The
	 * supplied output collector is not used.
	 */
	@Override
	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector)
			throws IOException {
		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this);
		hBaseCollector.prepare();
		return hBaseCollector;
	}

	/**
	 * Creates the table with one column family per family name reported by the
	 * scheme, unless the table already exists.
	 *
	 * @return always {@code true}
	 */
	@Override
	public boolean createResource(JobConf jobConf) throws IOException {
		HBaseAdmin admin = getHBaseAdmin(jobConf);

		if (admin.tableExists(tableName)) {
			return true;
		}

		LOG.info("creating hbase table: {}", tableName);

		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

		String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames();

		for (String familyName : familyNames) {
			tableDescriptor.addFamily(new HColumnDescriptor(familyName));
		}

		admin.createTable(tableDescriptor);

		return true;
	}

	/**
	 * Deletes the table if it exists, disabling it first when it is enabled.
	 *
	 * @return always {@code true}
	 */
	@Override
	public boolean deleteResource(JobConf jobConf) throws IOException {
		HBaseAdmin admin = getHBaseAdmin(jobConf);

		if (admin.tableExists(tableName)) {
			if (admin.isTableEnabled(tableName)) {
				admin.disableTable(tableName);
			}
			admin.deleteTable(tableName);
		}
		return true;
	}

	/**
	 * @return whether the table currently exists
	 */
	@Override
	public boolean resourceExists(JobConf jobConf) throws IOException {
		return getHBaseAdmin(jobConf).tableExists(tableName);
	}

	/**
	 * Returns the current time; the last modification time of a table is not
	 * looked up.
	 */
	@Override
	public long getModifiedTime(JobConf jobConf) throws IOException {
		return System.currentTimeMillis();
	}

	/**
	 * Configures the job to read from this tap's table, including the optional
	 * base64 encoded scan.
	 */
	@Override
	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
		// a hack for MultiInputFormat to see that there is a child format
		FileInputFormat.setInputPaths(conf, getPath());

		if (quorumNames != null) {
			conf.set("hbase.zookeeper.quorum", quorumNames);
		}

		LOG.debug("sourcing from table: {}", tableName);
		conf.set(TableInputFormat.INPUT_TABLE, tableName);
		if (base64Scan != null) {
			conf.set(TableInputFormat.SCAN, base64Scan);
		}

		super.sourceConfInit(process, conf);
	}

	/**
	 * Two taps are equal when the superclass considers them equal and both
	 * their table names and base64 scans match. The quorum is not compared.
	 */
	@Override
	public boolean equals(Object object) {
		if (this == object) {
			return true;
		}
		if (object == null || getClass() != object.getClass()) {
			return false;
		}
		if (!super.equals(object)) {
			return false;
		}

		HBaseRawTap other = (HBaseRawTap) object;

		return sameValue(tableName, other.tableName) && sameValue(base64Scan, other.base64Scan);
	}

	/** Null-safe string equality. */
	private static boolean sameValue(String left, String right) {
		return left == null ? right == null : left.equals(right);
	}

	/**
	 * Combines the superclass hash with the table name and base64 scan hashes.
	 */
	@Override
	public int hashCode() {
		int tableHash = tableName != null ? tableName.hashCode() : 0;
		int scanHash = base64Scan != null ? base64Scan.hashCode() : 0;
		return 31 * super.hashCode() + tableHash + scanHash;
	}
}
