diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java new file mode 100644 index 0000000..02e7f7b --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java @@ -0,0 +1,111 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +public class HBaseMultiInputSplit implements InputSplit, + Comparable<HBaseMultiInputSplit>, Serializable { + + private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class); + + private List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + private String regionLocation = null; + + /** default constructor */ + private HBaseMultiInputSplit() { + + } + + public HBaseMultiInputSplit(String regionLocation) { + this.regionLocation = regionLocation; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + LOG.debug("REGION GETTER : " + regionLocation); + + return this.regionLocation; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.debug("READ ME : " + in.toString()); + + int s = Bytes.toInt(Bytes.readByteArray(in)); + + for (int i = 0; i < s; i++) { + HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils + .deserialize(Bytes.readByteArray(in)); + splits.add(hbts); + } + + LOG.debug("READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("WRITE : " + this); + + Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); + + for (HBaseTableSplit hbts : splits) { + Bytes.writeByteArray(out, SerializationUtils.serialize(hbts)); + } + + LOG.debug("WROTE : " + out.toString()); + } + + @Override + public String toString() { + StringBuffer str = new StringBuffer(); + str.append("HBaseMultiSplit : "); + + for (HBaseTableSplit hbt : splits) { + str.append(" [" + hbt.toString() + "]"); + } + + return str.toString(); + } + + @Override + public int compareTo(HBaseMultiInputSplit o) { + // TODO: Make this comparison better + return (splits.size() - o.splits.size()); + } + + @Override + public long getLength() throws IOException { + return splits.size(); + } + + @Override + public String[] getLocations() throws IOException { + LOG.debug("REGION ARRAY : " + regionLocation); + + return new String[] { this.regionLocation }; + } + + public void addSplit(HBaseTableSplit hbt) throws IOException { + if (hbt.getRegionLocation().equals(regionLocation)) + splits.add(hbt); + else + throw new IOException("HBaseTableSplit Region Location " + + hbt.getRegionLocation() + + " does NOT match MultiSplit Region Location " + regionLocation); + } + + public List<HBaseTableSplit> getSplits() { + return splits; + } +}
\ No newline at end of file |