path: root/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
diff options
authorChandan Rajah <chandan.rajah@gmail.com>2013-07-18 17:19:24 +0100
committerChandan Rajah <chandan.rajah@gmail.com>2013-07-18 17:19:24 +0100
commitbdb0933779f63a4f4be691feae7741b4dbb96b35 (patch)
tree2ff9bef8ff4f95a1d819672b1e5b7acd8a57cb1e /src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
parent1f0450297589ecbf89862faa951ea9004df6293e (diff)
Added support for split grouping in order to reduce the number of mappers created for large tables
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java')
1 files changed, 202 insertions, 190 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
index a5c3bdd..87b8f58 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
@@ -12,196 +12,208 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputSplit;
-import com.sun.tools.javac.resources.version;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable {
- private final Log LOG = LogFactory.getLog(HBaseTableSplit.class);
- private byte [] m_tableName = null;
- private byte [] m_startRow = null;
- private byte [] m_endRow = null;
- private String m_regionLocation = null;
- private TreeSet<String> m_keyList = null;
- private SourceMode m_sourceMode = SourceMode.EMPTY;
- private boolean m_endRowInclusive = true;
- private int m_versions = 1;
- private boolean m_useSalt = false;
- /** default constructor */
- public HBaseTableSplit() {
- this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);
- }
- /**
- * Constructor
- * @param tableName
- * @param startRow
- * @param endRow
- * @param location
- */
- public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow,
- final String location, final SourceMode sourceMode, final boolean useSalt) {
- this.m_tableName = tableName;
- this.m_startRow = startRow;
- this.m_endRow = endRow;
- this.m_regionLocation = location;
- this.m_sourceMode = sourceMode;
- this.m_useSalt = useSalt;
- }
- public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) {
- this.m_tableName = tableName;
- this.m_keyList = keyList;
- this.m_versions = versions;
- this.m_sourceMode = sourceMode;
- this.m_regionLocation = location;
- this.m_useSalt = useSalt;
- }
- /** @return table name */
- public byte [] getTableName() {
- return this.m_tableName;
- }
- /** @return starting row key */
- public byte [] getStartRow() {
- return this.m_startRow;
- }
- /** @return end row key */
- public byte [] getEndRow() {
- return this.m_endRow;
- }
- public boolean getEndRowInclusive() {
- return m_endRowInclusive;
- }
- public void setEndRowInclusive(boolean isInclusive) {
- m_endRowInclusive = isInclusive;
- }
- /** @return list of keys to get */
- public TreeSet<String> getKeyList() {
- return m_keyList;
- }
- public int getVersions() {
- return m_versions;
- }
- /** @return get the source mode */
- public SourceMode getSourceMode() {
- return m_sourceMode;
- }
- public boolean getUseSalt() {
- return m_useSalt;
- }
- /** @return the region's hostname */
- public String getRegionLocation() {
- LOG.debug("REGION GETTER : " + m_regionLocation);
- return this.m_regionLocation;
- }
- public String[] getLocations() {
- LOG.debug("REGION ARRAY : " + m_regionLocation);
- return new String[] {this.m_regionLocation};
- }
- @Override
- public long getLength() {
- // Not clear how to obtain this... seems to be used only for sorting splits
- return 0;
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- LOG.debug("READ ME : " + in.toString());
- this.m_tableName = Bytes.readByteArray(in);
- this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
- this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in)));
- this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
- switch(this.m_sourceMode) {
- case SCAN_RANGE:
- this.m_startRow = Bytes.readByteArray(in);
- this.m_endRow = Bytes.readByteArray(in);
- this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in));
- break;
- case GET_LIST:
- this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
- this.m_keyList = new TreeSet<String>();
- int m = Bytes.toInt(Bytes.readByteArray(in));
- for( int i = 0; i < m; i++) {
- this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in)));
- }
- break;
- }
- LOG.debug("READ and CREATED : " + this);
- }
- @Override
- public void write(DataOutput out) throws IOException {
- LOG.debug("WRITE : " + this);
- Bytes.writeByteArray(out, this.m_tableName);
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
- switch( this.m_sourceMode ) {
- case SCAN_RANGE:
- Bytes.writeByteArray(out, this.m_startRow);
- Bytes.writeByteArray(out, this.m_endRow);
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive));
- break;
- case GET_LIST:
- Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
- for( String k: this.m_keyList ) {
- Bytes.writeByteArray(out, Bytes.toBytes(k));
- }
- break;
- }
- LOG.debug("WROTE : " + out.toString());
- }
- @Override
- public String toString() {
- return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
- Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow),
- (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt);
- }
- @Override
- public int compareTo(HBaseTableSplit o) {
- switch(m_sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE:
- return Bytes.compareTo(getStartRow(), o.getStartRow());
- case GET_LIST:
- return m_keyList.equals( o.getKeyList() ) ? 0 : -1;
- default:
- return -1;
- }
- }
+public class HBaseTableSplit implements InputSplit,
+ Comparable<HBaseTableSplit>, Serializable {
+ private final Log LOG = LogFactory.getLog(HBaseTableSplit.class);
+ private byte[] m_tableName = null;
+ private byte[] m_startRow = null;
+ private byte[] m_endRow = null;
+ private String m_regionLocation = null;
+ private TreeSet<String> m_keyList = null;
+ private SourceMode m_sourceMode = SourceMode.EMPTY;
+ private boolean m_endRowInclusive = true;
+ private int m_versions = 1;
+ private boolean m_useSalt = false;
+ /** default constructor */
+ public HBaseTableSplit() {
+ this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);
+ }
+ /**
+ * Constructor
+ *
+ * @param tableName
+ * @param startRow
+ * @param endRow
+ * @param location
+ */
+ public HBaseTableSplit(final byte[] tableName, final byte[] startRow,
+ final byte[] endRow, final String location,
+ final SourceMode sourceMode, final boolean useSalt) {
+ this.m_tableName = tableName;
+ this.m_startRow = startRow;
+ this.m_endRow = endRow;
+ this.m_regionLocation = location;
+ this.m_sourceMode = sourceMode;
+ this.m_useSalt = useSalt;
+ }
+ public HBaseTableSplit(final byte[] tableName,
+ final TreeSet<String> keyList, int versions, final String location,
+ final SourceMode sourceMode, final boolean useSalt) {
+ this.m_tableName = tableName;
+ this.m_keyList = keyList;
+ this.m_versions = versions;
+ this.m_sourceMode = sourceMode;
+ this.m_regionLocation = location;
+ this.m_useSalt = useSalt;
+ }
+ /** @return table name */
+ public byte[] getTableName() {
+ return this.m_tableName;
+ }
+ /** @return starting row key */
+ public byte[] getStartRow() {
+ return this.m_startRow;
+ }
+ /** @return end row key */
+ public byte[] getEndRow() {
+ return this.m_endRow;
+ }
+ public boolean getEndRowInclusive() {
+ return m_endRowInclusive;
+ }
+ public void setEndRowInclusive(boolean isInclusive) {
+ m_endRowInclusive = isInclusive;
+ }
+ /** @return list of keys to get */
+ public TreeSet<String> getKeyList() {
+ return m_keyList;
+ }
+ public int getVersions() {
+ return m_versions;
+ }
+ /** @return get the source mode */
+ public SourceMode getSourceMode() {
+ return m_sourceMode;
+ }
+ public boolean getUseSalt() {
+ return m_useSalt;
+ }
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ LOG.debug("REGION GETTER : " + m_regionLocation);
+ return this.m_regionLocation;
+ }
+ public String[] getLocations() {
+ LOG.debug("REGION ARRAY : " + m_regionLocation);
+ return new String[] { this.m_regionLocation };
+ }
+ @Override
+ public long getLength() {
+ // Not clear how to obtain this... seems to be used only for sorting
+ // splits
+ return 0;
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ LOG.debug("READ ME : " + in.toString());
+ this.m_tableName = Bytes.readByteArray(in);
+ this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes
+ .readByteArray(in)));
+ this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
+ switch (this.m_sourceMode) {
+ case SCAN_RANGE:
+ this.m_startRow = Bytes.readByteArray(in);
+ this.m_endRow = Bytes.readByteArray(in);
+ this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in));
+ break;
+ case GET_LIST:
+ this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
+ this.m_keyList = new TreeSet<String>();
+ int m = Bytes.toInt(Bytes.readByteArray(in));
+ for (int i = 0; i < m; i++) {
+ this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in)));
+ }
+ break;
+ }
+ LOG.debug("READ and CREATED : " + this);
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ LOG.debug("WRITE : " + this);
+ Bytes.writeByteArray(out, this.m_tableName);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
+ switch (this.m_sourceMode) {
+ case SCAN_RANGE:
+ Bytes.writeByteArray(out, this.m_startRow);
+ Bytes.writeByteArray(out, this.m_endRow);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive));
+ break;
+ case GET_LIST:
+ Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
+ for (String k : this.m_keyList) {
+ Bytes.writeByteArray(out, Bytes.toBytes(k));
+ }
+ break;
+ }
+ LOG.debug("WROTE : " + out.toString());
+ }
+ @Override
+ public String toString() {
+ return String
+ .format(
+ "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
+ Bytes.toString(m_tableName), m_regionLocation, m_sourceMode,
+ Bytes.toString(m_startRow), Bytes.toString(m_endRow),
+ (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions,
+ m_useSalt);
+ }
+ @Override
+ public int compareTo(HBaseTableSplit o) {
+ switch (m_sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ return Bytes.compareTo(getStartRow(), o.getStartRow());
+ case GET_LIST:
+ return m_keyList.equals(o.getKeyList()) ? 0 : -1;
+ case EMPTY:
+ return 0;
+ default:
+ return -1;
+ }
+ }
} \ No newline at end of file