diff options
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java | 111 | 
1 files changed, 111 insertions, 0 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java new file mode 100644 index 0000000..02e7f7b --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java @@ -0,0 +1,111 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +public class HBaseMultiInputSplit implements InputSplit, +		Comparable<HBaseMultiInputSplit>, Serializable { + +	private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class); + +	private List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + +	private String regionLocation = null; + +	/** default constructor */ +	private HBaseMultiInputSplit() { + +	} + +	public HBaseMultiInputSplit(String regionLocation) { +		this.regionLocation = regionLocation; +	} + +	/** @return the region's hostname */ +	public String getRegionLocation() { +		LOG.debug("REGION GETTER : " + regionLocation); + +		return this.regionLocation; +	} + +	@Override +	public void readFields(DataInput in) throws IOException { +		LOG.debug("READ ME : " + in.toString()); + +		int s = Bytes.toInt(Bytes.readByteArray(in)); + +		for (int i = 0; i < s; i++) { +			HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils +					.deserialize(Bytes.readByteArray(in)); +			splits.add(hbts); +		} + +		LOG.debug("READ and CREATED : " + this); +	} + +	@Override +	public void write(DataOutput out) throws IOException { +		LOG.debug("WRITE : " + this); + +		Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); + +		for (HBaseTableSplit hbts : splits) { +			Bytes.writeByteArray(out, SerializationUtils.serialize(hbts)); +		} + +		LOG.debug("WROTE : " + out.toString()); +	} + +	@Override +	public String toString() { +		StringBuffer str = new StringBuffer(); +		str.append("HBaseMultiSplit : "); + +		for (HBaseTableSplit hbt : splits) { +			str.append(" [" + hbt.toString() + "]"); +		} + +		return str.toString(); +	} + +	@Override +	public int compareTo(HBaseMultiInputSplit o) { +		// TODO: Make this comparison better +		return (splits.size() - o.splits.size()); +	} + +	@Override +	public long getLength() throws IOException { +		return splits.size(); +	} + +	@Override +	public String[] getLocations() throws IOException { +		LOG.debug("REGION ARRAY : " + regionLocation); + +		return new String[] { this.regionLocation }; +	} + +	public void addSplit(HBaseTableSplit hbt) throws IOException { +		if (hbt.getRegionLocation().equals(regionLocation)) +			splits.add(hbt); +		else +			throw new IOException("HBaseTableSplit Region Location " +					+ hbt.getRegionLocation() +					+ " does NOT match MultiSplit Region Location " + regionLocation); +	} + +	public List<HBaseTableSplit> getSplits() { +		return splits; +	} +}
\ No newline at end of file | 
