diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass')
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java | 286 | ||||
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java | 311 | 
2 files changed, 597 insertions, 0 deletions
/*
 * Copyright (c) 2009 Concurrent, Inc.
 *
 * This work has been released into the public domain
 * by the copyright holder. This applies worldwide.
 *
 * In case this is not legally possible:
 * The copyright holder grants any entity the right
 * to use this work for any purpose, without any
 * conditions, unless such conditions are required by law.
 */

package parallelai.spyglass.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;

/**
 * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
 * with the {@link HBaseRawTap} to allow for the reading and writing of data
 * to and from a HBase cluster.
 *
 * <p>When sourcing, each tuple carries two fields: {@code "rowkey"} (the raw
 * {@link ImmutableBytesWritable} row key) and {@code "row"} (the complete
 * {@link Result}). When sinking, the {@code "rowkey"} field supplies the row
 * key and every other outgoing field becomes a cell; a field named
 * {@code "family:qualifier"} selects an explicit column family, otherwise the
 * scheme's configured family names are used positionally (the last configured
 * family absorbs any overflow columns).
 *
 * @see HBaseRawTap
 */
@SuppressWarnings({ "rawtypes", "deprecation" })
public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {

	private static final long serialVersionUID = 6248976486883281356L;

	/** Field LOG */
	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);

	/** Source field holding the raw row key ({@link ImmutableBytesWritable}). */
	public final Fields RowKeyField = new Fields("rowkey");
	/** Source field holding the whole {@link Result} for the row. */
	public final Fields RowField = new Fields("row");

	/** Column families to scan when sourcing / default families when sinking. */
	private String[] familyNames;

	/** When true (default), null tuple values are still written as empty cells. */
	private boolean writeNulls = true;

	/**
	 * Constructor HBaseRawScheme creates a new HBaseRawScheme instance using a
	 * single column family and the default null-writing behavior.
	 *
	 * @param familyName of type String, the column family to use
	 */
	public HBaseRawScheme(String familyName) {
		this(new String[] { familyName });
	}

	/**
	 * Constructor HBaseRawScheme creates a new HBaseRawScheme instance using the
	 * given column families and the default null-writing behavior.
	 *
	 * @param familyNames of type String[], the column families to use
	 */
	public HBaseRawScheme(String[] familyNames) {
		this.familyNames = familyNames;
		setSourceFields();
	}

	/**
	 * Constructor HBaseRawScheme creates a new HBaseRawScheme instance using a
	 * single column family.
	 *
	 * @param familyName of type String, the column family to use
	 * @param writeNulls of type boolean, whether null values produce empty cells
	 */
	public HBaseRawScheme(String familyName, boolean writeNulls) {
		this(new String[] { familyName }, writeNulls);
	}

	/**
	 * Constructor HBaseRawScheme creates a new HBaseRawScheme instance.
	 *
	 * @param familyNames of type String[], the column families to use
	 * @param writeNulls  of type boolean, whether null values produce empty cells
	 */
	public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
		this.familyNames = familyNames;
		this.writeNulls = writeNulls;
		setSourceFields();
	}

	/** Declares the fixed source fields: "rowkey" followed by "row". */
	private void setSourceFields() {
		Fields sourceFields = Fields.join(RowKeyField, RowField);
		setSourceFields(sourceFields);
	}

	/**
	 * Method getFamilyNames returns the set of familyNames of this HBaseRawScheme
	 * object, with duplicates removed.
	 *
	 * @return the familyNames (type String[]) of this HBaseRawScheme object.
	 */
	public String[] getFamilyNames() {
		HashSet<String> familyNameSet = new HashSet<String>();
		if (familyNames != null) {
			for (String familyName : familyNames) {
				familyNameSet.add(familyName);
			}
		}
		return familyNameSet.toArray(new String[0]);
	}

	@Override
	public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
		// Reuse one key/value pair across all next() calls, per the mapred API contract.
		Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };

		sourceCall.setContext(pair);
	}

	@Override
	public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
		sourceCall.setContext(null);
	}

	@SuppressWarnings("unchecked")
	@Override
	public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
			throws IOException {
		Tuple result = new Tuple();

		Object key = sourceCall.getContext()[0];
		Object value = sourceCall.getContext()[1];
		boolean hasNext = sourceCall.getInput().next(key, value);
		if (!hasNext) {
			return false;
		}

		// Skip nulls. NOTE(review): returning true here without setting the
		// incoming tuple leaves the previous tuple in place; in practice the
		// wrapped reader never yields nulls, but this path deserves a second look.
		if (key == null || value == null) {
			return true;
		}

		ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
		Result row = (Result) value;
		result.add(keyWritable);
		result.add(row);
		sourceCall.getIncomingEntry().setTuple(result);
		return true;
	}

	@SuppressWarnings("unchecked")
	@Override
	public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
		TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
		OutputCollector outputCollector = sinkCall.getOutput();
		Tuple key = tupleEntry.selectTuple(RowKeyField);
		Object okey = key.getObject(0);
		ImmutableBytesWritable keyBytes = getBytes(okey);
		Put put = new Put(keyBytes.get());
		// Every field except the row key becomes a cell in the Put.
		Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
		if (null != outFields) {
			TupleEntry values = tupleEntry.selectEntry(outFields);
			for (int n = 0; n < values.getFields().size(); n++) {
				Object o = values.get(n);
				ImmutableBytesWritable valueBytes = getBytes(o);
				Comparable field = outFields.get(n);
				ColumnName cn = parseColumn((String) field);
				if (null == cn.family) {
					// No explicit "family:" prefix: map positionally onto the
					// configured families, overflowing into the last one.
					if (n >= familyNames.length)
						cn.family = familyNames[familyNames.length - 1];
					else
						cn.family = familyNames[n];
				}
				if (null != o || writeNulls)
					put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
			}
		}
		outputCollector.collect(null, put);
	}

	/**
	 * Converts a tuple value to HBase bytes. Nulls become empty byte arrays;
	 * unsupported types are rejected rather than silently serialized.
	 */
	private ImmutableBytesWritable getBytes(Object obj) {
		if (null == obj)
			return new ImmutableBytesWritable(new byte[0]);
		if (obj instanceof ImmutableBytesWritable)
			return (ImmutableBytesWritable) obj;
		else if (obj instanceof String)
			return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
		else if (obj instanceof Long)
			return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
		else if (obj instanceof Integer)
			return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
		else if (obj instanceof Short)
			return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
		else if (obj instanceof Boolean)
			return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
		else if (obj instanceof Double)
			return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
		else
			throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
					+ obj.getClass().getName());
	}

	/**
	 * Splits a field name of the form "family:qualifier". A name with no colon
	 * (or a leading colon) yields a null family, to be filled in positionally.
	 */
	private ColumnName parseColumn(String column) {
		ColumnName ret = new ColumnName();
		int pos = column.indexOf(":");
		if (pos > 0) {
			ret.name = column.substring(pos + 1);
			ret.family = column.substring(0, pos);
		} else {
			ret.name = column;
		}
		return ret;
	}

	/** Simple holder for a parsed "family:qualifier" pair. */
	private class ColumnName {
		String family;
		String name;

		ColumnName() {
		}
	}

	@Override
	public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
		conf.setOutputFormat(TableOutputFormat.class);
		conf.setOutputKeyClass(ImmutableBytesWritable.class);
		conf.setOutputValueClass(Put.class);
	}

	@Override
	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
			JobConf conf) {
		// Wrap the new-API TableInputFormat so it can be driven through mapred.
		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
				ValueCopier.class);
		if (null != familyNames) {
			// SCAN_COLUMNS takes a space-separated list of families/columns.
			String columns = Util.join(this.familyNames, " ");
			LOG.debug("sourcing from column families: {}", columns);
			conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
		}
	}

	@Override
	public boolean equals(Object object) {
		if (this == object) {
			return true;
		}
		if (object == null || getClass() != object.getClass()) {
			return false;
		}
		if (!super.equals(object)) {
			return false;
		}

		HBaseRawScheme that = (HBaseRawScheme) object;

		// FIX: writeNulls now participates in equality; previously two schemes
		// differing only in null-writing behavior compared equal.
		if (writeNulls != that.writeNulls) {
			return false;
		}
		if (!Arrays.equals(familyNames, that.familyNames)) {
			return false;
		}
		return true;
	}

	@Override
	public int hashCode() {
		int result = super.hashCode();
		result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
		result = 31 * result + (writeNulls ? 1 : 0); // keep consistent with equals()
		return result;
	}

	/**
	 * Copies the new-API Result into the reusable mapred value instance, as
	 * required by DeprecatedInputFormatWrapper.
	 */
	public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {

		public ValueCopier() {
		}

		public void copyValue(Result oldValue, Result newValue) {
			if (null != oldValue && null != newValue) {
				oldValue.copyFrom(newValue);
			}
		}

	}
}
/*
 * Copyright (c) 2009 Concurrent, Inc.
 *
 * This work has been released into the public domain
 * by the copyright holder. This applies worldwide.
 *
 * In case this is not legally possible:
 * The copyright holder grants any entity the right
 * to use this work for any purpose, without any
 * conditions, unless such conditions are required by law.
 */

