diff options
| author | Chandan Rajah <chandan.rajah@gmail.com> | 2013-07-18 17:19:24 +0100 | 
|---|---|---|
| committer | Chandan Rajah <chandan.rajah@gmail.com> | 2013-07-18 17:19:24 +0100 | 
| commit | bdb0933779f63a4f4be691feae7741b4dbb96b35 (patch) | |
| tree | 2ff9bef8ff4f95a1d819672b1e5b7acd8a57cb1e /src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java | |
| parent | 1f0450297589ecbf89862faa951ea9004df6293e (diff) | |
| download | SpyGlass-bdb0933779f63a4f4be691feae7741b4dbb96b35.tar.gz SpyGlass-bdb0933779f63a4f4be691feae7741b4dbb96b35.zip | |
Added support for split grouping in order to reduce the number of mappers created for large tables
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java')
| -rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java | 392 | 
1 files changed, 202 insertions, 190 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java index a5c3bdd..87b8f58 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -12,196 +12,208 @@ import org.apache.hadoop.hbase.HConstants;  import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.mapred.InputSplit; -import com.sun.tools.javac.resources.version; -  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { - -  private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); -   -  private byte [] m_tableName = null; -  private byte [] m_startRow = null; -  private byte [] m_endRow = null; -  private String m_regionLocation = null; -  private TreeSet<String> m_keyList = null; -  private SourceMode m_sourceMode = SourceMode.EMPTY; -  private boolean m_endRowInclusive = true; -  private int m_versions = 1; -  private boolean m_useSalt = false; - -  /** default constructor */ -  public HBaseTableSplit() { -    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, -      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); -  } - -  /** -   * Constructor -   * @param tableName -   * @param startRow -   * @param endRow -   * @param location -   */ -  public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, -      final String location, final SourceMode sourceMode, final boolean useSalt) { -    this.m_tableName = tableName; -    this.m_startRow = startRow; -    this.m_endRow = endRow; -    this.m_regionLocation = location; -    this.m_sourceMode = sourceMode; -    this.m_useSalt = useSalt; -  } -   -  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) { -    this.m_tableName = tableName; -    this.m_keyList = keyList; -    this.m_versions = versions; -    this.m_sourceMode = sourceMode; -    this.m_regionLocation = location; -    this.m_useSalt = useSalt; -  } - -  /** @return table name */ -  public byte [] getTableName() { -    return this.m_tableName; -  } - -  /** @return starting row key */ -  public byte [] getStartRow() { -    return this.m_startRow; -  } - -  /** @return end row key */ -  public byte [] getEndRow() { -    return this.m_endRow; -  } -   -  public boolean getEndRowInclusive() { -    return m_endRowInclusive; -  } -   -  public void setEndRowInclusive(boolean isInclusive) { -    m_endRowInclusive = isInclusive; -  } -   -  /** @return list of keys to get */ -  public TreeSet<String> getKeyList() { -    return m_keyList; -  } -   -  public int getVersions() { -	  return m_versions; -  } - -  /** @return get the source mode */ -  public SourceMode getSourceMode() { -    return m_sourceMode; -  } -   -  public boolean getUseSalt() { -    return m_useSalt; -  } - -  /** @return the region's hostname */ -  public String getRegionLocation() { -    LOG.debug("REGION GETTER : " + m_regionLocation); -     -    return this.m_regionLocation; -  } - -  public String[] getLocations() { -    LOG.debug("REGION ARRAY : " + m_regionLocation); - -    return new String[] {this.m_regionLocation}; -  } - -  @Override -  public long getLength() { -    // Not clear how to obtain this... seems to be used only for sorting splits -    return 0; -  } - -  @Override -  public void readFields(DataInput in) throws IOException { -    LOG.debug("READ ME : " + in.toString()); - -    this.m_tableName = Bytes.readByteArray(in); -    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); -    this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); -    this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); -     -    switch(this.m_sourceMode) { -      case SCAN_RANGE: -        this.m_startRow = Bytes.readByteArray(in); -        this.m_endRow = Bytes.readByteArray(in); -        this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); -        break; -         -      case GET_LIST: -    	this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); -        this.m_keyList = new TreeSet<String>(); -         -        int m = Bytes.toInt(Bytes.readByteArray(in)); -         -        for( int i = 0; i < m; i++) { -          this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); -        } -        break; -    } -     -    LOG.debug("READ and CREATED : " + this); -  } - -  @Override -  public void write(DataOutput out) throws IOException { -    LOG.debug("WRITE : " + this); - -    Bytes.writeByteArray(out, this.m_tableName); -    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); -    Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); -    Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); -     -    switch( this.m_sourceMode ) { -      case SCAN_RANGE: -        Bytes.writeByteArray(out, this.m_startRow); -        Bytes.writeByteArray(out, this.m_endRow); -        Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); -        break; -         -      case GET_LIST: -    	Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); -        Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); -         -        for( String k: this.m_keyList ) { -          Bytes.writeByteArray(out, Bytes.toBytes(k)); -        } -        break; -    } - -    LOG.debug("WROTE : " + out.toString()); -  } - -  @Override -  public String toString() { -    return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",  -        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow),  -        (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt); -  } - -  @Override -  public int compareTo(HBaseTableSplit o) { -    switch(m_sourceMode) { -      case SCAN_ALL: -      case SCAN_RANGE: -        return Bytes.compareTo(getStartRow(), o.getStartRow()); -         -      case GET_LIST: -        return m_keyList.equals( o.getKeyList() ) ? 0 : -1;  -         -      default: -        return -1; -    } -     -  } +public class HBaseTableSplit implements InputSplit, +		Comparable<HBaseTableSplit>, Serializable { + +	private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); + +	private byte[] m_tableName = null; +	private byte[] m_startRow = null; +	private byte[] m_endRow = null; +	private String m_regionLocation = null; +	private TreeSet<String> m_keyList = null; +	private SourceMode m_sourceMode = SourceMode.EMPTY; +	private boolean m_endRowInclusive = true; +	private int m_versions = 1; +	private boolean m_useSalt = false; + +	/** default constructor */ +	public HBaseTableSplit() { +		this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, +				HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); +	} + +	/** +	 * Constructor +	 *  +	 * @param tableName +	 * @param startRow +	 * @param endRow +	 * @param location +	 */ +	public HBaseTableSplit(final byte[] tableName, final byte[] startRow, +			final byte[] endRow, final String location, +			final SourceMode sourceMode, final boolean useSalt) { +		this.m_tableName = tableName; +		this.m_startRow = startRow; +		this.m_endRow = endRow; +		this.m_regionLocation = location; +		this.m_sourceMode = sourceMode; +		this.m_useSalt = useSalt; +	} + +	public HBaseTableSplit(final byte[] tableName, +			final TreeSet<String> keyList, int versions, final String location, +			final SourceMode sourceMode, final boolean useSalt) { +		this.m_tableName = tableName; +		this.m_keyList = keyList; +		this.m_versions = versions; +		this.m_sourceMode = sourceMode; +		this.m_regionLocation = location; +		this.m_useSalt = useSalt; +	} + +	/** @return table name */ +	public byte[] getTableName() { +		return this.m_tableName; +	} + +	/** @return starting row key */ +	public byte[] getStartRow() { +		return this.m_startRow; +	} + +	/** @return end row key */ +	public byte[] getEndRow() { +		return this.m_endRow; +	} + +	public boolean getEndRowInclusive() { +		return m_endRowInclusive; +	} + +	public void setEndRowInclusive(boolean isInclusive) { +		m_endRowInclusive = isInclusive; +	} + +	/** @return list of keys to get */ +	public TreeSet<String> getKeyList() { +		return m_keyList; +	} + +	public int getVersions() { +		return m_versions; +	} + +	/** @return get the source mode */ +	public SourceMode getSourceMode() { +		return m_sourceMode; +	} + +	public boolean getUseSalt() { +		return m_useSalt; +	} + +	/** @return the region's hostname */ +	public String getRegionLocation() { +		LOG.debug("REGION GETTER : " + m_regionLocation); + +		return this.m_regionLocation; +	} + +	public String[] getLocations() { +		LOG.debug("REGION ARRAY : " + m_regionLocation); + +		return new String[] { this.m_regionLocation }; +	} + +	@Override +	public long getLength() { +		// Not clear how to obtain this... seems to be used only for sorting +		// splits +		return 0; +	} + +	@Override +	public void readFields(DataInput in) throws IOException { +		LOG.debug("READ ME : " + in.toString()); + +		this.m_tableName = Bytes.readByteArray(in); +		this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); +		this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes +				.readByteArray(in))); +		this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); + +		switch (this.m_sourceMode) { +			case SCAN_RANGE: +				this.m_startRow = Bytes.readByteArray(in); +				this.m_endRow = Bytes.readByteArray(in); +				this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); +			break; + +			case GET_LIST: +				this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); +				this.m_keyList = new TreeSet<String>(); + +				int m = Bytes.toInt(Bytes.readByteArray(in)); + +				for (int i = 0; i < m; i++) { +					this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); +				} +			break; +		} + +		LOG.debug("READ and CREATED : " + this); +	} + +	@Override +	public void write(DataOutput out) throws IOException { +		LOG.debug("WRITE : " + this); + +		Bytes.writeByteArray(out, this.m_tableName); +		Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); +		Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); +		Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); + +		switch (this.m_sourceMode) { +			case SCAN_RANGE: +				Bytes.writeByteArray(out, this.m_startRow); +				Bytes.writeByteArray(out, this.m_endRow); +				Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); +			break; + +			case GET_LIST: +				Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); +				Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + +				for (String k : this.m_keyList) { +					Bytes.writeByteArray(out, Bytes.toBytes(k)); +				} +			break; +		} + +		LOG.debug("WROTE : " + out.toString()); +	} + +	@Override +	public String toString() { +		return String +				.format( +						"Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", +						Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, +						Bytes.toString(m_startRow), Bytes.toString(m_endRow), +						(m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, +						m_useSalt); +	} + +	@Override +	public int compareTo(HBaseTableSplit o) { +		switch (m_sourceMode) { +			case SCAN_ALL: +			case SCAN_RANGE: +				return Bytes.compareTo(getStartRow(), o.getStartRow()); + +			case GET_LIST: +				return m_keyList.equals(o.getKeyList()) ? 0 : -1; + +			case EMPTY: +				return 0; + +			default: +				return -1; +		} + +	}  }
\ No newline at end of file | 
