6 files changed, 1497 insertions, 1283 deletions
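The largest change in the diff below is that getSplits() in HBaseInputFormat now groups the per-region HBaseTableSplit objects into one HBaseMultiInputSplit per region server (convertToMultiSplitArray), and HBaseRecordReader walks those grouped splits via setNextSplit(). The following is a minimal, self-contained sketch of that grouping idea only; the class and method names here are illustrative stand-ins, not the project's actual split classes, which carry additional state (start/stop keys, key lists, source mode).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for a single-region table split: all we model is its region location.
class TableSplitSketch {
    private final String regionLocation;
    TableSplitSketch(String regionLocation) { this.regionLocation = regionLocation; }
    String getRegionLocation() { return regionLocation; }
}

// Stand-in for a multi-split: a bag of table splits that share one region server.
class MultiSplitSketch {
    private final String regionLocation;
    private final List<TableSplitSketch> splits = new ArrayList<TableSplitSketch>();
    MultiSplitSketch(String regionLocation) { this.regionLocation = regionLocation; }
    void addSplit(TableSplitSketch s) { splits.add(s); }
    List<TableSplitSketch> getSplits() { return splits; }
    String getRegionLocation() { return regionLocation; }
}

public class GroupSplitsSketch {
    // Group splits by region location, one multi-split per region server,
    // mirroring the shape of convertToMultiSplitArray() in the diff below.
    static MultiSplitSketch[] group(List<TableSplitSketch> splits) {
        Map<String, MultiSplitSketch> byRegion = new HashMap<String, MultiSplitSketch>();
        for (TableSplitSketch s : splits) {
            MultiSplitSketch m = byRegion.get(s.getRegionLocation());
            if (m == null) {
                m = new MultiSplitSketch(s.getRegionLocation());
                byRegion.put(s.getRegionLocation(), m);
            }
            m.addSplit(s);
        }
        return byRegion.values().toArray(new MultiSplitSketch[byRegion.size()]);
    }

    public static void main(String[] args) {
        List<TableSplitSketch> in = new ArrayList<TableSplitSketch>();
        in.add(new TableSplitSketch("rs1.example:60020"));
        in.add(new TableSplitSketch("rs2.example:60020"));
        in.add(new TableSplitSketch("rs1.example:60020"));
        // Three region splits collapse into two multi-splits (one per server).
        System.out.println(group(in).length + " multi-splits");
    }
}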
| @@ -12,7 +12,7 @@  	<name>Cascading and Scalding wrapper for HBase with advanced features</name>  	<groupId>parallelai</groupId>  	<artifactId>parallelai.spyglass</artifactId> -	<version>2.9.3_2.2.0</version> +	<version>2.9.3_3.0.0-SNAPSHOT</version>  	<packaging>jar</packaging>  	<properties> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index aabdc5e..8e121bc 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -6,7 +6,6 @@ import java.util.ArrayList;  import java.util.Collection;  import java.util.HashMap;  import java.util.List; -import java.util.NavigableMap;  import java.util.Set;  import java.util.TreeSet;  import java.util.UUID; @@ -17,10 +16,8 @@ import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.hbase.HBaseConfiguration;  import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo;  import org.apache.hadoop.hbase.HRegionLocation;  import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.ServerName;  import org.apache.hadoop.hbase.client.HTable;  import org.apache.hadoop.hbase.client.Result;  import org.apache.hadoop.hbase.filter.Filter; @@ -40,519 +37,609 @@ import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseInputFormat -  implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { -   -  private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); - -  private final String id = UUID.randomUUID().toString(); - -  private byte [][] inputColumns; -  private HTable table; -//  private HBaseRecordReader tableRecordReader; -  private Filter rowFilter; -//  private String tableName = ""; - -  private HashMap<InetAddress, String> reverseDNSCacheMap = -    new HashMap<InetAddress, String>(); -   -  private String nameServer = null; -   -//  private Scan scan = null; - -   -  @SuppressWarnings("deprecation") -  @Override -  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { -    if (this.table == null) { -      throw new IOException("No table was provided"); -    } -     -    if (this.inputColumns == null || this.inputColumns.length == 0) { -      throw new IOException("Expecting at least one column"); -    } -     -    Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); - -    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { -      HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); - -      if (null == regLoc) { -        throw new IOException("Expecting at least one region."); -      } - -      List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); -      HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), -          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc -              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); -       -      splits.add(split); -       -      return splits.toArray(new HBaseTableSplit[splits.size()]); -    } -     -    if( keys.getSecond() == null || keys.getSecond().length == 0) { -      throw new IOException("Expecting at least one region."); -    } -     -    if( keys.getFirst().length != keys.getSecond().length ) { -      throw new IOException("Regions for start and end key do not match"); -  
  } -     -    byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; -    byte[] maxKey = keys.getSecond()[0]; -     -    LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); - -    byte [][] regStartKeys = keys.getFirst(); -    byte [][] regStopKeys = keys.getSecond(); -    String [] regions = new String[regStartKeys.length]; -     -    for( int i = 0; i < regStartKeys.length; i++ ) { -      minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0 ) && (Bytes.compareTo(regStartKeys[i], minKey) < 0 ) ? regStartKeys[i] : minKey; -      maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0 ) ? regStopKeys[i] : maxKey; -       -      HServerAddress regionServerAddress =  -          table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); -        InetAddress regionAddress = -          regionServerAddress.getInetSocketAddress().getAddress(); -        String regionLocation; -        try { -          regionLocation = reverseDNS(regionAddress); -        } catch (NamingException e) { -          LOG.error("Cannot resolve the host name for " + regionAddress + -              " because of " + e); -          regionLocation = regionServerAddress.getHostname(); -        } - -//       HServerAddress regionServerAddress = table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); -//      InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); -// -//      String regionLocation; -// -//      try { -//        regionLocation = reverseDNS(regionAddress); -//      } catch (NamingException e) { -//        LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e); -//        regionLocation = regionServerAddress.getHostname(); -//      } - -//      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getHostname(); -       -      LOG.debug( "***** " + regionLocation ); -       -      if( regionLocation == null || regionLocation.length() == 0 ) -        throw new IOException( "The region info for regiosn " + i + " is null or empty"); - -      regions[i] = regionLocation; -       -      LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); -    } -     -    byte[] startRow = HConstants.EMPTY_START_ROW; -    byte[] stopRow = HConstants.EMPTY_END_ROW; -     -    LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); -     -    LOG.debug("SOURCE MODE is : " + sourceMode);     - -    switch( sourceMode ) { -    case SCAN_ALL: -      startRow = HConstants.EMPTY_START_ROW; -      stopRow = HConstants.EMPTY_END_ROW; - -      LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); -      break; -       -    case SCAN_RANGE: -      startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ; -      stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; - -      LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); -      break; -    } -     -    switch( sourceMode ) { -      case EMPTY: -      case SCAN_ALL: -      case SCAN_RANGE: -      { -//        startRow = (Bytes.compareTo(startRow, minKey) < 0) ? 
minKey : startRow; -//        stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : stopRow; -         -        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); -         -        if( ! useSalt ) { -           -          List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); -           -          int maxRegions = validRegions.size(); -          int currentRegion = 1; -           -          for( HRegionLocation cRegion : validRegions ) { -            byte [] rStart = cRegion.getRegionInfo().getStartKey(); -            byte [] rStop = cRegion.getRegionInfo().getEndKey(); -             -            HServerAddress regionServerAddress = cRegion.getServerAddress(); -              InetAddress regionAddress = -                regionServerAddress.getInetSocketAddress().getAddress(); -              String regionLocation; -              try { -                regionLocation = reverseDNS(regionAddress); -              } catch (NamingException e) { -                LOG.error("Cannot resolve the host name for " + regionAddress + -                    " because of " + e); -                regionLocation = regionServerAddress.getHostname(); -              } -               -              byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); -              byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);  -               -              LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)",  -                  (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), -                      (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), -                      rStop.length -                      )); -             -            HBaseTableSplit split = new HBaseTableSplit( -                table.getTableName(), -                sStart, -                sStop, -                regionLocation, -                SourceMode.SCAN_RANGE, useSalt -             ); -             -            split.setEndRowInclusive( currentRegion == maxRegions ); -   -            currentRegion ++; -                     -             LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  -                 Bytes.toString(startRow), Bytes.toString(stopRow), -                 Bytes.toString(rStart), Bytes.toString(rStop), -                 Bytes.toString(sStart),  -                 Bytes.toString(sStop),  -                 cRegion.getHostnamePort(), split) ); -             -             splits.add(split); -          } -        } else { -          LOG.debug("Using SALT : " + useSalt ); -           -          // Will return the start and the stop key with all possible prefixes. 
-          for( int i = 0; i < regions.length; i++ ) { -        	Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList ); -        	 -            for( Pair<byte[], byte[]> pair : intervals ) { -              LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); -               -              HBaseTableSplit split = new HBaseTableSplit( -                  table.getTableName(), -                  pair.getFirst(), -                  pair.getSecond(), -                  regions[i], -                  SourceMode.SCAN_RANGE, useSalt -               ); - -              split.setEndRowInclusive(true); -              splits.add(split); -            } -          } -        } -         -        LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); -        for( HBaseTableSplit s: splits) { -            LOG.info("RETURNED SPLITS: split -> " + s); -        } - -        return splits.toArray(new HBaseTableSplit[splits.size()]); -      }  -         -      case GET_LIST: -      { -//        if( keyList == null || keyList.size() == 0 ) { -        if( keyList == null ) { -          throw new IOException("Source Mode is GET_LIST but key list is EMPTY"); -        }  -         -        if( useSalt ) { -          TreeSet<String> tempKeyList = new TreeSet<String>(); -           -          for(String key: keyList) { -            tempKeyList.add(HBaseSalter.addSaltPrefix(key)); -          } -           -          keyList = tempKeyList; -        } -         -        LOG.debug("".format("Splitting Key List (%s)", keyList)); -         -        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); -         -        for (int i = 0; i < keys.getFirst().length; i++) { - -          if ( ! 
includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -            continue; -          } -           -          LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); - -          Set<String> regionsSubSet = null; -           -          if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { -            LOG.debug("REGION start is empty"); -            LOG.debug("REGION stop is empty"); -            regionsSubSet = keyList; -          } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { -            LOG.debug("REGION start is empty"); -            regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true); -          } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { -            LOG.debug("REGION stop is empty"); -            regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true); -          } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) { -            regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); -          } else { -            throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)",  -                regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); -          } -           -          if( regionsSubSet == null || regionsSubSet.size() == 0) { -            LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null"); -             -            continue; -          } - -          TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); - -          LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); -             -          HBaseTableSplit split = new HBaseTableSplit( -              table.getTableName(), regionKeyList, versions, -              regions[i],  -              SourceMode.GET_LIST, useSalt); -          splits.add(split); -        } -           -        LOG.debug("RETURNED SPLITS: split -> " + splits); - -        return splits.toArray(new HBaseTableSplit[splits.size()]); -      }  - -      default: -        throw new IOException("Unknown source Mode : " + sourceMode ); -    } -  } -   -  private String reverseDNS(InetAddress ipAddress) throws NamingException { -    String hostName = this.reverseDNSCacheMap.get(ipAddress); -    if (hostName == null) { -      hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); -      this.reverseDNSCacheMap.put(ipAddress, hostName); -    } -    return hostName; -  } - - -  @Override -  public RecordReader<ImmutableBytesWritable, Result> getRecordReader( -      InputSplit split, JobConf job, Reporter reporter) throws IOException { -     -    if( ! 
(split instanceof HBaseTableSplit ) ) -      throw new IOException("Table Split is not type HBaseTableSplit"); - -    HBaseTableSplit tSplit = (HBaseTableSplit) split; -     -    HBaseRecordReader trr = new HBaseRecordReader(); - -    switch( tSplit.getSourceMode() ) { -      case SCAN_ALL: -      case SCAN_RANGE: -      { -        LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); -         -        trr.setStartRow(tSplit.getStartRow()); -        trr.setEndRow(tSplit.getEndRow()); -        trr.setEndRowInclusive(tSplit.getEndRowInclusive()); -        trr.setUseSalt(useSalt); -      } -       -      break; -       -      case GET_LIST: -      { -        LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); -         -        trr.setKeyList(tSplit.getKeyList()); -        trr.setVersions(tSplit.getVersions()); -        trr.setUseSalt(useSalt); -      } -       -      break; -       -      default: -        throw new IOException( "Unknown source mode : " + tSplit.getSourceMode() ); -    } -     -    trr.setSourceMode(tSplit.getSourceMode()); -    trr.setHTable(this.table); -    trr.setInputColumns(this.inputColumns); -    trr.setRowFilter(this.rowFilter); - -    trr.init(); - -    return trr; -  } -   -   -   -  /* Configuration Section */ - -  /** -   * space delimited list of columns -   */ -  public static final String COLUMN_LIST = "hbase.tablecolumns"; - -  /** -   * Use this jobconf param to specify the input table -   */ -  private static final String INPUT_TABLE = "hbase.inputtable"; - -  private String startKey = null; -  private String stopKey = null; -   -  private SourceMode sourceMode = SourceMode.EMPTY; -  private TreeSet<String> keyList = null; -  private int versions = 1; -  private boolean useSalt = false; -  private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; -   -  public void configure(JobConf job) { -    String tableName = getTableName(job); -    String colArg = job.get(COLUMN_LIST); -    String[] colNames = colArg.split(" "); -    byte [][] m_cols = new byte[colNames.length][]; -    for (int i = 0; i < m_cols.length; i++) { -      m_cols[i] = Bytes.toBytes(colNames[i]); -    } -    setInputColumns(m_cols); -     -    try { -      setHTable(new HTable(HBaseConfiguration.create(job), tableName)); -    } catch (Exception e) { -      LOG.error( "************* Table could not be created" ); -      LOG.error(StringUtils.stringifyException(e)); -    } -     -    LOG.debug("Entered : " + this.getClass() + " : configure()" ); -     -    useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false); -    prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST); -     -    sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; -     -    LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally",  -        String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode )); - -    switch( sourceMode ) { -      case SCAN_RANGE: -        LOG.info("HIT SCAN_RANGE"); -         -        startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job) ) ); -        stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job) ) ); - -        LOG.info(String.format("Setting start 
key (%s) and stop key (%s)", startKey, stopKey) ); -        break; -         -      case GET_LIST: -        LOG.info("HIT GET_LIST"); -         -        Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job))); -        keyList = new TreeSet<String> (keys); - -        versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); -         -        LOG.debug( "GOT KEY LIST : " + keys ); -        LOG.debug(String.format("SETTING key list (%s)", keyList) ); - -        break; -         -      case EMPTY: -        LOG.info("HIT EMPTY"); -         -        sourceMode = SourceMode.SCAN_ALL; -        break; -       -      default: -        LOG.info("HIT DEFAULT"); -         -        break; -    } -  } - -  public void validateInput(JobConf job) throws IOException { -    // expecting exactly one path -    String tableName = getTableName(job); -     -    if (tableName == null) { -      throw new IOException("expecting one table name"); -    } -    LOG.debug(String.format("Found Table name [%s]", tableName)); -     - -    // connected to table? -    if (getHTable() == null) { -      throw new IOException("could not connect to table '" + -        tableName + "'"); -    } -    LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); - -    // expecting at least one column -    String colArg = job.get(COLUMN_LIST); -    if (colArg == null || colArg.length() == 0) { -      throw new IOException("expecting at least one column"); -    } -    LOG.debug(String.format("Found Columns [%s]", colArg)); - -    LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey)); -     -    if( sourceMode == SourceMode.EMPTY ) { -      throw new IOException("SourceMode should not be EMPTY"); -    } -     -    if( sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0) ) { -      throw new IOException( "Source mode is GET_LIST bu key list is empty"); -    } -  } -   -   -  /* Getters & Setters */ -  private HTable getHTable() { return this.table; } -  private void setHTable(HTable ht) { this.table = ht; } -  private void setInputColumns( byte [][] ic ) { this.inputColumns = ic; } - -   -  private void setJobProp( JobConf job, String key, String value) { -    if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); -    job.set(key,  value); -  } -   -  private String getJobProp( JobConf job, String key ) { return job.get(key); } -   -  public static void setTableName(JobConf job, String tableName) { -    // Make sure that table has not been set before -    String oldTableName = getTableName(job); -    if(oldTableName != null) { -      throw new RuntimeException("table name already set to: '" -        + oldTableName + "'"); -    } -     -    job.set(INPUT_TABLE, tableName); -  } -   -  public static String getTableName(JobConf job) { -    return job.get(INPUT_TABLE); -  } - -  protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { -    return true; -  } +public class HBaseInputFormat implements +		InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { + +	private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); + +	private final String id = UUID.randomUUID().toString(); + +	private byte[][] inputColumns; +	private HTable table; +	// private HBaseRecordReader tableRecordReader; +	private Filter rowFilter; +	// private String tableName = ""; + +	private 
HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); + +	private String nameServer = null; + +	// private Scan scan = null; + +	private HBaseMultiInputSplit[] convertToMultiSplitArray( +			List<HBaseTableSplit> splits) throws IOException { + +		if (splits == null) +			throw new IOException("The list of splits is null => " + splits); + +		HashMap<String, HBaseMultiInputSplit> regionSplits = new HashMap<String, HBaseMultiInputSplit>(); + +		for (HBaseTableSplit hbt : splits) { +			HBaseMultiInputSplit mis = null; +			if (regionSplits.containsKey(hbt.getRegionLocation())) { +				mis = regionSplits.get(hbt.getRegionLocation()); +			} else { +				regionSplits.put(hbt.getRegionLocation(), new HBaseMultiInputSplit( +						hbt.getRegionLocation())); +				mis = regionSplits.get(hbt.getRegionLocation()); +			} + +			mis.addSplit(hbt); +			regionSplits.put(hbt.getRegionLocation(), mis); +		} + +		Collection<HBaseMultiInputSplit> outVals = regionSplits.values(); + +		LOG.debug("".format("Returning array of splits : %s", outVals)); + +		if (outVals == null) +			throw new IOException("The list of multi input splits were null"); + +		return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]); +	} + +	@SuppressWarnings("deprecation") +	@Override +	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { +		if (this.table == null) { +			throw new IOException("No table was provided"); +		} + +		if (this.inputColumns == null || this.inputColumns.length == 0) { +			throw new IOException("Expecting at least one column"); +		} + +		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + +		if (keys == null || keys.getFirst() == null +				|| keys.getFirst().length == 0) { +			HRegionLocation regLoc = table.getRegionLocation( +					HConstants.EMPTY_BYTE_ARRAY, false); + +			if (null == regLoc) { +				throw new IOException("Expecting at least one region."); +			} + +			final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); +			HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +					HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc +							.getHostnamePort().split( +									Addressing.HOSTNAME_PORT_SEPARATOR)[0], +					SourceMode.EMPTY, false); + +			splits.add(split); + +			// TODO: Change to HBaseMultiSplit +			return convertToMultiSplitArray(splits); +		} + +		if (keys.getSecond() == null || keys.getSecond().length == 0) { +			throw new IOException("Expecting at least one region."); +		} + +		if (keys.getFirst().length != keys.getSecond().length) { +			throw new IOException("Regions for start and end key do not match"); +		} + +		byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; +		byte[] maxKey = keys.getSecond()[0]; + +		LOG.debug(String.format("SETTING min key (%s) and max key (%s)", +				Bytes.toString(minKey), Bytes.toString(maxKey))); + +		byte[][] regStartKeys = keys.getFirst(); +		byte[][] regStopKeys = keys.getSecond(); +		String[] regions = new String[regStartKeys.length]; + +		for (int i = 0; i < regStartKeys.length; i++) { +			minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) +					&& (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] +					: minKey; +			maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) +					&& (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? 
regStopKeys[i] +					: maxKey; + +			HServerAddress regionServerAddress = table.getRegionLocation( +					keys.getFirst()[i]).getServerAddress(); +			InetAddress regionAddress = regionServerAddress.getInetSocketAddress() +					.getAddress(); +			String regionLocation; +			try { +				regionLocation = reverseDNS(regionAddress); +			} catch (NamingException e) { +				LOG.error("Cannot resolve the host name for " + regionAddress +						+ " because of " + e); +				regionLocation = regionServerAddress.getHostname(); +			} + +			// HServerAddress regionServerAddress = +			// table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); +			// InetAddress regionAddress = +			// regionServerAddress.getInetSocketAddress().getAddress(); +			// +			// String regionLocation; +			// +			// try { +			// regionLocation = reverseDNS(regionAddress); +			// } catch (NamingException e) { +			// LOG.error("Cannot resolve the host name for " + regionAddress + +			// " because of " + e); +			// regionLocation = regionServerAddress.getHostname(); +			// } + +			// String regionLocation = +			// table.getRegionLocation(keys.getFirst()[i]).getHostname(); + +			LOG.debug("***** " + regionLocation); + +			if (regionLocation == null || regionLocation.length() == 0) +				throw new IOException("The region info for regiosn " + i +						+ " is null or empty"); + +			regions[i] = regionLocation; + +			LOG.debug(String.format( +					"Region (%s) has start key (%s) and stop key (%s)", regions[i], +					Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); +		} + +		byte[] startRow = HConstants.EMPTY_START_ROW; +		byte[] stopRow = HConstants.EMPTY_END_ROW; + +		LOG.debug(String.format("Found min key (%s) and max key (%s)", +				Bytes.toString(minKey), Bytes.toString(maxKey))); + +		LOG.debug("SOURCE MODE is : " + sourceMode); + +		switch (sourceMode) { +			case SCAN_ALL: +				startRow = HConstants.EMPTY_START_ROW; +				stopRow = HConstants.EMPTY_END_ROW; + +				LOG.info(String.format( +						"SCAN ALL: Found start key (%s) and stop key (%s)", +						Bytes.toString(startRow), Bytes.toString(stopRow))); +			break; + +			case SCAN_RANGE: +				startRow = (startKey != null && startKey.length() != 0) ? Bytes +						.toBytes(startKey) : HConstants.EMPTY_START_ROW; +				stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes +						.toBytes(stopKey) : HConstants.EMPTY_END_ROW; + +				LOG.info(String.format( +						"SCAN RANGE: Found start key (%s) and stop key (%s)", +						Bytes.toString(startRow), Bytes.toString(stopRow))); +			break; +		} + +		switch (sourceMode) { +			case EMPTY: +			case SCAN_ALL: +			case SCAN_RANGE: { +				// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : +				// startRow; +				// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? 
maxKey : +				// stopRow; + +				final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + +				if (!useSalt) { + +					List<HRegionLocation> validRegions = table.getRegionsInRange( +							startRow, stopRow); + +					int maxRegions = validRegions.size(); +					int currentRegion = 1; + +					for (HRegionLocation cRegion : validRegions) { +						byte[] rStart = cRegion.getRegionInfo().getStartKey(); +						byte[] rStop = cRegion.getRegionInfo().getEndKey(); + +						HServerAddress regionServerAddress = cRegion +								.getServerAddress(); +						InetAddress regionAddress = regionServerAddress +								.getInetSocketAddress().getAddress(); +						String regionLocation; +						try { +							regionLocation = reverseDNS(regionAddress); +						} catch (NamingException e) { +							LOG.error("Cannot resolve the host name for " +									+ regionAddress + " because of " + e); +							regionLocation = regionServerAddress.getHostname(); +						} + +						byte[] sStart = (startRow == HConstants.EMPTY_START_ROW +								|| (Bytes.compareTo(startRow, rStart) <= 0) ? rStart +								: startRow); +						byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW +								|| (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop +								: stopRow); + +						LOG.debug(String.format( +								"BOOL start (%s) stop (%s) length (%d)", +								(startRow == HConstants.EMPTY_START_ROW || (Bytes +										.compareTo(startRow, rStart) <= 0)), +								(stopRow == HConstants.EMPTY_END_ROW || (Bytes +										.compareTo(stopRow, rStop) >= 0)), rStop.length)); + +						HBaseTableSplit split = new HBaseTableSplit( +								table.getTableName(), sStart, sStop, regionLocation, +								SourceMode.SCAN_RANGE, useSalt); + +						split.setEndRowInclusive(currentRegion == maxRegions); + +						currentRegion++; + +						LOG.debug(String +								.format( +										"START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", +										Bytes.toString(startRow), +										Bytes.toString(stopRow), Bytes.toString(rStart), +										Bytes.toString(rStop), Bytes.toString(sStart), +										Bytes.toString(sStop), cRegion.getHostnamePort(), +										split)); + +						splits.add(split); +					} +				} else { +					LOG.debug("Using SALT : " + useSalt); + +					// Will return the start and the stop key with all possible +					// prefixes. 
+					for (int i = 0; i < regions.length; i++) { +						Pair<byte[], byte[]>[] intervals = HBaseSalter +								.getDistributedIntervals(startRow, stopRow, +										regStartKeys[i], regStopKeys[i], prefixList); + +						for (Pair<byte[], byte[]> pair : intervals) { +							LOG.debug("".format( +									"Using SALT, Region (%s) Start (%s) Stop (%s)", +									regions[i], Bytes.toString(pair.getFirst()), +									Bytes.toString(pair.getSecond()))); + +							HBaseTableSplit split = new HBaseTableSplit( +									table.getTableName(), pair.getFirst(), +									pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, +									useSalt); + +							split.setEndRowInclusive(true); +							splits.add(split); +						} +					} +				} + +				LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); + +				// TODO: Change to HBaseMultiSplit +				return convertToMultiSplitArray(splits); +			} + +			case GET_LIST: { +				// if( keyList == null || keyList.size() == 0 ) { +				if (keyList == null) { +					throw new IOException( +							"Source Mode is GET_LIST but key list is EMPTY"); +				} + +				if (useSalt) { +					TreeSet<String> tempKeyList = new TreeSet<String>(); + +					for (String key : keyList) { +						tempKeyList.add(HBaseSalter.addSaltPrefix(key)); +					} + +					keyList = tempKeyList; +				} + +				LOG.info("".format("Splitting Key List (%s)", keyList)); + +				final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + +				for (int i = 0; i < keys.getFirst().length; i++) { + +					if (!includeRegionInSplit(keys.getFirst()[i], +							keys.getSecond()[i])) { +						continue; +					} + +					LOG.debug(String.format( +							"Getting region (%s) subset (%s) to (%s)", regions[i], +							Bytes.toString(regStartKeys[i]), +							Bytes.toString(regStopKeys[i]))); + +					Set<String> regionsSubSet = null; + +					if ((regStartKeys[i] == null || regStartKeys[i].length == 0) +							&& (regStopKeys[i] == null || regStopKeys[i].length == 0)) { +						LOG.debug("REGION start is empty"); +						LOG.debug("REGION stop is empty"); +						regionsSubSet = keyList; +					} else if (regStartKeys[i] == null +							|| regStartKeys[i].length == 0) { +						LOG.debug("REGION start is empty"); +						regionsSubSet = keyList.headSet( +								Bytes.toString(regStopKeys[i]), true); +					} else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { +						LOG.debug("REGION stop is empty"); +						regionsSubSet = keyList.tailSet( +								Bytes.toString(regStartKeys[i]), true); +					} else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { +						regionsSubSet = keyList.subSet( +								Bytes.toString(regStartKeys[i]), true, +								Bytes.toString(regStopKeys[i]), true); +					} else { +						throw new IOException(String.format( +								"For REGION (%s) Start Key (%s) > Stop Key(%s)", +								regions[i], Bytes.toString(regStartKeys[i]), +								Bytes.toString(regStopKeys[i]))); +					} + +					if (regionsSubSet == null || regionsSubSet.size() == 0) { +						LOG.debug("EMPTY: Key is for region " + regions[i] +								+ " is null"); + +						continue; +					} + +					TreeSet<String> regionKeyList = new TreeSet<String>( +							regionsSubSet); + +					LOG.debug(String.format("Regions [%s] has key list <%s>", +							regions[i], regionKeyList)); + +					HBaseTableSplit split = new HBaseTableSplit( +							table.getTableName(), regionKeyList, versions, regions[i], +							SourceMode.GET_LIST, useSalt); +					splits.add(split); +				} + +				// if (splits.isEmpty()) { +				// 
LOG.info("GOT EMPTY SPLITS"); + +				// throw new IOException( +				// "".format("Key List NOT found in any region")); + +				// HRegionLocation regLoc = table.getRegionLocation( +				// HConstants.EMPTY_BYTE_ARRAY, false); +				// +				// if (null == regLoc) { +				// throw new IOException("Expecting at least one region."); +				// } +				// +				// HBaseTableSplit split = new HBaseTableSplit( +				// table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, +				// HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostnamePort() +				// .split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], +				// SourceMode.EMPTY, false); +				// +				// splits.add(split); +				// } + +				LOG.info("RETURNED SPLITS: split -> " + splits); + +				// TODO: Change to HBaseMultiSplit +				return convertToMultiSplitArray(splits); +			} + +			default: +				throw new IOException("Unknown source Mode : " + sourceMode); +		} +	} + +	private String reverseDNS(InetAddress ipAddress) throws NamingException { +		String hostName = this.reverseDNSCacheMap.get(ipAddress); +		if (hostName == null) { +			hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( +					ipAddress, this.nameServer)); +			this.reverseDNSCacheMap.put(ipAddress, hostName); +		} +		return hostName; +	} + +	@Override +	public RecordReader<ImmutableBytesWritable, Result> getRecordReader( +			InputSplit split, JobConf job, Reporter reporter) throws IOException { + +		if (!(split instanceof HBaseMultiInputSplit)) +			throw new IOException("Table Split is not type HBaseMultiInputSplit"); + +		HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split; + +		HBaseRecordReader trr = new HBaseRecordReader(tSplit); + +		trr.setHTable(this.table); +		trr.setInputColumns(this.inputColumns); +		trr.setRowFilter(this.rowFilter); +		trr.setUseSalt(useSalt); + +		trr.setNextSplit(); + +		return trr; +	} + +	/* Configuration Section */ + +	/** +	 * space delimited list of columns +	 */ +	public static final String COLUMN_LIST = "hbase.tablecolumns"; + +	/** +	 * Use this jobconf param to specify the input table +	 */ +	private static final String INPUT_TABLE = "hbase.inputtable"; + +	private String startKey = null; +	private String stopKey = null; + +	private SourceMode sourceMode = SourceMode.EMPTY; +	private TreeSet<String> keyList = null; +	private int versions = 1; +	private boolean useSalt = false; +	private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; + +	public void configure(JobConf job) { +		String tableName = getTableName(job); +		String colArg = job.get(COLUMN_LIST); +		String[] colNames = colArg.split(" "); +		byte[][] m_cols = new byte[colNames.length][]; +		for (int i = 0; i < m_cols.length; i++) { +			m_cols[i] = Bytes.toBytes(colNames[i]); +		} +		setInputColumns(m_cols); + +		try { +			setHTable(new HTable(HBaseConfiguration.create(job), tableName)); +		} catch (Exception e) { +			LOG.error("************* Table could not be created"); +			LOG.error(StringUtils.stringifyException(e)); +		} + +		LOG.debug("Entered : " + this.getClass() + " : configure()"); + +		useSalt = job.getBoolean( +				String.format(HBaseConstants.USE_SALT, getTableName(job)), false); +		prefixList = job.get( +				String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), +				HBaseSalter.DEFAULT_PREFIX_LIST); + +		sourceMode = SourceMode.valueOf(job.get(String.format( +				HBaseConstants.SOURCE_MODE, getTableName(job)))); + +		LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String +				.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job +				
.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), +				sourceMode)); + +		switch (sourceMode) { +			case SCAN_RANGE: +				LOG.info("HIT SCAN_RANGE"); + +				startKey = getJobProp(job, +						String.format(HBaseConstants.START_KEY, getTableName(job))); +				stopKey = getJobProp(job, +						String.format(HBaseConstants.STOP_KEY, getTableName(job))); + +				LOG.info(String.format("Setting start key (%s) and stop key (%s)", +						startKey, stopKey)); +			break; + +			case GET_LIST: +				LOG.info("HIT GET_LIST"); + +				Collection<String> keys = job.getStringCollection(String.format( +						HBaseConstants.KEY_LIST, getTableName(job))); +				keyList = new TreeSet<String>(keys); + +				versions = job.getInt( +						String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); + +				LOG.debug("GOT KEY LIST : " + keys); +				LOG.debug(String.format("SETTING key list (%s)", keyList)); + +			break; + +			case EMPTY: +				LOG.info("HIT EMPTY"); + +				sourceMode = SourceMode.SCAN_ALL; +			break; + +			default: +				LOG.info("HIT DEFAULT"); + +			break; +		} +	} + +	public void validateInput(JobConf job) throws IOException { +		// expecting exactly one path +		String tableName = getTableName(job); + +		if (tableName == null) { +			throw new IOException("expecting one table name"); +		} +		LOG.debug(String.format("Found Table name [%s]", tableName)); + +		// connected to table? +		if (getHTable() == null) { +			throw new IOException("could not connect to table '" + tableName + "'"); +		} +		LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); + +		// expecting at least one column +		String colArg = job.get(COLUMN_LIST); +		if (colArg == null || colArg.length() == 0) { +			throw new IOException("expecting at least one column"); +		} +		LOG.debug(String.format("Found Columns [%s]", colArg)); + +		LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, +				stopKey)); + +		if (sourceMode == SourceMode.EMPTY) { +			throw new IOException("SourceMode should not be EMPTY"); +		} + +		if (sourceMode == SourceMode.GET_LIST +				&& (keyList == null || keyList.size() == 0)) { +			throw new IOException("Source mode is GET_LIST bu key list is empty"); +		} +	} + +	/* Getters & Setters */ +	private HTable getHTable() { +		return this.table; +	} + +	private void setHTable(HTable ht) { +		this.table = ht; +	} + +	private void setInputColumns(byte[][] ic) { +		this.inputColumns = ic; +	} + +	private void setJobProp(JobConf job, String key, String value) { +		if (job.get(key) != null) +			throw new RuntimeException(String.format( +					"Job Conf already has key [%s] with value [%s]", key, +					job.get(key))); +		job.set(key, value); +	} + +	private String getJobProp(JobConf job, String key) { +		return job.get(key); +	} + +	public static void setTableName(JobConf job, String tableName) { +		// Make sure that table has not been set before +		String oldTableName = getTableName(job); +		if (oldTableName != null) { +			throw new RuntimeException("table name already set to: '" +					+ oldTableName + "'"); +		} + +		job.set(INPUT_TABLE, tableName); +	} + +	public static String getTableName(JobConf job) { +		return job.get(INPUT_TABLE); +	} + +	protected boolean includeRegionInSplit(final byte[] startKey, +			final byte[] endKey) { +		return true; +	}  } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java index d22ed71..5d7dbdd 100644 --- 
a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.client.ScannerCallable;  import org.apache.hadoop.hbase.filter.Filter;  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes;  +import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.hbase.util.Writables;  import org.apache.hadoop.mapred.RecordReader;  import org.apache.hadoop.util.StringUtils; @@ -31,475 +31,579 @@ import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  public class HBaseRecordReader implements -    RecordReader<ImmutableBytesWritable, Result> { - -  static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); - -  private byte[] startRow; -  private byte[] endRow; -  private byte[] lastSuccessfulRow; -  private TreeSet<String> keyList; -  private SourceMode sourceMode; -  private Filter trrRowFilter; -  private ResultScanner scanner; -  private HTable htable; -  private byte[][] trrInputColumns; -  private long timestamp; -  private int rowcount; -  private boolean logScannerActivity = false; -  private int logPerRowCount = 100; -  private boolean endRowInclusive = true; -  private int versions = 1; -  private boolean useSalt = false; - -  /** -   * Restart from survivable exceptions by creating a new scanner. -   *  -   * @param firstRow -   * @throws IOException -   */ -  public void restartRangeScan(byte[] firstRow) throws IOException { -    Scan currentScan; -    if ((endRow != null) && (endRow.length > 0)) { -      if (trrRowFilter != null) { -        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, -            new byte[] { 0 }) : endRow)); - -        TableInputFormat.addColumns(scan, trrInputColumns); -        scan.setFilter(trrRowFilter); -        scan.setCacheBlocks(false); -        this.scanner = this.htable.getScanner(scan); -        currentScan = scan; -      } else { -        LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) -            + ", endRow: " + Bytes.toString(endRow)); -        Scan scan = new Scan(firstRow, (endRowInclusive ? 
Bytes.add(endRow, -            new byte[] { 0 }) : endRow)); -        TableInputFormat.addColumns(scan, trrInputColumns); -        this.scanner = this.htable.getScanner(scan); -        currentScan = scan; -      } -    } else { -      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) -          + ", no endRow"); - -      Scan scan = new Scan(firstRow); -      TableInputFormat.addColumns(scan, trrInputColumns); -      scan.setFilter(trrRowFilter); -      this.scanner = this.htable.getScanner(scan); -      currentScan = scan; -    } -    if (logScannerActivity) { -      LOG.debug("Current scan=" + currentScan.toString()); -      timestamp = System.currentTimeMillis(); -      rowcount = 0; -    } -  } - -  public TreeSet<String> getKeyList() { -    return keyList; -  } - -  public void setKeyList(TreeSet<String> keyList) { -    this.keyList = keyList; -  } - -  public void setVersions(int versions) { -    this.versions = versions; -  } -   -  public void setUseSalt(boolean useSalt) { -    this.useSalt = useSalt; -  } - -  public SourceMode getSourceMode() { -    return sourceMode; -  } - -  public void setSourceMode(SourceMode sourceMode) { -    this.sourceMode = sourceMode; -  } - -  public byte[] getEndRow() { -    return endRow; -  } - -  public void setEndRowInclusive(boolean isInclusive) { -    endRowInclusive = isInclusive; -  } - -  public boolean getEndRowInclusive() { -    return endRowInclusive; -  } - -  private byte[] nextKey = null; -  private Vector<List<KeyValue>> resultVector = null; -  Map<Long, List<KeyValue>> keyValueMap = null; - -  /** -   * Build the scanner. Not done in constructor to allow for extension. -   *  -   * @throws IOException -   */ -  public void init() throws IOException { -    switch (sourceMode) { -    case SCAN_ALL: -    case SCAN_RANGE: -      restartRangeScan(startRow); -      break; - -    case GET_LIST: -      nextKey = Bytes.toBytes(keyList.pollFirst()); -      break; - -    default: -      throw new IOException(" Unknown source mode : " + sourceMode); -    } -  } - -  byte[] getStartRow() { -    return this.startRow; -  } - -  /** -   * @param htable -   *          the {@link HTable} to scan. -   */ -  public void setHTable(HTable htable) { -    Configuration conf = htable.getConfiguration(); -    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, -        false); -    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); -    this.htable = htable; -  } - -  /** -   * @param inputColumns -   *          the columns to be placed in {@link Result}. -   */ -  public void setInputColumns(final byte[][] inputColumns) { -    this.trrInputColumns = inputColumns; -  } - -  /** -   * @param startRow -   *          the first row in the split -   */ -  public void setStartRow(final byte[] startRow) { -    this.startRow = startRow; -  } - -  /** -   *  -   * @param endRow -   *          the last row in the split -   */ -  public void setEndRow(final byte[] endRow) { -    this.endRow = endRow; -  } - -  /** -   * @param rowFilter -   *          the {@link Filter} to be used. 
-   */ -  public void setRowFilter(Filter rowFilter) { -    this.trrRowFilter = rowFilter; -  } - -  @Override -  public void close() { -    if (this.scanner != null) -      this.scanner.close(); -  } - -  /** -   * @return ImmutableBytesWritable -   *  -   * @see org.apache.hadoop.mapred.RecordReader#createKey() -   */ -  @Override -  public ImmutableBytesWritable createKey() { -    return new ImmutableBytesWritable(); -  } - -  /** -   * @return RowResult -   *  -   * @see org.apache.hadoop.mapred.RecordReader#createValue() -   */ -  @Override -  public Result createValue() { -    return new Result(); -  } - -  @Override -  public long getPos() { -    // This should be the ordinal tuple in the range; -    // not clear how to calculate... -    return 0; -  } - -  @Override -  public float getProgress() { -    // Depends on the total number of tuples and getPos -    return 0; -  } - -  /** -   * @param key -   *          HStoreKey as input key. -   * @param value -   *          MapWritable as input value -   * @return true if there was more data -   * @throws IOException -   */ -  @Override -  public boolean next(ImmutableBytesWritable key, Result value) -      throws IOException { - -    switch (sourceMode) { -    case SCAN_ALL: -    case SCAN_RANGE: { - -      Result result; -      try { -        try { -          result = this.scanner.next(); -          if (logScannerActivity) { -            rowcount++; -            if (rowcount >= logPerRowCount) { -              long now = System.currentTimeMillis(); -              LOG.debug("Mapper took " + (now - timestamp) + "ms to process " -                  + rowcount + " rows"); -              timestamp = now; -              rowcount = 0; -            } -          } -        } catch (IOException e) { -          // try to handle all IOExceptions by restarting -          // the scanner, if the second call fails, it will be rethrown -          LOG.debug("recovered from " + StringUtils.stringifyException(e)); -          if (lastSuccessfulRow == null) { -            LOG.warn("We are restarting the first next() invocation," -                + " if your mapper has restarted a few other times like this" -                + " then you should consider killing this job and investigate" -                + " why it's taking so long."); -          } -          if (lastSuccessfulRow == null) { -            restartRangeScan(startRow); -          } else { -            restartRangeScan(lastSuccessfulRow); -            this.scanner.next(); // skip presumed already mapped row -          } -          result = this.scanner.next(); -        } - -        if (result != null && result.size() > 0) { -          if( useSalt) { -            key.set( HBaseSalter.delSaltPrefix(result.getRow())); -          } else { -            key.set(result.getRow()); -          } -           -          lastSuccessfulRow = key.get(); -          Writables.copyWritable(result, value); -          return true; -        } -        return false; -      } catch (IOException ioe) { -        if (logScannerActivity) { -          long now = System.currentTimeMillis(); -          LOG.debug("Mapper took " + (now - timestamp) + "ms to process " -              + rowcount + " rows"); -          LOG.debug(ioe); -          String lastRow = lastSuccessfulRow == null ? 
"null" : Bytes -              .toStringBinary(lastSuccessfulRow); -          LOG.debug("lastSuccessfulRow=" + lastRow); -        } -        throw ioe; -      } -    } - -    case GET_LIST: { -      LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); -       -      if (versions == 1) { -        if (nextKey != null) { -          LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); -           -          Get theGet = new Get(nextKey); -          theGet.setMaxVersions(versions); - -          Result result = this.htable.get(theGet); - -          if (result != null && (! result.isEmpty()) ) { -            LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); - -            if (keyList != null || !keyList.isEmpty()) { -              String newKey = keyList.pollFirst(); -              LOG.debug("New Key => " + newKey); -              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes -                  .toBytes(newKey); -            } else { -              nextKey = null; -            } -             -            LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); -             -            // Write the result -            if( useSalt) { -              key.set( HBaseSalter.delSaltPrefix(result.getRow())); -            } else { -              key.set(result.getRow()); -            } -            lastSuccessfulRow = key.get(); -            Writables.copyWritable(result, value); -             -            return true; -          } else { -            LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 -             -            String newKey; -            while((newKey = keyList.pollFirst()) != null) { -              LOG.debug("WHILE NEXT Key => " + newKey); - -              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes -                  .toBytes(newKey); -               -              if( nextKey == null ) { -                LOG.error("BOMB! BOMB! BOMB!"); -                continue;  -              } -               -              if( ! this.htable.exists( new Get(nextKey) ) ) { -                LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); -                continue;  -              } else { break; } -            } -             -            nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes -                .toBytes(newKey); -             -            LOG.debug("Final New Key => " + Bytes.toString(nextKey)); - -            return next(key, value); -          } -        } else { -          // Nothig left. 
return false -          return false; -        } -      } else { -        if (resultVector != null && resultVector.size() != 0) {  -          LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); - -          List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); -          Result result = new Result(resultKeyValue); - -          LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); - -          if( useSalt) { -            key.set( HBaseSalter.delSaltPrefix(result.getRow())); -          } else { -            key.set(result.getRow()); -          } -          lastSuccessfulRow = key.get(); -          Writables.copyWritable(result, value); - -          return true; -        } else { -          if (nextKey != null) { -            LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); -             -            Get theGet = new Get(nextKey); -            theGet.setMaxVersions(versions); - -            Result resultAll = this.htable.get(theGet); -             -            if( resultAll != null && (! resultAll.isEmpty())) { -              List<KeyValue> keyValeList = resultAll.list(); - -              keyValueMap = new HashMap<Long, List<KeyValue>>(); -               -              LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); - -              for (KeyValue keyValue : keyValeList) { -                long version = keyValue.getTimestamp(); - -                if (keyValueMap.containsKey(new Long(version))) { -                  List<KeyValue> keyValueTempList = keyValueMap.get(new Long( -                      version)); -                  if (keyValueTempList == null) { -                    keyValueTempList = new ArrayList<KeyValue>(); -                  } -                  keyValueTempList.add(keyValue); -                } else { -                  List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); -                  keyValueMap.put(new Long(version), keyValueTempList); -                  keyValueTempList.add(keyValue); -                } -              } - -              resultVector = new Vector<List<KeyValue>>(); -              resultVector.addAll(keyValueMap.values()); - -              List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); - -              Result result = new Result(resultKeyValue); - -              LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); - -              String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// - -              System.out.println("+ New Key => " + newKey); -              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes -                  .toBytes(newKey); - -              if( useSalt) { -                key.set( HBaseSalter.delSaltPrefix(result.getRow())); -              } else { -                key.set(result.getRow()); -              } -              lastSuccessfulRow = key.get(); -              Writables.copyWritable(result, value); -              return true; -            } else { -              LOG.debug(String.format("+ Key (%s) return an EMPTY result. 
Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 -               -              String newKey; -               -              while( (newKey = keyList.pollFirst()) != null ) { -                LOG.debug("+ WHILE NEXT Key => " + newKey); - -                nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes -                    .toBytes(newKey); -                 -                if( nextKey == null ) { -                  LOG.error("+ BOMB! BOMB! BOMB!"); -                  continue;  -                } -                 -                if( ! this.htable.exists( new Get(nextKey) ) ) { -                  LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); -                  continue;  -                } else { break; } -              } -               -              nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes -                  .toBytes(newKey); -               -              LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); - -              return next(key, value); -            } -             -          } else { -            return false; -          } -        } -      } -    } -    default: -      throw new IOException("Unknown source mode : " + sourceMode); -    } -  } +		RecordReader<ImmutableBytesWritable, Result> { + +	static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); + +	private byte[] startRow; +	private byte[] endRow; +	private byte[] lastSuccessfulRow; +	private TreeSet<String> keyList; +	private SourceMode sourceMode; +	private Filter trrRowFilter; +	private ResultScanner scanner; +	private HTable htable; +	private byte[][] trrInputColumns; +	private long timestamp; +	private int rowcount; +	private boolean logScannerActivity = false; +	private int logPerRowCount = 100; +	private boolean endRowInclusive = true; +	private int versions = 1; +	private boolean useSalt = false; + +	private HBaseMultiInputSplit multiSplit = null; +	private List<HBaseTableSplit> allSplits = null; + +	private HBaseRecordReader() { +	} + +	public HBaseRecordReader(HBaseMultiInputSplit mSplit) throws IOException { +		multiSplit = mSplit; + +		LOG.info("Creatin Multi Split for region location : " +				+ multiSplit.getRegionLocation()); + +		allSplits = multiSplit.getSplits(); +	} + +	public boolean setNextSplit() throws IOException { +		if (allSplits.size() > 0) { +			setSplitValue(allSplits.remove(0)); +			return true; +		} else { +			return false; +		} +	} + +	private void setSplitValue(HBaseTableSplit tSplit) throws IOException { +		switch (tSplit.getSourceMode()) { +			case SCAN_ALL: +			case SCAN_RANGE: { +				LOG.debug(String.format( +						"For split [%s] we have start key (%s) and stop key (%s)", +						tSplit, tSplit.getStartRow(), tSplit.getEndRow())); + +				setStartRow(tSplit.getStartRow()); +				setEndRow(tSplit.getEndRow()); +				setEndRowInclusive(tSplit.getEndRowInclusive()); +			} + +			break; + +			case GET_LIST: { +				LOG.debug(String.format("For split [%s] we have key list (%s)", +						tSplit, tSplit.getKeyList())); + +				setKeyList(tSplit.getKeyList()); +				setVersions(tSplit.getVersions()); +			} + +			break; + +			case EMPTY: +				LOG.info("EMPTY split. Doing nothing."); +			break; + +			default: +				throw new IOException("Unknown source mode : " +						+ tSplit.getSourceMode()); +		} + +		setSourceMode(tSplit.getSourceMode()); + +		init(); +	} + +	/** +	 * Restart from survivable exceptions by creating a new scanner. 
+	 *  +	 * @param firstRow +	 * @throws IOException +	 */ +	private void restartRangeScan(byte[] firstRow) throws IOException { +		Scan currentScan; +		if ((endRow != null) && (endRow.length > 0)) { +			if (trrRowFilter != null) { +				Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +						new byte[] { 0 }) : endRow)); + +				TableInputFormat.addColumns(scan, trrInputColumns); +				scan.setFilter(trrRowFilter); +				scan.setCacheBlocks(false); +				this.scanner = this.htable.getScanner(scan); +				currentScan = scan; +			} else { +				LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) +						+ ", endRow: " + Bytes.toString(endRow)); +				Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, +						new byte[] { 0 }) : endRow)); +				TableInputFormat.addColumns(scan, trrInputColumns); +				this.scanner = this.htable.getScanner(scan); +				currentScan = scan; +			} +		} else { +			LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) +					+ ", no endRow"); + +			Scan scan = new Scan(firstRow); +			TableInputFormat.addColumns(scan, trrInputColumns); +			scan.setFilter(trrRowFilter); +			this.scanner = this.htable.getScanner(scan); +			currentScan = scan; +		} +		if (logScannerActivity) { +			LOG.debug("Current scan=" + currentScan.toString()); +			timestamp = System.currentTimeMillis(); +			rowcount = 0; +		} +	} + +	public TreeSet<String> getKeyList() { +		return keyList; +	} + +	private void setKeyList(TreeSet<String> keyList) { +		this.keyList = keyList; +	} + +	private void setVersions(int versions) { +		this.versions = versions; +	} + +	public void setUseSalt(boolean useSalt) { +		this.useSalt = useSalt; +	} + +	public SourceMode getSourceMode() { +		return sourceMode; +	} + +	private void setSourceMode(SourceMode sourceMode) { +		this.sourceMode = sourceMode; +	} + +	public byte[] getEndRow() { +		return endRow; +	} + +	private void setEndRowInclusive(boolean isInclusive) { +		endRowInclusive = isInclusive; +	} + +	public boolean getEndRowInclusive() { +		return endRowInclusive; +	} + +	private byte[] nextKey = null; +	private Vector<List<KeyValue>> resultVector = null; +	Map<Long, List<KeyValue>> keyValueMap = null; + +	/** +	 * Build the scanner. Not done in constructor to allow for extension. +	 *  +	 * @throws IOException +	 */ +	private void init() throws IOException { +		switch (sourceMode) { +			case SCAN_ALL: +			case SCAN_RANGE: +				restartRangeScan(startRow); +			break; + +			case GET_LIST: +				nextKey = Bytes.toBytes(keyList.pollFirst()); +			break; + +			case EMPTY: +				LOG.info("EMPTY mode. Do nothing"); +			break; + +			default: +				throw new IOException(" Unknown source mode : " + sourceMode); +		} +	} + +	byte[] getStartRow() { +		return this.startRow; +	} + +	/** +	 * @param htable +	 *           the {@link HTable} to scan. +	 */ +	public void setHTable(HTable htable) { +		Configuration conf = htable.getConfiguration(); +		logScannerActivity = conf.getBoolean( +				ScannerCallable.LOG_SCANNER_ACTIVITY, false); +		logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); +		this.htable = htable; +	} + +	/** +	 * @param inputColumns +	 *           the columns to be placed in {@link Result}. 
+	 */ +	public void setInputColumns(final byte[][] inputColumns) { +		this.trrInputColumns = inputColumns; +	} + +	/** +	 * @param startRow +	 *           the first row in the split +	 */ +	private void setStartRow(final byte[] startRow) { +		this.startRow = startRow; +	} + +	/** +	 *  +	 * @param endRow +	 *           the last row in the split +	 */ +	private void setEndRow(final byte[] endRow) { +		this.endRow = endRow; +	} + +	/** +	 * @param rowFilter +	 *           the {@link Filter} to be used. +	 */ +	public void setRowFilter(Filter rowFilter) { +		this.trrRowFilter = rowFilter; +	} + +	@Override +	public void close() { +		if (this.scanner != null) +			this.scanner.close(); +	} + +	/** +	 * @return ImmutableBytesWritable +	 *  +	 * @see org.apache.hadoop.mapred.RecordReader#createKey() +	 */ +	@Override +	public ImmutableBytesWritable createKey() { +		return new ImmutableBytesWritable(); +	} + +	/** +	 * @return RowResult +	 *  +	 * @see org.apache.hadoop.mapred.RecordReader#createValue() +	 */ +	@Override +	public Result createValue() { +		return new Result(); +	} + +	@Override +	public long getPos() { +		// This should be the ordinal tuple in the range; +		// not clear how to calculate... +		return 0; +	} + +	@Override +	public float getProgress() { +		// Depends on the total number of tuples and getPos +		return 0; +	} + +	/** +	 * @param key +	 *           HStoreKey as input key. +	 * @param value +	 *           MapWritable as input value +	 * @return true if there was more data +	 * @throws IOException +	 */ +	@Override +	public boolean next(ImmutableBytesWritable key, Result value) +			throws IOException { + +		switch (sourceMode) { +			case SCAN_ALL: +			case SCAN_RANGE: { + +				Result result; +				try { +					try { +						result = this.scanner.next(); +						if (logScannerActivity) { +							rowcount++; +							if (rowcount >= logPerRowCount) { +								long now = System.currentTimeMillis(); +								LOG.debug("Mapper took " + (now - timestamp) +										+ "ms to process " + rowcount + " rows"); +								timestamp = now; +								rowcount = 0; +							} +						} +					} catch (IOException e) { +						// try to handle all IOExceptions by restarting +						// the scanner, if the second call fails, it will be rethrown +						LOG.debug("recovered from " +								+ StringUtils.stringifyException(e)); +						if (lastSuccessfulRow == null) { +							LOG.warn("We are restarting the first next() invocation," +									+ " if your mapper has restarted a few other times like this" +									+ " then you should consider killing this job and investigate" +									+ " why it's taking so long."); +						} +						if (lastSuccessfulRow == null) { +							restartRangeScan(startRow); +						} else { +							restartRangeScan(lastSuccessfulRow); +							this.scanner.next(); // skip presumed already mapped row +						} +						result = this.scanner.next(); +					} + +					if (result != null && result.size() > 0) { +						if (useSalt) { +							key.set(HBaseSalter.delSaltPrefix(result.getRow())); +						} else { +							key.set(result.getRow()); +						} + +						lastSuccessfulRow = key.get(); +						Writables.copyWritable(result, value); +						return true; +					} +					return setNextSplit(); +				} catch (IOException ioe) { +					if (logScannerActivity) { +						long now = System.currentTimeMillis(); +						LOG.debug("Mapper took " + (now - timestamp) +								+ "ms to process " + rowcount + " rows"); +						LOG.debug(ioe); +						String lastRow = lastSuccessfulRow == null ? 
"null" : Bytes +								.toStringBinary(lastSuccessfulRow); +						LOG.debug("lastSuccessfulRow=" + lastRow); +					} +					throw ioe; +				} +			} + +			case GET_LIST: { +				LOG.debug(String.format("INTO next with GET LIST and Key (%s)", +						Bytes.toString(nextKey))); + +				if (versions == 1) { +					if (nextKey != null) { +						LOG.debug(String.format("Processing Key (%s)", +								Bytes.toString(nextKey))); + +						Get theGet = new Get(nextKey); +						theGet.setMaxVersions(versions); + +						Result result = this.htable.get(theGet); + +						if (result != null && (!result.isEmpty())) { +							LOG.debug(String.format( +									"Key (%s), Version (%s), Got Result (%s)", +									Bytes.toString(nextKey), versions, result)); + +							if (keyList != null || !keyList.isEmpty()) { +								String newKey = keyList.pollFirst(); +								LOG.debug("New Key => " + newKey); +								nextKey = (newKey == null || newKey.length() == 0) ? null +										: Bytes.toBytes(newKey); +							} else { +								nextKey = null; +							} + +							LOG.debug(String.format("=> Picked a new Key (%s)", +									Bytes.toString(nextKey))); + +							// Write the result +							if (useSalt) { +								key.set(HBaseSalter.delSaltPrefix(result.getRow())); +							} else { +								key.set(result.getRow()); +							} +							lastSuccessfulRow = key.get(); +							Writables.copyWritable(result, value); + +							return true; +						} else { +							LOG.debug(" Key (" + Bytes.toString(nextKey) +									+ ") return an EMPTY result. Get (" + theGet.getId() +									+ ")"); // alg0 + +							String newKey; +							while ((newKey = keyList.pollFirst()) != null) { +								LOG.debug("WHILE NEXT Key => " + newKey); + +								nextKey = (newKey == null || newKey.length() == 0) ? null +										: Bytes.toBytes(newKey); + +								if (nextKey == null) { +									LOG.error("BOMB! BOMB! BOMB!"); +									continue; +								} + +								if (!this.htable.exists(new Get(nextKey))) { +									LOG.debug(String.format( +											"Key (%s) Does not exist in Table (%s)", +											Bytes.toString(nextKey), +											Bytes.toString(this.htable.getTableName()))); +									continue; +								} else { +									break; +								} +							} + +							nextKey = (newKey == null || newKey.length() == 0) ? null +									: Bytes.toBytes(newKey); + +							LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + +							return next(key, value); +						} +					} else { +						// Nothig left. 
return false +						return setNextSplit(); +					} +				} else { +					if (resultVector != null && resultVector.size() != 0) { +						LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", +								versions, resultVector)); + +						List<KeyValue> resultKeyValue = resultVector +								.remove(resultVector.size() - 1); +						Result result = new Result(resultKeyValue); + +						LOG.debug(String.format("+ Version (%s), Got Result <%s>", +								versions, result)); + +						if (useSalt) { +							key.set(HBaseSalter.delSaltPrefix(result.getRow())); +						} else { +							key.set(result.getRow()); +						} +						lastSuccessfulRow = key.get(); +						Writables.copyWritable(result, value); + +						return true; +					} else { +						if (nextKey != null) { +							LOG.debug(String.format("+ Processing Key (%s)", +									Bytes.toString(nextKey))); + +							Get theGet = new Get(nextKey); +							theGet.setMaxVersions(versions); + +							Result resultAll = this.htable.get(theGet); + +							if (resultAll != null && (!resultAll.isEmpty())) { +								List<KeyValue> keyValeList = resultAll.list(); + +								keyValueMap = new HashMap<Long, List<KeyValue>>(); + +								LOG.debug(String.format( +										"+ Key (%s) Versions (%s) Val;ute map <%s>", +										Bytes.toString(nextKey), versions, keyValueMap)); + +								for (KeyValue keyValue : keyValeList) { +									long version = keyValue.getTimestamp(); + +									if (keyValueMap.containsKey(new Long(version))) { +										List<KeyValue> keyValueTempList = keyValueMap +												.get(new Long(version)); +										if (keyValueTempList == null) { +											keyValueTempList = new ArrayList<KeyValue>(); +										} +										keyValueTempList.add(keyValue); +									} else { +										List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); +										keyValueMap.put(new Long(version), +												keyValueTempList); +										keyValueTempList.add(keyValue); +									} +								} + +								resultVector = new Vector<List<KeyValue>>(); +								resultVector.addAll(keyValueMap.values()); + +								List<KeyValue> resultKeyValue = resultVector +										.remove(resultVector.size() - 1); + +								Result result = new Result(resultKeyValue); + +								LOG.debug(String.format( +										"+ Version (%s), Got Result (%s)", versions, +										result)); + +								String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + +								System.out.println("+ New Key => " + newKey); +								nextKey = (newKey == null || newKey.length() == 0) ? null +										: Bytes.toBytes(newKey); + +								if (useSalt) { +									key.set(HBaseSalter.delSaltPrefix(result.getRow())); +								} else { +									key.set(result.getRow()); +								} +								lastSuccessfulRow = key.get(); +								Writables.copyWritable(result, value); +								return true; +							} else { +								LOG.debug(String.format( +										"+ Key (%s) return an EMPTY result. Get (%s)", +										Bytes.toString(nextKey), theGet.getId())); // alg0 + +								String newKey; + +								while ((newKey = keyList.pollFirst()) != null) { +									LOG.debug("+ WHILE NEXT Key => " + newKey); + +									nextKey = (newKey == null || newKey.length() == 0) ? null +											: Bytes.toBytes(newKey); + +									if (nextKey == null) { +										LOG.error("+ BOMB! BOMB! 
BOMB!"); +										continue; +									} + +									if (!this.htable.exists(new Get(nextKey))) { +										LOG.debug(String.format( +												"+ Key (%s) Does not exist in Table (%s)", +												Bytes.toString(nextKey), +												Bytes.toString(this.htable.getTableName()))); +										continue; +									} else { +										break; +									} +								} + +								nextKey = (newKey == null || newKey.length() == 0) ? null +										: Bytes.toBytes(newKey); + +								LOG.debug("+ Final New Key => " +										+ Bytes.toString(nextKey)); + +								return next(key, value); +							} + +						} else { +							return setNextSplit(); +						} +					} +				} +			} + +			case EMPTY: { +				LOG.info("GOT an empty Split"); +				return setNextSplit(); +			} + +			default: +				throw new IOException("Unknown source mode : " + sourceMode); +		} +	}  } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java index a5c3bdd..87b8f58 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -12,196 +12,208 @@ import org.apache.hadoop.hbase.HConstants;  import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.mapred.InputSplit; -import com.sun.tools.javac.resources.version; -  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { - -  private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); -   -  private byte [] m_tableName = null; -  private byte [] m_startRow = null; -  private byte [] m_endRow = null; -  private String m_regionLocation = null; -  private TreeSet<String> m_keyList = null; -  private SourceMode m_sourceMode = SourceMode.EMPTY; -  private boolean m_endRowInclusive = true; -  private int m_versions = 1; -  private boolean m_useSalt = false; - -  /** default constructor */ -  public HBaseTableSplit() { -    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, -      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); -  } - -  /** -   * Constructor -   * @param tableName -   * @param startRow -   * @param endRow -   * @param location -   */ -  public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, -      final String location, final SourceMode sourceMode, final boolean useSalt) { -    this.m_tableName = tableName; -    this.m_startRow = startRow; -    this.m_endRow = endRow; -    this.m_regionLocation = location; -    this.m_sourceMode = sourceMode; -    this.m_useSalt = useSalt; -  } -   -  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) { -    this.m_tableName = tableName; -    this.m_keyList = keyList; -    this.m_versions = versions; -    this.m_sourceMode = sourceMode; -    this.m_regionLocation = location; -    this.m_useSalt = useSalt; -  } - -  /** @return table name */ -  public byte [] getTableName() { -    return this.m_tableName; -  } - -  /** @return starting row key */ -  public byte [] getStartRow() { -    return this.m_startRow; -  } - -  /** @return end row key */ -  public byte [] getEndRow() { -    return this.m_endRow; -  } -   -  public boolean getEndRowInclusive() { -    return m_endRowInclusive; -  } -   -  public void setEndRowInclusive(boolean isInclusive) { -    m_endRowInclusive = 
isInclusive; -  } -   -  /** @return list of keys to get */ -  public TreeSet<String> getKeyList() { -    return m_keyList; -  } -   -  public int getVersions() { -	  return m_versions; -  } - -  /** @return get the source mode */ -  public SourceMode getSourceMode() { -    return m_sourceMode; -  } -   -  public boolean getUseSalt() { -    return m_useSalt; -  } - -  /** @return the region's hostname */ -  public String getRegionLocation() { -    LOG.debug("REGION GETTER : " + m_regionLocation); -     -    return this.m_regionLocation; -  } - -  public String[] getLocations() { -    LOG.debug("REGION ARRAY : " + m_regionLocation); - -    return new String[] {this.m_regionLocation}; -  } - -  @Override -  public long getLength() { -    // Not clear how to obtain this... seems to be used only for sorting splits -    return 0; -  } - -  @Override -  public void readFields(DataInput in) throws IOException { -    LOG.debug("READ ME : " + in.toString()); - -    this.m_tableName = Bytes.readByteArray(in); -    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); -    this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); -    this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); -     -    switch(this.m_sourceMode) { -      case SCAN_RANGE: -        this.m_startRow = Bytes.readByteArray(in); -        this.m_endRow = Bytes.readByteArray(in); -        this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); -        break; -         -      case GET_LIST: -    	this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); -        this.m_keyList = new TreeSet<String>(); -         -        int m = Bytes.toInt(Bytes.readByteArray(in)); -         -        for( int i = 0; i < m; i++) { -          this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); -        } -        break; -    } -     -    LOG.debug("READ and CREATED : " + this); -  } - -  @Override -  public void write(DataOutput out) throws IOException { -    LOG.debug("WRITE : " + this); - -    Bytes.writeByteArray(out, this.m_tableName); -    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); -    Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); -    Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); -     -    switch( this.m_sourceMode ) { -      case SCAN_RANGE: -        Bytes.writeByteArray(out, this.m_startRow); -        Bytes.writeByteArray(out, this.m_endRow); -        Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); -        break; -         -      case GET_LIST: -    	Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); -        Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); -         -        for( String k: this.m_keyList ) { -          Bytes.writeByteArray(out, Bytes.toBytes(k)); -        } -        break; -    } - -    LOG.debug("WROTE : " + out.toString()); -  } - -  @Override -  public String toString() { -    return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",  -        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow),  -        (m_keyList != null) ? 
m_keyList.size() : "EMPTY", m_versions, m_useSalt); -  } - -  @Override -  public int compareTo(HBaseTableSplit o) { -    switch(m_sourceMode) { -      case SCAN_ALL: -      case SCAN_RANGE: -        return Bytes.compareTo(getStartRow(), o.getStartRow()); -         -      case GET_LIST: -        return m_keyList.equals( o.getKeyList() ) ? 0 : -1;  -         -      default: -        return -1; -    } -     -  } +public class HBaseTableSplit implements InputSplit, +		Comparable<HBaseTableSplit>, Serializable { + +	private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); + +	private byte[] m_tableName = null; +	private byte[] m_startRow = null; +	private byte[] m_endRow = null; +	private String m_regionLocation = null; +	private TreeSet<String> m_keyList = null; +	private SourceMode m_sourceMode = SourceMode.EMPTY; +	private boolean m_endRowInclusive = true; +	private int m_versions = 1; +	private boolean m_useSalt = false; + +	/** default constructor */ +	public HBaseTableSplit() { +		this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, +				HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); +	} + +	/** +	 * Constructor +	 *  +	 * @param tableName +	 * @param startRow +	 * @param endRow +	 * @param location +	 */ +	public HBaseTableSplit(final byte[] tableName, final byte[] startRow, +			final byte[] endRow, final String location, +			final SourceMode sourceMode, final boolean useSalt) { +		this.m_tableName = tableName; +		this.m_startRow = startRow; +		this.m_endRow = endRow; +		this.m_regionLocation = location; +		this.m_sourceMode = sourceMode; +		this.m_useSalt = useSalt; +	} + +	public HBaseTableSplit(final byte[] tableName, +			final TreeSet<String> keyList, int versions, final String location, +			final SourceMode sourceMode, final boolean useSalt) { +		this.m_tableName = tableName; +		this.m_keyList = keyList; +		this.m_versions = versions; +		this.m_sourceMode = sourceMode; +		this.m_regionLocation = location; +		this.m_useSalt = useSalt; +	} + +	/** @return table name */ +	public byte[] getTableName() { +		return this.m_tableName; +	} + +	/** @return starting row key */ +	public byte[] getStartRow() { +		return this.m_startRow; +	} + +	/** @return end row key */ +	public byte[] getEndRow() { +		return this.m_endRow; +	} + +	public boolean getEndRowInclusive() { +		return m_endRowInclusive; +	} + +	public void setEndRowInclusive(boolean isInclusive) { +		m_endRowInclusive = isInclusive; +	} + +	/** @return list of keys to get */ +	public TreeSet<String> getKeyList() { +		return m_keyList; +	} + +	public int getVersions() { +		return m_versions; +	} + +	/** @return get the source mode */ +	public SourceMode getSourceMode() { +		return m_sourceMode; +	} + +	public boolean getUseSalt() { +		return m_useSalt; +	} + +	/** @return the region's hostname */ +	public String getRegionLocation() { +		LOG.debug("REGION GETTER : " + m_regionLocation); + +		return this.m_regionLocation; +	} + +	public String[] getLocations() { +		LOG.debug("REGION ARRAY : " + m_regionLocation); + +		return new String[] { this.m_regionLocation }; +	} + +	@Override +	public long getLength() { +		// Not clear how to obtain this... 
seems to be used only for sorting +		// splits +		return 0; +	} + +	@Override +	public void readFields(DataInput in) throws IOException { +		LOG.debug("READ ME : " + in.toString()); + +		this.m_tableName = Bytes.readByteArray(in); +		this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); +		this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes +				.readByteArray(in))); +		this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); + +		switch (this.m_sourceMode) { +			case SCAN_RANGE: +				this.m_startRow = Bytes.readByteArray(in); +				this.m_endRow = Bytes.readByteArray(in); +				this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); +			break; + +			case GET_LIST: +				this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); +				this.m_keyList = new TreeSet<String>(); + +				int m = Bytes.toInt(Bytes.readByteArray(in)); + +				for (int i = 0; i < m; i++) { +					this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); +				} +			break; +		} + +		LOG.debug("READ and CREATED : " + this); +	} + +	@Override +	public void write(DataOutput out) throws IOException { +		LOG.debug("WRITE : " + this); + +		Bytes.writeByteArray(out, this.m_tableName); +		Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); +		Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); +		Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); + +		switch (this.m_sourceMode) { +			case SCAN_RANGE: +				Bytes.writeByteArray(out, this.m_startRow); +				Bytes.writeByteArray(out, this.m_endRow); +				Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); +			break; + +			case GET_LIST: +				Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); +				Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + +				for (String k : this.m_keyList) { +					Bytes.writeByteArray(out, Bytes.toBytes(k)); +				} +			break; +		} + +		LOG.debug("WROTE : " + out.toString()); +	} + +	@Override +	public String toString() { +		return String +				.format( +						"Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", +						Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, +						Bytes.toString(m_startRow), Bytes.toString(m_endRow), +						(m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, +						m_useSalt); +	} + +	@Override +	public int compareTo(HBaseTableSplit o) { +		switch (m_sourceMode) { +			case SCAN_ALL: +			case SCAN_RANGE: +				return Bytes.compareTo(getStartRow(), o.getStartRow()); + +			case GET_LIST: +				return m_keyList.equals(o.getKeyList()) ? 0 : -1; + +			case EMPTY: +				return 0; + +			default: +				return -1; +		} + +	}  }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index eccd653..2aa5342 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -7,93 +7,101 @@ import org.apache.hadoop.hbase.client.HTable  import org.apache.hadoop.hbase.util.Bytes  import org.apache.log4j.Level  import org.apache.log4j.Logger -  import com.twitter.scalding._  import com.twitter.scalding.Args -  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseSource  import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import org.apache.hadoop.hbase.client.Put +import parallelai.spyglass.hbase.HBaseSalter  class HBaseExample(args: Args) extends JobBase(args) { -  val isDebug: Boolean = args("debug").toBoolean +   val isDebug: Boolean = args("debug").toBoolean -  if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) +   if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) -  val output = args("output") +   val output = args("output") -  println(output) +   val jobConf = getJobConf() -  val jobConf = getJobConf() +   val quorumNames = args("quorum") -  val quorumNames = args("quorum") +   println("Output : " + output) +   println("Quorum : " + quorumNames) -  case class HBaseTableStore( +   case class HBaseTableStore(        conf: Configuration,        quorum: String,        tableName: String) { -    val tableBytes = Bytes.toBytes(tableName) -    val connection = HConnectionManager.getConnection(conf) -    val maxThreads = conf.getInt("hbase.htable.threads.max", 1) - -    conf.set("hbase.zookeeper.quorum", quorumNames) - -    val htable = new HTable(HBaseConfiguration.create(conf), tableName) - -  } - -  val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet") - -  val hbs2 = new HBaseSource( -    "table_name", -    "quorum_name:2181", -    'key, -    List("column_family"), -    List('column_name), -    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) -    .read -    .write(Tsv(output.format("get_list"))) - -  val hbs3 = new HBaseSource( -    "table_name", -    "quorum_name:2181", -    'key, -    List("column_family"), -    List('column_name), -    sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") -    .read -    .write(Tsv(output.format("scan_all"))) - -  val hbs4 = new HBaseSource( -    "table_name", -    "quorum_name:2181", -    'key, -    List("column_family"), -    List('column_name), -    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") -    .read -    .write(Tsv(output.format("scan_range_to_end"))) - -  val hbs5 = new HBaseSource( -    "table_name", -    "quorum_name:2181", -    'key, -    List("column_family"), -    List('column_name), -    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") -    .read -    .write(Tsv(output.format("scan_range_from_start"))) - -  val hbs6 = new HBaseSource( -    "table_name", -    "quorum_name:2181", -    'key, -    List("column_family"), -    List('column_name), -    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") -    .read -    .write(Tsv(output.format("scan_range_between"))) - -} 
\ No newline at end of file +      val tableBytes = Bytes.toBytes(tableName) +      val connection = HConnectionManager.getConnection(conf) +      val maxThreads = conf.getInt("hbase.htable.threads.max", 1) + +      conf.set("hbase.zookeeper.quorum", quorumNames) + +      val htable = new HTable(HBaseConfiguration.create(conf), tableName) + +      def makeN(n: Int) { +         (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => { +            val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x))) +            put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x)) +         }) +      } + +   } + +   HBaseTableStore(jobConf, quorumNames, "_TEST.SALT.01").makeN(100000) + +   val hbs2 = new HBaseSource( +      "_TEST.SALT.01", +      quorumNames, +      'key, +      List("data"), +      List('data), +      sourceMode = SourceMode.GET_LIST, keyList = List("13914", "10687", "14897").map(x => "%015d".format(x.toLong)), useSalt = true) +      .read +      .write(Tsv(output.format("get_list"))) + +   val hbs3 = new HBaseSource( +      "_TEST.SALT.01", +      quorumNames, +      'key, +      List("data"), +      List('data), +      sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") +      .read +      .write(Tsv(output.format("scan_all"))) + +   val hbs4 = new HBaseSource( +      "_TEST.SALT.01", +      quorumNames, +      'key, +      List("data"), +      List('data), +      sourceMode = SourceMode.SCAN_RANGE, stopKey = "%015d".format("13914".toLong), useSalt = true) +      .read +      .write(Tsv(output.format("scan_range_to_end"))) + +   val hbs5 = new HBaseSource( +      "_TEST.SALT.01", +      quorumNames, +      'key, +      List("data"), +      List('data), +      sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), useSalt = true) +      .read +      .write(Tsv(output.format("scan_range_from_start"))) + +   val hbs6 = new HBaseSource( +      "_TEST.SALT.01", +      quorumNames, +      'key, +      List("data"), +      List('data), +      sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), stopKey = "%015d".format("16897".toLong), useSalt = true) +      .read +      .write(Tsv(output.format("scan_range_between"))) + +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala index 890d2be..920f17d 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala @@ -3,36 +3,39 @@ package parallelai.spyglass.hbase.example  import com.twitter.scalding.Tool  import org.joda.time.format.DateTimeFormat  import java.util.Formatter.DateTime +import parallelai.spyglass.base.JobRunner  object HBaseExampleRunner extends App { -  val appPath = System.getenv("BIGDATA_APPCONF_PATH")  -  assert  (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"}) -  println( "Application Path is [%s]".format(appPath) ) -   -  val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match { -    case "hdfs" => "--hdfs" -    case _ => "--local" -  }} -   -  println(modeString) -   -  val jobLibPath = modeString match { -    case "--hdfs" => { -      val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")  -      assert  (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"}) -      println( "Job Library Path Path is [%s]".format(jobLibPath) ) -      jobLibPath -    } -    case _ => "" -  } -   -  val quorum = System.getenv("BIGDATA_QUORUM_NAMES") -  assert  (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"}) -  println( "Quorum is [%s]".format(quorum) ) - -  val output = "HBaseTest.%s.tsv" - -  Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath, -    "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum )) -  +   val appPath = System.getenv("BIGDATA_APPCONF_PATH") +   assert(appPath != null, { "Environment Variable BIGDATA_APPCONF_PATH is undefined or Null" }) +   println("Application Path is [%s]".format(appPath)) + +   val modeString = if (args.length == 0) { "--hdfs" } else { +      args(0) match { +         case "hdfs" => "--hdfs" +         case _ => "--hdfs" +      } +   } + +   println(modeString) + +   val jobLibPath = modeString match { +      case "--hdfs" => { +         val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") +         assert(jobLibPath != null, { "Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null" }) +         println("Job Library Path Path is [%s]".format(jobLibPath)) +         jobLibPath +      } +      case _ => "" +   } + +   val quorum = System.getenv("BIGDATA_QUORUM_NAMES") +   assert(quorum != null, { "Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null" }) +   println("Quorum is [%s]".format(quorum)) + +   val output = "HBaseTest.%s" + +   JobRunner.main(Array(classOf[HBaseExample].getName, "--hdfs", "--app.conf.path", appPath, +      "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum)) +  }
\ No newline at end of file | 
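The useSalt paths above rely on a reversible key-salting scheme: HBaseExample writes rows through HBaseSalter.addSaltPrefix(...) and HBaseRecordReader strips the prefix again with HBaseSalter.delSaltPrefix(...) before handing the row key to the tuple stream. HBaseSalter itself is not part of this diff, so the stand-in below is only a minimal sketch of such a round-trip (a hypothetical one-byte salt plus "_" separator derived from the key); the real class may use a different prefix scheme, and all the record reader depends on is that delSaltPrefix undoes addSaltPrefix exactly.

import java.util.Arrays;

import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical stand-in for HBaseSalter, for illustration only.
public class SaltSketch {

	private static final byte SEPARATOR = Bytes.toBytes("_")[0];

	// Prepend a deterministic one-character salt so sequential keys spread across regions.
	public static byte[] addSaltPrefix(byte[] key) {
		byte salt = (byte) ('0' + ((Bytes.hashCode(key) & 0x7fffffff) % 10));
		byte[] salted = new byte[key.length + 2];
		salted[0] = salt;
		salted[1] = SEPARATOR;
		System.arraycopy(key, 0, salted, 2, key.length);
		return salted;
	}

	// Drop the salt and separator again, as the record reader does before emitting keys.
	public static byte[] delSaltPrefix(byte[] saltedKey) {
		return Arrays.copyOfRange(saltedKey, 2, saltedKey.length);
	}

	public static void main(String[] args) {
		byte[] key = Bytes.toBytes("000000000013914");
		byte[] salted = addSaltPrefix(key);
		System.out.println(Bytes.toString(salted));                // e.g. 7_000000000013914
		System.out.println(Bytes.toString(delSaltPrefix(salted))); // 000000000013914
	}
}

The salted GET_LIST and SCAN_RANGE sources in HBaseExample.scala only behave as expected if the get/scan side applies the same prefixing to the user-supplied keys, which is presumably what the useSalt flag wires up inside HBaseSource and the input format. Note also that makeN in the example builds and populates its Put objects but does not appear to hand them to htable.put(...), so as written the _TEST.SALT.01 rows would not actually be inserted.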

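For the multi-version GET_LIST branch of next() above, the grouping step is easier to see outside the record-reader plumbing: every KeyValue returned for a row is bucketed by timestamp, and each bucket is later wrapped in its own Result and served on a subsequent next() call. The sketch below condenses that step, assuming the same 0.94-era KeyValue/Result API used in the rest of this diff; unlike the inline version it always puts the bucket back into the map, which sidesteps the (normally unreachable) null-bucket path.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;

// Illustration of the GET_LIST / versions > 1 grouping:
// one bucket (and later one Result) per distinct timestamp of the fetched row.
public class VersionGroupingSketch {

	public static Vector<List<KeyValue>> groupByVersion(Result resultAll) {
		Map<Long, List<KeyValue>> byTimestamp = new HashMap<Long, List<KeyValue>>();

		for (KeyValue keyValue : resultAll.list()) {
			Long version = Long.valueOf(keyValue.getTimestamp());
			List<KeyValue> bucket = byTimestamp.get(version);
			if (bucket == null) {
				bucket = new ArrayList<KeyValue>();
				byTimestamp.put(version, bucket);
			}
			bucket.add(keyValue);
		}

		// The record reader drains this vector one element per next() call,
		// wrapping each bucket in new Result(bucket).
		return new Vector<List<KeyValue>>(byTimestamp.values());
	}
}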