diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java new file mode 100644 index 0000000..1d48e1d --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -0,0 +1,190 @@ +package parallelai.spyglass.hbase; + +import java.awt.event.KeyListener; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { + + private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); + + private byte [] m_tableName = null; + private byte [] m_startRow = null; + private byte [] m_endRow = null; + private String m_regionLocation = null; + private TreeSet<String> m_keyList = null; + private SourceMode m_sourceMode = SourceMode.EMPTY; + private boolean m_endRowInclusive = true; + + /** default constructor */ + public HBaseTableSplit() { + this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY); + } + + /** + * Constructor + * @param tableName + * @param startRow + * @param endRow + * @param location + */ + public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, + final String location, final SourceMode sourceMode) { + this.m_tableName = tableName; + this.m_startRow = startRow; + this.m_endRow = endRow; + this.m_regionLocation = location; + this.m_sourceMode = sourceMode; + } + + public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) { + this.m_tableName = tableName; + this.m_keyList = keyList; + this.m_sourceMode = sourceMode; + this.m_regionLocation = location; + } + + /** @return table name */ + public byte [] getTableName() { + return this.m_tableName; + } + + /** @return starting row key */ + public byte [] getStartRow() { + return this.m_startRow; + } + + /** @return end row key */ + public byte [] getEndRow() { + return this.m_endRow; + } + + public boolean getEndRowInclusive() { + return m_endRowInclusive; + } + + public void setEndRowInclusive(boolean isInclusive) { + m_endRowInclusive = isInclusive; + } + + /** @return list of keys to get */ + public TreeSet<String> getKeyList() { + return m_keyList; + } + + /** @return get the source mode */ + public SourceMode getSourceMode() { + return m_sourceMode; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + LOG.info("REGION GETTER : " + m_regionLocation); + + return this.m_regionLocation; + } + + public String[] getLocations() { + LOG.info("REGION ARRAY : " + m_regionLocation); + + return new String[] {this.m_regionLocation}; + } + + @Override + public long getLength() { + // Not clear how to obtain this... seems to be used only for sorting splits + return 0; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.info("READ ME : " + in.toString()); + + this.m_tableName = Bytes.readByteArray(in); + this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); + + switch(this.m_sourceMode) { + case SCAN_RANGE: + this.m_startRow = Bytes.readByteArray(in); + this.m_endRow = Bytes.readByteArray(in); + this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); + break; + + case GET_LIST: + this.m_keyList = new TreeSet<String>(); + + int m = Bytes.toInt(Bytes.readByteArray(in)); + + for( int i = 0; i < m; i++) { + this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); + } + break; + } + + LOG.info("READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.info("WRITE : " + this); + + Bytes.writeByteArray(out, this.m_tableName); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); + + switch( this.m_sourceMode ) { + case SCAN_RANGE: + Bytes.writeByteArray(out, this.m_startRow); + Bytes.writeByteArray(out, this.m_endRow); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); + break; + + case GET_LIST: + Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + + for( String k: this.m_keyList ) { + Bytes.writeByteArray(out, Bytes.toBytes(k)); + } + break; + } + + LOG.info("WROTE : " + out.toString()); + } + + @Override + public String toString() { + return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList); + } + + @Override + public int compareTo(HBaseTableSplit o) { + switch(m_sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + return Bytes.compareTo(getStartRow(), o.getStartRow()); + + case GET_LIST: + return m_keyList.equals( o.getKeyList() ) ? 0 : -1; + + default: + return -1; + } + + } +}
\ No newline at end of file |