package parallelai.spyglass.hbase;

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

/**
 * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with
 * the {@link HBaseRawScheme} to allow for the reading and writing
 * of data to and from a HBase cluster.
 */
@SuppressWarnings({ "deprecation", "rawtypes" })
public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {

	private static final long serialVersionUID = 8019189493428493323L;

	/** Field LOG */
	private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class);

	// NOTE(review): a random identifier per instance means two taps over the
	// same table are never considered the same resource — confirm this is the
	// intended planner behavior.
	private final String id = UUID.randomUUID().toString();

	/** Field SCHEME */
	public static final String SCHEME = "hbase";

	/** Lazily-created admin client; transient, rebuilt per JVM. */
	private transient HBaseAdmin hBaseAdmin;

	/** Comma-separated ZooKeeper quorum hosts; null keeps the cluster default. */
	private String quorumNames;
	/** Field tableName */
	private String tableName;
	/** Optional base64-encoded {@link Scan} applied when sourcing. */
	private String base64Scan;

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance in UPDATE mode.
	 *
	 * @param tableName       of type String
	 * @param HBaseFullScheme of type HBaseRawScheme
	 */
	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) {
		super(HBaseFullScheme, SinkMode.UPDATE);
		this.tableName = tableName;
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance.
	 *
	 * @param tableName       of type String
	 * @param HBaseFullScheme of type HBaseRawScheme
	 * @param sinkMode        of type SinkMode
	 */
	public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
		super(HBaseFullScheme, sinkMode);
		this.tableName = tableName;
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance in UPDATE mode
	 * against an explicit ZooKeeper quorum.
	 *
	 * @param quorumNames     of type String, comma-separated quorum hosts
	 * @param tableName       of type String
	 * @param HBaseFullScheme of type HBaseRawScheme
	 */
	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) {
		super(HBaseFullScheme, SinkMode.UPDATE);
		this.quorumNames = quorumNames;
		this.tableName = tableName;
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance against an
	 * explicit ZooKeeper quorum.
	 *
	 * @param quorumNames     of type String, comma-separated quorum hosts
	 * @param tableName       of type String
	 * @param HBaseFullScheme of type HBaseRawScheme
	 * @param sinkMode        of type SinkMode
	 */
	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
		super(HBaseFullScheme, sinkMode);
		this.quorumNames = quorumNames;
		this.tableName = tableName;
	}

	/**
	 * Constructor HBaseRawTap creates a new HBaseRawTap instance with a
	 * pre-serialized scan to restrict what is read.
	 *
	 * @param quorumNames     comma-separated quorum hosts
	 * @param tableName       the HBase table name
	 * @param HBaseFullScheme the scheme to read/write with
	 * @param base64Scan      base64-encoded Scan, as produced by TableMapReduceUtil
	 * @param sinkMode        of type SinkMode
	 */
	public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {
		super(HBaseFullScheme, sinkMode);
		this.quorumNames = quorumNames;
		this.tableName = tableName;
		this.base64Scan = base64Scan;
	}

	/**
	 * Method getTableName returns the tableName of this HBaseRawTap object.
	 *
	 * @return the tableName (type String) of this HBaseRawTap object.
	 */
	public String getTableName() {
		return tableName;
	}

	public Path getPath() {
		// Namespace separators are not legal in a Path, so ":" becomes "_".
		return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
	}

	/** Lazily builds (and caches) an HBaseAdmin from the job configuration. */
	protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
		if (hBaseAdmin == null) {
			Configuration hbaseConf = HBaseConfiguration.create(conf);
			hBaseAdmin = new HBaseAdmin(hbaseConf);
		}

		return hBaseAdmin;
	}

	@Override
	public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
		if (quorumNames != null) {
			conf.set("hbase.zookeeper.quorum", quorumNames);
		}

		LOG.debug("sinking to table: {}", tableName);

		// Only the client (no mapred.task.partition yet) drops the table in
		// REPLACE mode; tasks must not race to delete it.
		if (isReplace() && conf.get("mapred.task.partition") == null) {
			try {
				deleteResource(conf);

			} catch (IOException e) {
				// FIX: preserve the cause instead of flattening it into the message.
				throw new RuntimeException("could not delete resource: " + tableName, e);
			}
		}

		else if (isUpdate() || isReplace()) {
			try {
				createResource(conf);
			} catch (IOException e) {
				throw new RuntimeException(tableName + " does not exist !", e);
			}

		}

		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
		super.sinkConfInit(process, conf);
	}

	@Override
	public String getIdentifier() {
		return id;
	}

	@Override
	public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader)
			throws IOException {
		return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
	}

	@Override
	public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector)
			throws IOException {
		HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this);
		hBaseCollector.prepare();
		return hBaseCollector;
	}

	@Override
	public boolean createResource(JobConf jobConf) throws IOException {
		HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);

		if (hBaseAdmin.tableExists(tableName)) {
			return true;
		}

		LOG.info("creating hbase table: {}", tableName);

		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

		// Create one column family per family declared on the scheme.
		String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames();

		for (String familyName : familyNames) {
			tableDescriptor.addFamily(new HColumnDescriptor(familyName));
		}

		hBaseAdmin.createTable(tableDescriptor);

		return true;
	}

	@Override
	public boolean deleteResource(JobConf jobConf) throws IOException {
		// A table must be disabled before HBase will allow it to be deleted.
		if (getHBaseAdmin(jobConf).tableExists(tableName)) {
			if (getHBaseAdmin(jobConf).isTableEnabled(tableName))
				getHBaseAdmin(jobConf).disableTable(tableName);
			getHBaseAdmin(jobConf).deleteTable(tableName);
		}
		return true;
	}

	@Override
	public boolean resourceExists(JobConf jobConf) throws IOException {
		return getHBaseAdmin(jobConf).tableExists(tableName);
	}

	@Override
	public long getModifiedTime(JobConf jobConf) throws IOException {
		return System.currentTimeMillis(); // currently unable to find last mod
											// time
											// on a table
	}

	@Override
	public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
		// a hack for MultiInputFormat to see that there is a child format
		FileInputFormat.setInputPaths(conf, getPath());

		if (quorumNames != null) {
			conf.set("hbase.zookeeper.quorum", quorumNames);
		}

		LOG.debug("sourcing from table: {}", tableName);
		conf.set(TableInputFormat.INPUT_TABLE, tableName);
		if (null != base64Scan)
			conf.set(TableInputFormat.SCAN, base64Scan);

		super.sourceConfInit(process, conf);
	}

	@Override
	public boolean equals(Object object) {
		if (this == object) {
			return true;
		}
		if (object == null || getClass() != object.getClass()) {
			return false;
		}
		if (!super.equals(object)) {
			return false;
		}

		HBaseRawTap hBaseTap = (HBaseRawTap) object;

		if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) {
			return false;
		}

		if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) {
			return false;
		}

		return true;
	}

	@Override
	public int hashCode() {
		int result = super.hashCode();
		// FIX: fold each field in with its own 31-multiplication; the previous
		// version summed both hashes into one term, weakening the hash.
		result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
		result = 31 * result + (base64Scan != null ? base64Scan.hashCode() : 0);
		return result;
	}
}
