Diffstat (limited to 'src/main')
21 files changed, 1332 insertions, 1982 deletions
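Taken together, the changes below retire the monolithic HBaseInputFormat / HBaseRecordReader / HBaseMultiInputSplit classes and replace them with a shared HBaseInputFormatBase plus two concrete variants: HBaseInputFormatGranular (the renamed HBaseInputFormat_SINGLE, one split per region/key subset) and HBaseInputFormatRegional (granular splits regrouped per region server). A new HBaseConfigUtils helper copies split parameters onto the record readers, and HBaseConstants gains a SplitType enum alongside the existing per-table configuration keys. As orientation before the diff, here is a minimal, hypothetical sketch of wiring one of the new input formats into a plain org.apache.hadoop.mapred job. The table name, column, and key range are made-up values; only configuration keys and static helpers that appear in this diff are used, and in practice these properties are normally set by the SpyGlass source/tap layer rather than by hand.

    import org.apache.hadoop.mapred.JobConf;

    import parallelai.spyglass.hbase.HBaseConstants;
    import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
    import parallelai.spyglass.hbase.HBaseInputFormatBase;
    import parallelai.spyglass.hbase.HBaseInputFormatRegional;

    public class ExampleJobSetup {
        public static JobConf configureScan(JobConf job) {
            String table = "demo_table";   // hypothetical table name

            // Register the table and the space-delimited column list
            // (COLUMN_LIST is the "hbase.tablecolumns" key read by configure()).
            HBaseInputFormatBase.setTableName(job, table);
            job.set(HBaseInputFormatBase.COLUMN_LIST, "cf:qualifier");

            // Per-table keys are built with String.format, as in HBaseInputFormatBase.configure().
            job.set(String.format(HBaseConstants.SOURCE_MODE, table),
                    SourceMode.SCAN_RANGE.toString());
            job.set(String.format(HBaseConstants.START_KEY, table), "row-000");  // made-up range
            job.set(String.format(HBaseConstants.STOP_KEY, table), "row-999");
            job.setBoolean(String.format(HBaseConstants.USE_SALT, table), false);

            // One reader per region server; HBaseInputFormatGranular would give one per split.
            job.setInputFormat(HBaseInputFormatRegional.class);
            return job;
        }
    }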
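The heart of the new HBaseInputFormatRegional (further down in this diff) is convertToMultiSplitArray, which buckets granular splits by their region location so that a single regional split, and hence a single record reader, is created per region server. A self-contained sketch of that grouping idea, using plain strings in place of HBaseTableSplitGranular, might look like this:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RegionalGroupingSketch {

        /** Groups split descriptions by their region location, one bucket per region server. */
        static Map<String, List<String>> groupByRegion(List<String[]> granularSplits) {
            // Each element is {regionLocation, splitDescription}; stand-ins for HBaseTableSplitGranular.
            Map<String, List<String>> regional = new HashMap<String, List<String>>();
            for (String[] split : granularSplits) {
                String region = split[0];
                List<String> bucket = regional.get(region);
                if (bucket == null) {
                    bucket = new ArrayList<String>();
                    regional.put(region, bucket);
                }
                bucket.add(split[1]);   // addSplit(...) in the real code
            }
            return regional;
        }

        public static void main(String[] args) {
            List<String[]> splits = new ArrayList<String[]>();
            splits.add(new String[] { "rs1.example.com", "split-a" });
            splits.add(new String[] { "rs2.example.com", "split-b" });
            splits.add(new String[] { "rs1.example.com", "split-c" });
            // Two buckets: rs1.example.com -> [split-a, split-c], rs2.example.com -> [split-b]
            System.out.println(groupByRegion(splits));
        }
    }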
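Likewise, the GET_LIST branch of getSplits assigns each requested row key to the region whose start/stop key range contains it, by slicing a sorted key set with TreeSet's headSet / tailSet / subSet. A stripped-down, hypothetical illustration of that partitioning, with region boundaries as plain strings and the empty string standing in for an open boundary:

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class KeyListPartitionSketch {

        /** Returns the keys that fall inside [regionStart, regionStop]; "" means an open boundary. */
        static SortedSet<String> keysForRegion(TreeSet<String> keys, String regionStart, String regionStop) {
            if (regionStart.isEmpty() && regionStop.isEmpty()) {
                return keys;                                // single-region table: everything matches
            } else if (regionStart.isEmpty()) {
                return keys.headSet(regionStop, true);      // first region: everything up to its stop key
            } else if (regionStop.isEmpty()) {
                return keys.tailSet(regionStart, true);     // last region: everything from its start key
            } else {
                return keys.subSet(regionStart, true, regionStop, true);
            }
        }

        public static void main(String[] args) {
            TreeSet<String> keys = new TreeSet<String>();
            keys.add("apple");
            keys.add("melon");
            keys.add("zebra");
            // A hypothetical region spanning "d".."p" receives only "melon".
            System.out.println(keysForRegion(keys, "d", "p"));
        }
    }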
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
new file mode 100644
index 0000000..598a988
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
@@ -0,0 +1,53 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chand_000
+ * Date: 29/08/13
+ * Time: 17:25
+ * To change this template use File | Settings | File Templates.
+ */
+public class HBaseConfigUtils {
+    static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class);
+
+    public static void setRecordReaderParms(HBaseRecordReaderBase trr, HBaseTableSplitBase tSplit) throws IOException {
+        switch (tSplit.getSourceMode()) {
+            case SCAN_ALL:
+            case SCAN_RANGE: {
+                LOG.debug(String.format(
+                        "For split [%s] we have start key (%s) and stop key (%s)",
+                        tSplit, tSplit.getStartRow(), tSplit.getEndRow()));
+
+                trr.setStartRow(tSplit.getStartRow());
+                trr.setEndRow(tSplit.getEndRow());
+                trr.setEndRowInclusive(tSplit.getEndRowInclusive());
+                trr.setUseSalt(tSplit.getUseSalt());
+            }
+
+            break;
+
+            case GET_LIST: {
+                LOG.debug(String.format("For split [%s] we have key list (%s)",
+                        tSplit, tSplit.getKeyList()));
+
+                trr.setKeyList(tSplit.getKeyList());
+                trr.setVersions(tSplit.getVersions());
+                trr.setUseSalt(tSplit.getUseSalt());
+            }
+
+            break;
+
+            default:
+                throw new IOException("Unknown source mode : "
+                        + tSplit.getSourceMode());
+        }
+
+        trr.setSourceMode(tSplit.getSourceMode());
+    }
+
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 25b89cb..5b5e9c3 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -3,21 +3,26 @@ package parallelai.spyglass.hbase;
 
 import org.apache.hadoop.conf.Configuration;
 
 public class HBaseConstants {
-  
-  public enum SourceMode {
-    EMPTY,
-    SCAN_ALL,
-    SCAN_RANGE,
-    GET_LIST;
-  }
-  public static final String START_KEY = "hbase.%s.startkey";
-  public static final String STOP_KEY = "hbase.%s.stopkey";
-  public static final String SOURCE_MODE = "hbase.%s.source.mode";
-  public static final String KEY_LIST = "hbase.%s.key.list";
-  public static final String VERSIONS = "hbase.%s.versions";
-  public static final String USE_SALT = "hbase.%s.use.salt";
-  public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+    public enum SourceMode {
+        EMPTY,
+        SCAN_ALL,
+        SCAN_RANGE,
+        GET_LIST;
+    }
 
-  public static final String SINK_MODE = "hbase.%s.sink.mode";
-}
+    public enum SplitType {
+        GRANULAR,
+        REGIONAL;
+    }
+
+    public static final String START_KEY = "hbase.%s.startkey";
+    public static final String STOP_KEY = "hbase.%s.stopkey";
+    public static final String SOURCE_MODE = "hbase.%s.source.mode";
+    public static final String KEY_LIST = "hbase.%s.key.list";
+    public static final String VERSIONS = "hbase.%s.versions";
+    public static final String USE_SALT = "hbase.%s.use.salt";
+    public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+
+    public static final String SINK_MODE = "hbase.%s.sink.mode";
+}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java deleted file mode 100644 index 5a1184f..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ /dev/null @@ -1,645 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; - -import javax.naming.NamingException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseInputFormat implements -		InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { - -	private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); - -	private final String id = UUID.randomUUID().toString(); - -	private byte[][] inputColumns; -	private HTable table; -	// private HBaseRecordReader tableRecordReader; -	private Filter rowFilter; -	// private String tableName = ""; - -	private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); - -	private String nameServer = null; - -	// private Scan scan = null; - -	private HBaseMultiInputSplit[] convertToMultiSplitArray( -			List<HBaseTableSplit> splits) throws IOException { - -		if (splits == null) -			throw new IOException("The list of splits is null => " + splits); - -		HashMap<String, HBaseMultiInputSplit> regionSplits = new HashMap<String, HBaseMultiInputSplit>(); - -		for (HBaseTableSplit hbt : splits) { -			HBaseMultiInputSplit mis = null; -			if (regionSplits.containsKey(hbt.getRegionLocation())) { -				mis = regionSplits.get(hbt.getRegionLocation()); -			} else { -				regionSplits.put(hbt.getRegionLocation(), new HBaseMultiInputSplit( -						hbt.getRegionLocation())); -				mis = regionSplits.get(hbt.getRegionLocation()); -			} - -			mis.addSplit(hbt); -			regionSplits.put(hbt.getRegionLocation(), mis); -		} - -		Collection<HBaseMultiInputSplit> outVals = regionSplits.values(); - -		LOG.debug("".format("Returning array of splits : %s", outVals)); - -		if (outVals == null) -			throw new IOException("The list of multi input splits were null"); - -		return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]); -	} - -	@SuppressWarnings("deprecation") -	@Override -	public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException { -		if (this.table == null) { -			
throw new IOException("No table was provided"); -		} - -		if (this.inputColumns == null || this.inputColumns.length == 0) { -			throw new IOException("Expecting at least one column"); -		} - -		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); - -		if (keys == null || keys.getFirst() == null -				|| keys.getFirst().length == 0) { -			HRegionLocation regLoc = table.getRegionLocation( -					HConstants.EMPTY_BYTE_ARRAY, false); - -			if (null == regLoc) { -				throw new IOException("Expecting at least one region."); -			} - -			final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); -			HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), -					HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc -							.getHostnamePort().split( -									Addressing.HOSTNAME_PORT_SEPARATOR)[0], -					SourceMode.EMPTY, false); - -			splits.add(split); - -			// TODO: Change to HBaseMultiSplit -			return convertToMultiSplitArray(splits); -		} - -		if (keys.getSecond() == null || keys.getSecond().length == 0) { -			throw new IOException("Expecting at least one region."); -		} - -		if (keys.getFirst().length != keys.getSecond().length) { -			throw new IOException("Regions for start and end key do not match"); -		} - -		byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; -		byte[] maxKey = keys.getSecond()[0]; - -		LOG.debug(String.format("SETTING min key (%s) and max key (%s)", -				Bytes.toString(minKey), Bytes.toString(maxKey))); - -		byte[][] regStartKeys = keys.getFirst(); -		byte[][] regStopKeys = keys.getSecond(); -		String[] regions = new String[regStartKeys.length]; - -		for (int i = 0; i < regStartKeys.length; i++) { -			minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) -					&& (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] -					: minKey; -			maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) -					&& (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? 
regStopKeys[i] -					: maxKey; - -			HServerAddress regionServerAddress = table.getRegionLocation( -					keys.getFirst()[i]).getServerAddress(); -			InetAddress regionAddress = regionServerAddress.getInetSocketAddress() -					.getAddress(); -			String regionLocation; -			try { -				regionLocation = reverseDNS(regionAddress); -			} catch (NamingException e) { -				LOG.error("Cannot resolve the host name for " + regionAddress -						+ " because of " + e); -				regionLocation = regionServerAddress.getHostname(); -			} - -			// HServerAddress regionServerAddress = -			// table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); -			// InetAddress regionAddress = -			// regionServerAddress.getInetSocketAddress().getAddress(); -			// -			// String regionLocation; -			// -			// try { -			// regionLocation = reverseDNS(regionAddress); -			// } catch (NamingException e) { -			// LOG.error("Cannot resolve the host name for " + regionAddress + -			// " because of " + e); -			// regionLocation = regionServerAddress.getHostname(); -			// } - -			// String regionLocation = -			// table.getRegionLocation(keys.getFirst()[i]).getHostname(); - -			LOG.debug("***** " + regionLocation); - -			if (regionLocation == null || regionLocation.length() == 0) -				throw new IOException("The region info for regiosn " + i -						+ " is null or empty"); - -			regions[i] = regionLocation; - -			LOG.debug(String.format( -					"Region (%s) has start key (%s) and stop key (%s)", regions[i], -					Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); -		} - -		byte[] startRow = HConstants.EMPTY_START_ROW; -		byte[] stopRow = HConstants.EMPTY_END_ROW; - -		LOG.debug(String.format("Found min key (%s) and max key (%s)", -				Bytes.toString(minKey), Bytes.toString(maxKey))); - -		LOG.debug("SOURCE MODE is : " + sourceMode); - -		switch (sourceMode) { -			case SCAN_ALL: -				startRow = HConstants.EMPTY_START_ROW; -				stopRow = HConstants.EMPTY_END_ROW; - -				LOG.info(String.format( -						"SCAN ALL: Found start key (%s) and stop key (%s)", -						Bytes.toString(startRow), Bytes.toString(stopRow))); -			break; - -			case SCAN_RANGE: -				startRow = (startKey != null && startKey.length() != 0) ? Bytes -						.toBytes(startKey) : HConstants.EMPTY_START_ROW; -				stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes -						.toBytes(stopKey) : HConstants.EMPTY_END_ROW; - -				LOG.info(String.format( -						"SCAN RANGE: Found start key (%s) and stop key (%s)", -						Bytes.toString(startRow), Bytes.toString(stopRow))); -			break; -		} - -		switch (sourceMode) { -			case EMPTY: -			case SCAN_ALL: -			case SCAN_RANGE: { -				// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : -				// startRow; -				// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? 
maxKey : -				// stopRow; - -				final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - -				if (!useSalt) { - -					List<HRegionLocation> validRegions = table.getRegionsInRange( -							startRow, stopRow); - -					int maxRegions = validRegions.size(); -					int currentRegion = 1; - -					for (HRegionLocation cRegion : validRegions) { -						byte[] rStart = cRegion.getRegionInfo().getStartKey(); -						byte[] rStop = cRegion.getRegionInfo().getEndKey(); - -						HServerAddress regionServerAddress = cRegion -								.getServerAddress(); -						InetAddress regionAddress = regionServerAddress -								.getInetSocketAddress().getAddress(); -						String regionLocation; -						try { -							regionLocation = reverseDNS(regionAddress); -						} catch (NamingException e) { -							LOG.error("Cannot resolve the host name for " -									+ regionAddress + " because of " + e); -							regionLocation = regionServerAddress.getHostname(); -						} - -						byte[] sStart = (startRow == HConstants.EMPTY_START_ROW -								|| (Bytes.compareTo(startRow, rStart) <= 0) ? rStart -								: startRow); -						byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW -								|| (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop -								: stopRow); - -						LOG.debug(String.format( -								"BOOL start (%s) stop (%s) length (%d)", -								(startRow == HConstants.EMPTY_START_ROW || (Bytes -										.compareTo(startRow, rStart) <= 0)), -								(stopRow == HConstants.EMPTY_END_ROW || (Bytes -										.compareTo(stopRow, rStop) >= 0)), rStop.length)); - -						HBaseTableSplit split = new HBaseTableSplit( -								table.getTableName(), sStart, sStop, regionLocation, -								SourceMode.SCAN_RANGE, useSalt); - -						split.setEndRowInclusive(currentRegion == maxRegions); - -						currentRegion++; - -						LOG.debug(String -								.format( -										"START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", -										Bytes.toString(startRow), -										Bytes.toString(stopRow), Bytes.toString(rStart), -										Bytes.toString(rStop), Bytes.toString(sStart), -										Bytes.toString(sStop), cRegion.getHostnamePort(), -										split)); - -						splits.add(split); -					} -				} else { -					LOG.debug("Using SALT : " + useSalt); - -					// Will return the start and the stop key with all possible -					// prefixes. 
-					for (int i = 0; i < regions.length; i++) { -						Pair<byte[], byte[]>[] intervals = HBaseSalter -								.getDistributedIntervals(startRow, stopRow, -										regStartKeys[i], regStopKeys[i], prefixList); - -						for (Pair<byte[], byte[]> pair : intervals) { -							LOG.debug("".format( -									"Using SALT, Region (%s) Start (%s) Stop (%s)", -									regions[i], Bytes.toString(pair.getFirst()), -									Bytes.toString(pair.getSecond()))); - -							HBaseTableSplit split = new HBaseTableSplit( -									table.getTableName(), pair.getFirst(), -									pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, -									useSalt); - -							split.setEndRowInclusive(true); -							splits.add(split); -						} -					} -				} - -				LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); - -				// TODO: Change to HBaseMultiSplit -				return convertToMultiSplitArray(splits); -			} - -			case GET_LIST: { -				// if( keyList == null || keyList.size() == 0 ) { -				if (keyList == null) { -					throw new IOException( -							"Source Mode is GET_LIST but key list is EMPTY"); -				} - -				if (useSalt) { -					TreeSet<String> tempKeyList = new TreeSet<String>(); - -					for (String key : keyList) { -						tempKeyList.add(HBaseSalter.addSaltPrefix(key)); -					} - -					keyList = tempKeyList; -				} - -				LOG.info("".format("Splitting Key List (%s)", keyList)); - -				final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - -				for (int i = 0; i < keys.getFirst().length; i++) { - -					if (!includeRegionInSplit(keys.getFirst()[i], -							keys.getSecond()[i])) { -						continue; -					} - -					LOG.debug(String.format( -							"Getting region (%s) subset (%s) to (%s)", regions[i], -							Bytes.toString(regStartKeys[i]), -							Bytes.toString(regStopKeys[i]))); - -					Set<String> regionsSubSet = null; - -					if ((regStartKeys[i] == null || regStartKeys[i].length == 0) -							&& (regStopKeys[i] == null || regStopKeys[i].length == 0)) { -						LOG.debug("REGION start is empty"); -						LOG.debug("REGION stop is empty"); -						regionsSubSet = keyList; -					} else if (regStartKeys[i] == null -							|| regStartKeys[i].length == 0) { -						LOG.debug("REGION start is empty"); -						regionsSubSet = keyList.headSet( -								Bytes.toString(regStopKeys[i]), true); -					} else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { -						LOG.debug("REGION stop is empty"); -						regionsSubSet = keyList.tailSet( -								Bytes.toString(regStartKeys[i]), true); -					} else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { -						regionsSubSet = keyList.subSet( -								Bytes.toString(regStartKeys[i]), true, -								Bytes.toString(regStopKeys[i]), true); -					} else { -						throw new IOException(String.format( -								"For REGION (%s) Start Key (%s) > Stop Key(%s)", -								regions[i], Bytes.toString(regStartKeys[i]), -								Bytes.toString(regStopKeys[i]))); -					} - -					if (regionsSubSet == null || regionsSubSet.size() == 0) { -						LOG.debug("EMPTY: Key is for region " + regions[i] -								+ " is null"); - -						continue; -					} - -					TreeSet<String> regionKeyList = new TreeSet<String>( -							regionsSubSet); - -					LOG.debug(String.format("Regions [%s] has key list <%s>", -							regions[i], regionKeyList)); - -					HBaseTableSplit split = new HBaseTableSplit( -							table.getTableName(), regionKeyList, versions, regions[i], -							SourceMode.GET_LIST, useSalt); -					splits.add(split); -				} - -				// if (splits.isEmpty()) { -				// 
LOG.info("GOT EMPTY SPLITS"); - -				// throw new IOException( -				// "".format("Key List NOT found in any region")); - -				// HRegionLocation regLoc = table.getRegionLocation( -				// HConstants.EMPTY_BYTE_ARRAY, false); -				// -				// if (null == regLoc) { -				// throw new IOException("Expecting at least one region."); -				// } -				// -				// HBaseTableSplit split = new HBaseTableSplit( -				// table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, -				// HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostnamePort() -				// .split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], -				// SourceMode.EMPTY, false); -				// -				// splits.add(split); -				// } - -				LOG.info("RETURNED SPLITS: split -> " + splits); - -				// TODO: Change to HBaseMultiSplit -				return convertToMultiSplitArray(splits); -			} - -			default: -				throw new IOException("Unknown source Mode : " + sourceMode); -		} -	} - -	private String reverseDNS(InetAddress ipAddress) throws NamingException { -		String hostName = this.reverseDNSCacheMap.get(ipAddress); -		if (hostName == null) { -			hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( -					ipAddress, this.nameServer)); -			this.reverseDNSCacheMap.put(ipAddress, hostName); -		} -		return hostName; -	} - -	@Override -	public RecordReader<ImmutableBytesWritable, Result> getRecordReader( -			InputSplit split, JobConf job, Reporter reporter) throws IOException { - -		if (!(split instanceof HBaseMultiInputSplit)) -			throw new IOException("Table Split is not type HBaseMultiInputSplit"); - -		HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split; - -		HBaseRecordReader trr = new HBaseRecordReader(tSplit); - -		trr.setHTable(this.table); -		trr.setInputColumns(this.inputColumns); -		trr.setRowFilter(this.rowFilter); -		trr.setUseSalt(useSalt); - -		trr.setNextSplit(); - -		return trr; -	} - -	/* Configuration Section */ - -	/** -	 * space delimited list of columns -	 */ -	public static final String COLUMN_LIST = "hbase.tablecolumns"; - -	/** -	 * Use this jobconf param to specify the input table -	 */ -	private static final String INPUT_TABLE = "hbase.inputtable"; - -	private String startKey = null; -	private String stopKey = null; - -	private SourceMode sourceMode = SourceMode.EMPTY; -	private TreeSet<String> keyList = null; -	private int versions = 1; -	private boolean useSalt = false; -	private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - -	public void configure(JobConf job) { -		String tableName = getTableName(job); -		String colArg = job.get(COLUMN_LIST); -		String[] colNames = colArg.split(" "); -		byte[][] m_cols = new byte[colNames.length][]; -		for (int i = 0; i < m_cols.length; i++) { -			m_cols[i] = Bytes.toBytes(colNames[i]); -		} -		setInputColumns(m_cols); - -		try { -			setHTable(new HTable(HBaseConfiguration.create(job), tableName)); -		} catch (Exception e) { -			LOG.error("************* Table could not be created"); -			LOG.error(StringUtils.stringifyException(e)); -		} - -		LOG.debug("Entered : " + this.getClass() + " : configure()"); - -		useSalt = job.getBoolean( -				String.format(HBaseConstants.USE_SALT, getTableName(job)), false); -		prefixList = job.get( -				String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), -				HBaseSalter.DEFAULT_PREFIX_LIST); - -		sourceMode = SourceMode.valueOf(job.get(String.format( -				HBaseConstants.SOURCE_MODE, getTableName(job)))); - -		LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String -				.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job -				
.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), -				sourceMode)); - -		switch (sourceMode) { -			case SCAN_RANGE: -				LOG.info("HIT SCAN_RANGE"); - -				startKey = getJobProp(job, -						String.format(HBaseConstants.START_KEY, getTableName(job))); -				stopKey = getJobProp(job, -						String.format(HBaseConstants.STOP_KEY, getTableName(job))); - -				LOG.info(String.format("Setting start key (%s) and stop key (%s)", -						startKey, stopKey)); -			break; - -			case GET_LIST: -				LOG.info("HIT GET_LIST"); - -				Collection<String> keys = job.getStringCollection(String.format( -						HBaseConstants.KEY_LIST, getTableName(job))); -				keyList = new TreeSet<String>(keys); - -				versions = job.getInt( -						String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - -				LOG.debug("GOT KEY LIST : " + keys); -				LOG.debug(String.format("SETTING key list (%s)", keyList)); - -			break; - -			case EMPTY: -				LOG.info("HIT EMPTY"); - -				sourceMode = SourceMode.SCAN_ALL; -			break; - -			default: -				LOG.info("HIT DEFAULT"); - -			break; -		} -	} - -	public void validateInput(JobConf job) throws IOException { -		// expecting exactly one path -		String tableName = getTableName(job); - -		if (tableName == null) { -			throw new IOException("expecting one table name"); -		} -		LOG.debug(String.format("Found Table name [%s]", tableName)); - -		// connected to table? -		if (getHTable() == null) { -			throw new IOException("could not connect to table '" + tableName + "'"); -		} -		LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); - -		// expecting at least one column -		String colArg = job.get(COLUMN_LIST); -		if (colArg == null || colArg.length() == 0) { -			throw new IOException("expecting at least one column"); -		} -		LOG.debug(String.format("Found Columns [%s]", colArg)); - -		LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, -				stopKey)); - -		if (sourceMode == SourceMode.EMPTY) { -			throw new IOException("SourceMode should not be EMPTY"); -		} - -		if (sourceMode == SourceMode.GET_LIST -				&& (keyList == null || keyList.size() == 0)) { -			throw new IOException("Source mode is GET_LIST bu key list is empty"); -		} -	} - -	/* Getters & Setters */ -	private HTable getHTable() { -		return this.table; -	} - -	private void setHTable(HTable ht) { -		this.table = ht; -	} - -	private void setInputColumns(byte[][] ic) { -		this.inputColumns = ic; -	} - -	private void setJobProp(JobConf job, String key, String value) { -		if (job.get(key) != null) -			throw new RuntimeException(String.format( -					"Job Conf already has key [%s] with value [%s]", key, -					job.get(key))); -		job.set(key, value); -	} - -	private String getJobProp(JobConf job, String key) { -		return job.get(key); -	} - -	public static void setTableName(JobConf job, String tableName) { -		// Make sure that table has not been set before -		String oldTableName = getTableName(job); -		if (oldTableName != null) { -			throw new RuntimeException("table name already set to: '" -					+ oldTableName + "'"); -		} - -		job.set(INPUT_TABLE, tableName); -	} - -	public static String getTableName(JobConf job) { -		return job.get(INPUT_TABLE); -	} - -	protected boolean includeRegionInSplit(final byte[] startKey, -			final byte[] endKey) { -		return true; -	} -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java new file mode 100644 index 0000000..2f3047b --- /dev/null +++ 
b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -0,0 +1,172 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.util.StringUtils; + +import java.util.Collection; +import java.util.TreeSet; +import java.util.UUID; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 12:43 + * To change this template use File | Settings | File Templates. + */ +public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { + +    private final Log LOG = LogFactory.getLog(HBaseInputFormatBase.class); + +    protected final String id = UUID.randomUUID().toString(); +    protected byte[][] inputColumns; +    protected HTable table; +    protected Filter rowFilter; + +    public static final String COLUMN_LIST = "hbase.tablecolumns"; + +    /** +     * Use this jobconf param to specify the input table +     */ +    protected static final String INPUT_TABLE = "hbase.inputtable"; + +    protected String startKey = null; +    protected String stopKey = null; + +    protected HBaseConstants.SourceMode sourceMode = HBaseConstants.SourceMode.EMPTY; +    protected TreeSet<String> keyList = null; +    protected int versions = 1; +    protected boolean useSalt = false; +    protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; + + + +    @Override +    public void configure(JobConf job) { +        String tableName = getTableName(job); +        String colArg = job.get(COLUMN_LIST); +        String[] colNames = colArg.split(" "); +        byte[][] m_cols = new byte[colNames.length][]; +        for (int i = 0; i < m_cols.length; i++) { +            m_cols[i] = Bytes.toBytes(colNames[i]); +        } +        setInputColumns(m_cols); + +        try { +            setHTable(new HTable(HBaseConfiguration.create(job), tableName)); +        } catch (Exception e) { +            LOG.error("************* Table could not be created"); +            LOG.error(StringUtils.stringifyException(e)); +        } + +        LOG.debug("Entered : " + this.getClass() + " : configure()"); + +        useSalt = job.getBoolean( +                String.format(HBaseConstants.USE_SALT, getTableName(job)), false); +        prefixList = job.get( +                String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), +                HBaseSalter.DEFAULT_PREFIX_LIST); + +        sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format( +                HBaseConstants.SOURCE_MODE, getTableName(job)))); + +        LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String +                .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job +                .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), +                sourceMode)); + +        switch (sourceMode) { +            case SCAN_RANGE: +                LOG.debug("HIT SCAN_RANGE"); + +                startKey = getJobProp(job, +                        String.format(HBaseConstants.START_KEY, 
getTableName(job))); +                stopKey = getJobProp(job, +                        String.format(HBaseConstants.STOP_KEY, getTableName(job))); + +                LOG.debug(String.format("Setting start key (%s) and stop key (%s)", +                        startKey, stopKey)); +                break; + +            case GET_LIST: +                LOG.debug("HIT GET_LIST"); + +                Collection<String> keys = job.getStringCollection(String.format( +                        HBaseConstants.KEY_LIST, getTableName(job))); +                keyList = new TreeSet<String>(keys); + +                versions = job.getInt( +                        String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); + +                LOG.debug("GOT KEY LIST : " + keys); +                LOG.debug(String.format("SETTING key list (%s)", keyList)); + +                break; + +            case EMPTY: +                LOG.info("HIT EMPTY"); + +                sourceMode = HBaseConstants.SourceMode.SCAN_ALL; +                break; + +            default: +                LOG.info("HIT DEFAULT"); + +                break; +        } +    } + +    /* Getters & Setters */ +    protected HTable getHTable() { +        return this.table; +    } + +    protected void setHTable(HTable ht) { +        this.table = ht; +    } + +    protected void setInputColumns(byte[][] ic) { +        this.inputColumns = ic; +    } + +    protected void setJobProp(JobConf job, String key, String value) { +        if (job.get(key) != null) +            throw new RuntimeException(String.format( +                    "Job Conf already has key [%s] with value [%s]", key, +                    job.get(key))); +        job.set(key, value); +    } + +    protected String getJobProp(JobConf job, String key) { +        return job.get(key); +    } + +    public static void setTableName(JobConf job, String tableName) { +        // Make sure that table has not been set before +        String oldTableName = getTableName(job); +        if (oldTableName != null) { +            throw new RuntimeException("table name already set to: '" +                    + oldTableName + "'"); +        } + +        job.set(INPUT_TABLE, tableName); +    } + +    public static String getTableName(JobConf job) { +        return job.get(INPUT_TABLE); +    } + +    protected void setParms(HBaseRecordReaderBase trr) { + +    } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java index 96bfea1..929e9d8 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -37,17 +37,10 @@ import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseInputFormat_SINGLE implements -		InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { +public class HBaseInputFormatGranular extends HBaseInputFormatBase { -	private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class); +	private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class); -	private final String id = UUID.randomUUID().toString(); - -	private byte[][] inputColumns; -	private HTable table; -	// private HBaseRecordReader tableRecordReader; -	private Filter rowFilter;  	// private String tableName = "";  	private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); @@ -58,7 +51,7 @@ public class 
HBaseInputFormat_SINGLE implements  	@SuppressWarnings("deprecation")  	@Override -	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { +	public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException {  		if (this.table == null) {  			throw new IOException("No table was provided");  		} @@ -78,8 +71,8 @@ public class HBaseInputFormat_SINGLE implements  				throw new IOException("Expecting at least one region.");  			} -			List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); -			HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +			List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>(1); +			HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(),  					HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc  							.getHostnamePort().split(  									Addressing.HOSTNAME_PORT_SEPARATOR)[0], @@ -87,7 +80,7 @@ public class HBaseInputFormat_SINGLE implements  			splits.add(split); -			return splits.toArray(new HBaseTableSplit[splits.size()]); +			return splits.toArray(new HBaseTableSplitGranular[splits.size()]);  		}  		if (keys.getSecond() == null || keys.getSecond().length == 0) { @@ -173,7 +166,7 @@ public class HBaseInputFormat_SINGLE implements  				startRow = HConstants.EMPTY_START_ROW;  				stopRow = HConstants.EMPTY_END_ROW; -				LOG.info(String.format( +				LOG.debug(String.format(  						"SCAN ALL: Found start key (%s) and stop key (%s)",  						Bytes.toString(startRow), Bytes.toString(stopRow)));  			break; @@ -184,7 +177,7 @@ public class HBaseInputFormat_SINGLE implements  				stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes  						.toBytes(stopKey) : HConstants.EMPTY_END_ROW; -				LOG.info(String.format( +				LOG.debug(String.format(  						"SCAN RANGE: Found start key (%s) and stop key (%s)",  						Bytes.toString(startRow), Bytes.toString(stopRow)));  			break; @@ -199,7 +192,7 @@ public class HBaseInputFormat_SINGLE implements  				// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? 
maxKey :  				// stopRow; -				List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); +				List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>();  				if (!useSalt) { @@ -240,7 +233,7 @@ public class HBaseInputFormat_SINGLE implements  								(stopRow == HConstants.EMPTY_END_ROW || (Bytes  										.compareTo(stopRow, rStop) >= 0)), rStop.length)); -						HBaseTableSplit split = new HBaseTableSplit( +						HBaseTableSplitGranular split = new HBaseTableSplitGranular(  								table.getTableName(), sStart, sStop, regionLocation,  								SourceMode.SCAN_RANGE, useSalt); @@ -270,12 +263,12 @@ public class HBaseInputFormat_SINGLE implements  										regStartKeys[i], regStopKeys[i], prefixList);  						for (Pair<byte[], byte[]> pair : intervals) { -							LOG.info("".format( +							LOG.debug("".format(  									"Using SALT, Region (%s) Start (%s) Stop (%s)",  									regions[i], Bytes.toString(pair.getFirst()),  									Bytes.toString(pair.getSecond()))); -							HBaseTableSplit split = new HBaseTableSplit( +							HBaseTableSplitGranular split = new HBaseTableSplitGranular(  									table.getTableName(), pair.getFirst(),  									pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,  									useSalt); @@ -286,12 +279,12 @@ public class HBaseInputFormat_SINGLE implements  					}  				} -				LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); -				for (HBaseTableSplit s : splits) { -					LOG.info("RETURNED SPLITS: split -> " + s); +				LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); +				for (HBaseTableSplitGranular s : splits) { +					LOG.debug("RETURNED SPLITS: split -> " + s);  				} -				return splits.toArray(new HBaseTableSplit[splits.size()]); +				return splits.toArray(new HBaseTableSplitGranular[splits.size()]);  			}  			case GET_LIST: { @@ -313,7 +306,7 @@ public class HBaseInputFormat_SINGLE implements  				LOG.debug("".format("Splitting Key List (%s)", keyList)); -				List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); +				List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>();  				for (int i = 0; i < keys.getFirst().length; i++) { @@ -325,7 +318,7 @@ public class HBaseInputFormat_SINGLE implements  					LOG.debug(String.format(  							"Getting region (%s) subset (%s) to (%s)", regions[i],  							Bytes.toString(regStartKeys[i]), -							Bytes.toString(regStartKeys[i]))); +							Bytes.toString(regStopKeys[i])));  					Set<String> regionsSubSet = null; @@ -367,7 +360,7 @@ public class HBaseInputFormat_SINGLE implements  					LOG.debug(String.format("Regions [%s] has key list <%s>",  							regions[i], regionKeyList)); -					HBaseTableSplit split = new HBaseTableSplit( +					HBaseTableSplitGranular split = new HBaseTableSplitGranular(  							table.getTableName(), regionKeyList, versions, regions[i],  							SourceMode.GET_LIST, useSalt);  					splits.add(split); @@ -375,7 +368,7 @@ public class HBaseInputFormat_SINGLE implements  				LOG.debug("RETURNED SPLITS: split -> " + splits); -				return splits.toArray(new HBaseTableSplit[splits.size()]); +				return splits.toArray(new HBaseTableSplitGranular[splits.size()]);  			}  			default: @@ -397,45 +390,17 @@ public class HBaseInputFormat_SINGLE implements  	public RecordReader<ImmutableBytesWritable, Result> getRecordReader(  			InputSplit split, JobConf job, Reporter reporter) throws IOException { -		if (!(split instanceof HBaseTableSplit)) -			throw new IOException("Table Split is not type HBaseTableSplit"); - -		
HBaseTableSplit tSplit = (HBaseTableSplit) split; - -		HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE(); - -		switch (tSplit.getSourceMode()) { -			case SCAN_ALL: -			case SCAN_RANGE: { -				LOG.debug(String.format( -						"For split [%s] we have start key (%s) and stop key (%s)", -						tSplit, tSplit.getStartRow(), tSplit.getEndRow())); - -				trr.setStartRow(tSplit.getStartRow()); -				trr.setEndRow(tSplit.getEndRow()); -				trr.setEndRowInclusive(tSplit.getEndRowInclusive()); -				trr.setUseSalt(useSalt); -			} +        LOG.info("GRANULAR SPLIT -> " + split); -			break; +        if (!(split instanceof HBaseTableSplitGranular)) +			throw new IOException("Table Split is not type HBaseTableSplitGranular"); -			case GET_LIST: { -				LOG.debug(String.format("For split [%s] we have key list (%s)", -						tSplit, tSplit.getKeyList())); +		HBaseTableSplitGranular tSplit = (HBaseTableSplitGranular) split; -				trr.setKeyList(tSplit.getKeyList()); -				trr.setVersions(tSplit.getVersions()); -				trr.setUseSalt(useSalt); -			} +		HBaseRecordReaderGranular trr = new HBaseRecordReaderGranular(); -			break; - -			default: -				throw new IOException("Unknown source mode : " -						+ tSplit.getSourceMode()); -		} +        HBaseConfigUtils.setRecordReaderParms(trr, tSplit); -		trr.setSourceMode(tSplit.getSourceMode());  		trr.setHTable(this.table);  		trr.setInputColumns(this.inputColumns);  		trr.setRowFilter(this.rowFilter); @@ -450,95 +415,6 @@ public class HBaseInputFormat_SINGLE implements  	/**  	 * space delimited list of columns  	 */ -	public static final String COLUMN_LIST = "hbase.tablecolumns"; - -	/** -	 * Use this jobconf param to specify the input table -	 */ -	private static final String INPUT_TABLE = "hbase.inputtable"; - -	private String startKey = null; -	private String stopKey = null; - -	private SourceMode sourceMode = SourceMode.EMPTY; -	private TreeSet<String> keyList = null; -	private int versions = 1; -	private boolean useSalt = false; -	private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - -	public void configure(JobConf job) { -		String tableName = getTableName(job); -		String colArg = job.get(COLUMN_LIST); -		String[] colNames = colArg.split(" "); -		byte[][] m_cols = new byte[colNames.length][]; -		for (int i = 0; i < m_cols.length; i++) { -			m_cols[i] = Bytes.toBytes(colNames[i]); -		} -		setInputColumns(m_cols); - -		try { -			setHTable(new HTable(HBaseConfiguration.create(job), tableName)); -		} catch (Exception e) { -			LOG.error("************* Table could not be created"); -			LOG.error(StringUtils.stringifyException(e)); -		} - -		LOG.debug("Entered : " + this.getClass() + " : configure()"); - -		useSalt = job.getBoolean( -				String.format(HBaseConstants.USE_SALT, getTableName(job)), false); -		prefixList = job.get( -				String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), -				HBaseSalter.DEFAULT_PREFIX_LIST); - -		sourceMode = SourceMode.valueOf(job.get(String.format( -				HBaseConstants.SOURCE_MODE, getTableName(job)))); - -		LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String -				.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job -				.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), -				sourceMode)); - -		switch (sourceMode) { -			case SCAN_RANGE: -				LOG.info("HIT SCAN_RANGE"); - -				startKey = getJobProp(job, -						String.format(HBaseConstants.START_KEY, getTableName(job))); -				stopKey = getJobProp(job, -						String.format(HBaseConstants.STOP_KEY, getTableName(job))); - -		
		LOG.info(String.format("Setting start key (%s) and stop key (%s)", -						startKey, stopKey)); -			break; - -			case GET_LIST: -				LOG.info("HIT GET_LIST"); - -				Collection<String> keys = job.getStringCollection(String.format( -						HBaseConstants.KEY_LIST, getTableName(job))); -				keyList = new TreeSet<String>(keys); - -				versions = job.getInt( -						String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - -				LOG.debug("GOT KEY LIST : " + keys); -				LOG.debug(String.format("SETTING key list (%s)", keyList)); - -			break; - -			case EMPTY: -				LOG.info("HIT EMPTY"); - -				sourceMode = SourceMode.SCAN_ALL; -			break; - -			default: -				LOG.info("HIT DEFAULT"); - -			break; -		} -	}  	public void validateInput(JobConf job) throws IOException {  		// expecting exactly one path @@ -575,45 +451,6 @@ public class HBaseInputFormat_SINGLE implements  		}  	} -	/* Getters & Setters */ -	private HTable getHTable() { -		return this.table; -	} - -	private void setHTable(HTable ht) { -		this.table = ht; -	} - -	private void setInputColumns(byte[][] ic) { -		this.inputColumns = ic; -	} - -	private void setJobProp(JobConf job, String key, String value) { -		if (job.get(key) != null) -			throw new RuntimeException(String.format( -					"Job Conf already has key [%s] with value [%s]", key, -					job.get(key))); -		job.set(key, value); -	} - -	private String getJobProp(JobConf job, String key) { -		return job.get(key); -	} - -	public static void setTableName(JobConf job, String tableName) { -		// Make sure that table has not been set before -		String oldTableName = getTableName(job); -		if (oldTableName != null) { -			throw new RuntimeException("table name already set to: '" -					+ oldTableName + "'"); -		} - -		job.set(INPUT_TABLE, tableName); -	} - -	public static String getTableName(JobConf job) { -		return job.get(INPUT_TABLE); -	}  	protected boolean includeRegionInSplit(final byte[] startKey,  			final byte[] endKey) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java new file mode 100644 index 0000000..eadb57e --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -0,0 +1,99 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 12:24 + * To change this template use File | Settings | File Templates. + */ +public class HBaseInputFormatRegional extends HBaseInputFormatBase { +    private HBaseInputFormatGranular granular = new HBaseInputFormatGranular(); +    private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class); + + +    @Override +    public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException { +        granular.configure(job); +        HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits); + +        HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits ); + +        if( splits == null ) throw new IOException("Not sure WTF is going on? 
splits is NULL"); + +        LOG.info("GRANULAR => " + gSplits); +        LOG.info("REGIONAL => " + splits); + +        return splits; +    } + +    @Override +    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { +        if (!(inputSplit instanceof HBaseTableSplitRegional)) +            throw new IOException("Table Split is not type HBaseTableSplitRegional"); + +        LOG.info("REGIONAL SPLIT -> " + inputSplit); + +        HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit; + +        HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional(); + +        HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + +        trr.setHTable(this.table); +        trr.setInputColumns(this.inputColumns); +        trr.setRowFilter(this.rowFilter); + +        trr.init(tSplit); + +        return trr; +    } + +    private HBaseTableSplitRegional[] convertToMultiSplitArray( +            HBaseTableSplitGranular[] splits) throws IOException { + +        if (splits == null) +            throw new IOException("The list of splits is null => " + splits); + +        HashMap<String, HBaseTableSplitRegional> regionSplits = new HashMap<String, HBaseTableSplitRegional>(); + +        for (HBaseTableSplitGranular hbt : splits) { +            HBaseTableSplitRegional mis = null; +            if (regionSplits.containsKey(hbt.getRegionLocation())) { +                mis = regionSplits.get(hbt.getRegionLocation()); +            } else { +                regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional( +                        hbt.getRegionLocation())); +                mis = regionSplits.get(hbt.getRegionLocation()); +            } + +            mis.addSplit(hbt); +            regionSplits.put(hbt.getRegionLocation(), mis); +        } + +//        for(String region : regionSplits.keySet() ) { +//            regionSplits.get(region) +//        } + +        Collection<HBaseTableSplitRegional> outVals = regionSplits.values(); + +        LOG.debug("".format("Returning array of splits : %s", outVals)); + +        if (outVals == null) +            throw new IOException("The list of multi input splits were null"); + +        return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]); +    } + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java deleted file mode 100644 index 02e7f7b..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java +++ /dev/null @@ -1,111 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang.SerializationUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -public class HBaseMultiInputSplit implements InputSplit, -		Comparable<HBaseMultiInputSplit>, Serializable { - -	private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class); - -	private List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - -	private String regionLocation = null; - -	/** default constructor */ -	private HBaseMultiInputSplit() { - -	} - -	public HBaseMultiInputSplit(String regionLocation) { -		this.regionLocation = 
regionLocation; -	} - -	/** @return the region's hostname */ -	public String getRegionLocation() { -		LOG.debug("REGION GETTER : " + regionLocation); - -		return this.regionLocation; -	} - -	@Override -	public void readFields(DataInput in) throws IOException { -		LOG.debug("READ ME : " + in.toString()); - -		int s = Bytes.toInt(Bytes.readByteArray(in)); - -		for (int i = 0; i < s; i++) { -			HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils -					.deserialize(Bytes.readByteArray(in)); -			splits.add(hbts); -		} - -		LOG.debug("READ and CREATED : " + this); -	} - -	@Override -	public void write(DataOutput out) throws IOException { -		LOG.debug("WRITE : " + this); - -		Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); - -		for (HBaseTableSplit hbts : splits) { -			Bytes.writeByteArray(out, SerializationUtils.serialize(hbts)); -		} - -		LOG.debug("WROTE : " + out.toString()); -	} - -	@Override -	public String toString() { -		StringBuffer str = new StringBuffer(); -		str.append("HBaseMultiSplit : "); - -		for (HBaseTableSplit hbt : splits) { -			str.append(" [" + hbt.toString() + "]"); -		} - -		return str.toString(); -	} - -	@Override -	public int compareTo(HBaseMultiInputSplit o) { -		// TODO: Make this comparison better -		return (splits.size() - o.splits.size()); -	} - -	@Override -	public long getLength() throws IOException { -		return splits.size(); -	} - -	@Override -	public String[] getLocations() throws IOException { -		LOG.debug("REGION ARRAY : " + regionLocation); - -		return new String[] { this.regionLocation }; -	} - -	public void addSplit(HBaseTableSplit hbt) throws IOException { -		if (hbt.getRegionLocation().equals(regionLocation)) -			splits.add(hbt); -		else -			throw new IOException("HBaseTableSplit Region Location " -					+ hbt.getRegionLocation() -					+ " does NOT match MultiSplit Region Location " + regionLocation); -	} - -	public List<HBaseTableSplit> getSplits() { -		return splits; -	} -}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java deleted file mode 100644 index 5d7dbdd..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ /dev/null @@ -1,609 +0,0 @@ -package parallelai.spyglass.hbase; - -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.Vector; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader implements -		RecordReader<ImmutableBytesWritable, Result> { - -	static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); - -	private byte[] startRow; -	private byte[] endRow; -	private byte[] lastSuccessfulRow; -	private TreeSet<String> keyList; -	private SourceMode sourceMode; -	private Filter trrRowFilter; -	private ResultScanner scanner; -	private HTable htable; -	private byte[][] trrInputColumns; -	private long timestamp; -	private int rowcount; -	private boolean logScannerActivity = false; -	private int logPerRowCount = 100; -	private boolean endRowInclusive = true; -	private int versions = 1; -	private boolean useSalt = false; - -	private HBaseMultiInputSplit multiSplit = null; -	private List<HBaseTableSplit> allSplits = null; - -	private HBaseRecordReader() { -	} - -	public HBaseRecordReader(HBaseMultiInputSplit mSplit) throws IOException { -		multiSplit = mSplit; - -		LOG.info("Creatin Multi Split for region location : " -				+ multiSplit.getRegionLocation()); - -		allSplits = multiSplit.getSplits(); -	} - -	public boolean setNextSplit() throws IOException { -		if (allSplits.size() > 0) { -			setSplitValue(allSplits.remove(0)); -			return true; -		} else { -			return false; -		} -	} - -	private void setSplitValue(HBaseTableSplit tSplit) throws IOException { -		switch (tSplit.getSourceMode()) { -			case SCAN_ALL: -			case SCAN_RANGE: { -				LOG.debug(String.format( -						"For split [%s] we have start key (%s) and stop key (%s)", -						tSplit, tSplit.getStartRow(), tSplit.getEndRow())); - -				setStartRow(tSplit.getStartRow()); -				setEndRow(tSplit.getEndRow()); -				setEndRowInclusive(tSplit.getEndRowInclusive()); -			} - -			break; - -			case GET_LIST: { -				LOG.debug(String.format("For split [%s] we have key list (%s)", -						tSplit, tSplit.getKeyList())); - -				setKeyList(tSplit.getKeyList()); -				setVersions(tSplit.getVersions()); -			} - -			break; - -			case EMPTY: -				LOG.info("EMPTY split. 
Doing nothing."); -			break; - -			default: -				throw new IOException("Unknown source mode : " -						+ tSplit.getSourceMode()); -		} - -		setSourceMode(tSplit.getSourceMode()); - -		init(); -	} - -	/** -	 * Restart from survivable exceptions by creating a new scanner. -	 *  -	 * @param firstRow -	 * @throws IOException -	 */ -	private void restartRangeScan(byte[] firstRow) throws IOException { -		Scan currentScan; -		if ((endRow != null) && (endRow.length > 0)) { -			if (trrRowFilter != null) { -				Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, -						new byte[] { 0 }) : endRow)); - -				TableInputFormat.addColumns(scan, trrInputColumns); -				scan.setFilter(trrRowFilter); -				scan.setCacheBlocks(false); -				this.scanner = this.htable.getScanner(scan); -				currentScan = scan; -			} else { -				LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) -						+ ", endRow: " + Bytes.toString(endRow)); -				Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, -						new byte[] { 0 }) : endRow)); -				TableInputFormat.addColumns(scan, trrInputColumns); -				this.scanner = this.htable.getScanner(scan); -				currentScan = scan; -			} -		} else { -			LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) -					+ ", no endRow"); - -			Scan scan = new Scan(firstRow); -			TableInputFormat.addColumns(scan, trrInputColumns); -			scan.setFilter(trrRowFilter); -			this.scanner = this.htable.getScanner(scan); -			currentScan = scan; -		} -		if (logScannerActivity) { -			LOG.debug("Current scan=" + currentScan.toString()); -			timestamp = System.currentTimeMillis(); -			rowcount = 0; -		} -	} - -	public TreeSet<String> getKeyList() { -		return keyList; -	} - -	private void setKeyList(TreeSet<String> keyList) { -		this.keyList = keyList; -	} - -	private void setVersions(int versions) { -		this.versions = versions; -	} - -	public void setUseSalt(boolean useSalt) { -		this.useSalt = useSalt; -	} - -	public SourceMode getSourceMode() { -		return sourceMode; -	} - -	private void setSourceMode(SourceMode sourceMode) { -		this.sourceMode = sourceMode; -	} - -	public byte[] getEndRow() { -		return endRow; -	} - -	private void setEndRowInclusive(boolean isInclusive) { -		endRowInclusive = isInclusive; -	} - -	public boolean getEndRowInclusive() { -		return endRowInclusive; -	} - -	private byte[] nextKey = null; -	private Vector<List<KeyValue>> resultVector = null; -	Map<Long, List<KeyValue>> keyValueMap = null; - -	/** -	 * Build the scanner. Not done in constructor to allow for extension. -	 *  -	 * @throws IOException -	 */ -	private void init() throws IOException { -		switch (sourceMode) { -			case SCAN_ALL: -			case SCAN_RANGE: -				restartRangeScan(startRow); -			break; - -			case GET_LIST: -				nextKey = Bytes.toBytes(keyList.pollFirst()); -			break; - -			case EMPTY: -				LOG.info("EMPTY mode. Do nothing"); -			break; - -			default: -				throw new IOException(" Unknown source mode : " + sourceMode); -		} -	} - -	byte[] getStartRow() { -		return this.startRow; -	} - -	/** -	 * @param htable -	 *           the {@link HTable} to scan. -	 */ -	public void setHTable(HTable htable) { -		Configuration conf = htable.getConfiguration(); -		logScannerActivity = conf.getBoolean( -				ScannerCallable.LOG_SCANNER_ACTIVITY, false); -		logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); -		this.htable = htable; -	} - -	/** -	 * @param inputColumns -	 *           the columns to be placed in {@link Result}. 
-	 */ -	public void setInputColumns(final byte[][] inputColumns) { -		this.trrInputColumns = inputColumns; -	} - -	/** -	 * @param startRow -	 *           the first row in the split -	 */ -	private void setStartRow(final byte[] startRow) { -		this.startRow = startRow; -	} - -	/** -	 *  -	 * @param endRow -	 *           the last row in the split -	 */ -	private void setEndRow(final byte[] endRow) { -		this.endRow = endRow; -	} - -	/** -	 * @param rowFilter -	 *           the {@link Filter} to be used. -	 */ -	public void setRowFilter(Filter rowFilter) { -		this.trrRowFilter = rowFilter; -	} - -	@Override -	public void close() { -		if (this.scanner != null) -			this.scanner.close(); -	} - -	/** -	 * @return ImmutableBytesWritable -	 *  -	 * @see org.apache.hadoop.mapred.RecordReader#createKey() -	 */ -	@Override -	public ImmutableBytesWritable createKey() { -		return new ImmutableBytesWritable(); -	} - -	/** -	 * @return RowResult -	 *  -	 * @see org.apache.hadoop.mapred.RecordReader#createValue() -	 */ -	@Override -	public Result createValue() { -		return new Result(); -	} - -	@Override -	public long getPos() { -		// This should be the ordinal tuple in the range; -		// not clear how to calculate... -		return 0; -	} - -	@Override -	public float getProgress() { -		// Depends on the total number of tuples and getPos -		return 0; -	} - -	/** -	 * @param key -	 *           HStoreKey as input key. -	 * @param value -	 *           MapWritable as input value -	 * @return true if there was more data -	 * @throws IOException -	 */ -	@Override -	public boolean next(ImmutableBytesWritable key, Result value) -			throws IOException { - -		switch (sourceMode) { -			case SCAN_ALL: -			case SCAN_RANGE: { - -				Result result; -				try { -					try { -						result = this.scanner.next(); -						if (logScannerActivity) { -							rowcount++; -							if (rowcount >= logPerRowCount) { -								long now = System.currentTimeMillis(); -								LOG.debug("Mapper took " + (now - timestamp) -										+ "ms to process " + rowcount + " rows"); -								timestamp = now; -								rowcount = 0; -							} -						} -					} catch (IOException e) { -						// try to handle all IOExceptions by restarting -						// the scanner, if the second call fails, it will be rethrown -						LOG.debug("recovered from " -								+ StringUtils.stringifyException(e)); -						if (lastSuccessfulRow == null) { -							LOG.warn("We are restarting the first next() invocation," -									+ " if your mapper has restarted a few other times like this" -									+ " then you should consider killing this job and investigate" -									+ " why it's taking so long."); -						} -						if (lastSuccessfulRow == null) { -							restartRangeScan(startRow); -						} else { -							restartRangeScan(lastSuccessfulRow); -							this.scanner.next(); // skip presumed already mapped row -						} -						result = this.scanner.next(); -					} - -					if (result != null && result.size() > 0) { -						if (useSalt) { -							key.set(HBaseSalter.delSaltPrefix(result.getRow())); -						} else { -							key.set(result.getRow()); -						} - -						lastSuccessfulRow = key.get(); -						Writables.copyWritable(result, value); -						return true; -					} -					return setNextSplit(); -				} catch (IOException ioe) { -					if (logScannerActivity) { -						long now = System.currentTimeMillis(); -						LOG.debug("Mapper took " + (now - timestamp) -								+ "ms to process " + rowcount + " rows"); -						LOG.debug(ioe); -						String lastRow = lastSuccessfulRow == null ? 
"null" : Bytes -								.toStringBinary(lastSuccessfulRow); -						LOG.debug("lastSuccessfulRow=" + lastRow); -					} -					throw ioe; -				} -			} - -			case GET_LIST: { -				LOG.debug(String.format("INTO next with GET LIST and Key (%s)", -						Bytes.toString(nextKey))); - -				if (versions == 1) { -					if (nextKey != null) { -						LOG.debug(String.format("Processing Key (%s)", -								Bytes.toString(nextKey))); - -						Get theGet = new Get(nextKey); -						theGet.setMaxVersions(versions); - -						Result result = this.htable.get(theGet); - -						if (result != null && (!result.isEmpty())) { -							LOG.debug(String.format( -									"Key (%s), Version (%s), Got Result (%s)", -									Bytes.toString(nextKey), versions, result)); - -							if (keyList != null || !keyList.isEmpty()) { -								String newKey = keyList.pollFirst(); -								LOG.debug("New Key => " + newKey); -								nextKey = (newKey == null || newKey.length() == 0) ? null -										: Bytes.toBytes(newKey); -							} else { -								nextKey = null; -							} - -							LOG.debug(String.format("=> Picked a new Key (%s)", -									Bytes.toString(nextKey))); - -							// Write the result -							if (useSalt) { -								key.set(HBaseSalter.delSaltPrefix(result.getRow())); -							} else { -								key.set(result.getRow()); -							} -							lastSuccessfulRow = key.get(); -							Writables.copyWritable(result, value); - -							return true; -						} else { -							LOG.debug(" Key (" + Bytes.toString(nextKey) -									+ ") return an EMPTY result. Get (" + theGet.getId() -									+ ")"); // alg0 - -							String newKey; -							while ((newKey = keyList.pollFirst()) != null) { -								LOG.debug("WHILE NEXT Key => " + newKey); - -								nextKey = (newKey == null || newKey.length() == 0) ? null -										: Bytes.toBytes(newKey); - -								if (nextKey == null) { -									LOG.error("BOMB! BOMB! BOMB!"); -									continue; -								} - -								if (!this.htable.exists(new Get(nextKey))) { -									LOG.debug(String.format( -											"Key (%s) Does not exist in Table (%s)", -											Bytes.toString(nextKey), -											Bytes.toString(this.htable.getTableName()))); -									continue; -								} else { -									break; -								} -							} - -							nextKey = (newKey == null || newKey.length() == 0) ? null -									: Bytes.toBytes(newKey); - -							LOG.debug("Final New Key => " + Bytes.toString(nextKey)); - -							return next(key, value); -						} -					} else { -						// Nothig left. 
return false -						return setNextSplit(); -					} -				} else { -					if (resultVector != null && resultVector.size() != 0) { -						LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", -								versions, resultVector)); - -						List<KeyValue> resultKeyValue = resultVector -								.remove(resultVector.size() - 1); -						Result result = new Result(resultKeyValue); - -						LOG.debug(String.format("+ Version (%s), Got Result <%s>", -								versions, result)); - -						if (useSalt) { -							key.set(HBaseSalter.delSaltPrefix(result.getRow())); -						} else { -							key.set(result.getRow()); -						} -						lastSuccessfulRow = key.get(); -						Writables.copyWritable(result, value); - -						return true; -					} else { -						if (nextKey != null) { -							LOG.debug(String.format("+ Processing Key (%s)", -									Bytes.toString(nextKey))); - -							Get theGet = new Get(nextKey); -							theGet.setMaxVersions(versions); - -							Result resultAll = this.htable.get(theGet); - -							if (resultAll != null && (!resultAll.isEmpty())) { -								List<KeyValue> keyValeList = resultAll.list(); - -								keyValueMap = new HashMap<Long, List<KeyValue>>(); - -								LOG.debug(String.format( -										"+ Key (%s) Versions (%s) Val;ute map <%s>", -										Bytes.toString(nextKey), versions, keyValueMap)); - -								for (KeyValue keyValue : keyValeList) { -									long version = keyValue.getTimestamp(); - -									if (keyValueMap.containsKey(new Long(version))) { -										List<KeyValue> keyValueTempList = keyValueMap -												.get(new Long(version)); -										if (keyValueTempList == null) { -											keyValueTempList = new ArrayList<KeyValue>(); -										} -										keyValueTempList.add(keyValue); -									} else { -										List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); -										keyValueMap.put(new Long(version), -												keyValueTempList); -										keyValueTempList.add(keyValue); -									} -								} - -								resultVector = new Vector<List<KeyValue>>(); -								resultVector.addAll(keyValueMap.values()); - -								List<KeyValue> resultKeyValue = resultVector -										.remove(resultVector.size() - 1); - -								Result result = new Result(resultKeyValue); - -								LOG.debug(String.format( -										"+ Version (%s), Got Result (%s)", versions, -										result)); - -								String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// - -								System.out.println("+ New Key => " + newKey); -								nextKey = (newKey == null || newKey.length() == 0) ? null -										: Bytes.toBytes(newKey); - -								if (useSalt) { -									key.set(HBaseSalter.delSaltPrefix(result.getRow())); -								} else { -									key.set(result.getRow()); -								} -								lastSuccessfulRow = key.get(); -								Writables.copyWritable(result, value); -								return true; -							} else { -								LOG.debug(String.format( -										"+ Key (%s) return an EMPTY result. Get (%s)", -										Bytes.toString(nextKey), theGet.getId())); // alg0 - -								String newKey; - -								while ((newKey = keyList.pollFirst()) != null) { -									LOG.debug("+ WHILE NEXT Key => " + newKey); - -									nextKey = (newKey == null || newKey.length() == 0) ? null -											: Bytes.toBytes(newKey); - -									if (nextKey == null) { -										LOG.error("+ BOMB! BOMB! 
BOMB!"); -										continue; -									} - -									if (!this.htable.exists(new Get(nextKey))) { -										LOG.debug(String.format( -												"+ Key (%s) Does not exist in Table (%s)", -												Bytes.toString(nextKey), -												Bytes.toString(this.htable.getTableName()))); -										continue; -									} else { -										break; -									} -								} - -								nextKey = (newKey == null || newKey.length() == 0) ? null -										: Bytes.toBytes(newKey); - -								LOG.debug("+ Final New Key => " -										+ Bytes.toString(nextKey)); - -								return next(key, value); -							} - -						} else { -							return setNextSplit(); -						} -					} -				} -			} - -			case EMPTY: { -				LOG.info("GOT an empty Split"); -				return setNextSplit(); -			} - -			default: -				throw new IOException("Unknown source mode : " + sourceMode); -		} -	} -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java new file mode 100644 index 0000000..37858ad --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -0,0 +1,140 @@ +package parallelai.spyglass.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.RecordReader; + +import java.util.TreeSet; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 15:42 + * To change this template use File | Settings | File Templates. + */ +public abstract class HBaseRecordReaderBase implements +        RecordReader<ImmutableBytesWritable, Result> { + +    protected TreeSet<String> keyList; +    protected HBaseConstants.SourceMode sourceMode; +    protected boolean endRowInclusive = true; +    protected int versions = 1; +    protected boolean useSalt = false; + +    protected byte[] startRow; +    protected byte[] endRow; + +    protected HTable htable; +    protected byte[][] trrInputColumns; + +    protected Filter trrRowFilter; + +    protected boolean logScannerActivity = false; +    protected int logPerRowCount = 100; + +    @Override +    public String toString() { +        StringBuffer sbuf = new StringBuffer(); + +        sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] endRowInc [%s] ", +                Bytes.toString(startRow), Bytes.toString(endRow), endRowInclusive)); +        sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ", +                sourceMode, useSalt, versions)); + +        return sbuf.toString(); +    } + +    byte[] getStartRow() { +        return this.startRow; +    } + +    /** +     * @param htable +     *          the {@link org.apache.hadoop.hbase.client.HTable} to scan. +     */ +    public void setHTable(HTable htable) { +        Configuration conf = htable.getConfiguration(); +        logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, +                false); +        logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); +        this.htable = htable; +    } + +    /** +     * @param inputColumns +     *          the columns to be placed in {@link Result}. 
+     */ +    public void setInputColumns(final byte[][] inputColumns) { +        this.trrInputColumns = inputColumns; +    } + +    /** +     * @param startRow +     *          the first row in the split +     */ +    public void setStartRow(final byte[] startRow) { +        this.startRow = startRow; +    } + +    /** +     * +     * @param endRow +     *          the last row in the split +     */ +    public void setEndRow(final byte[] endRow) { +        this.endRow = endRow; +    } + +    /** +     * @param rowFilter +     *          the {@link org.apache.hadoop.hbase.filter.Filter} to be used. +     */ +    public void setRowFilter(Filter rowFilter) { +        this.trrRowFilter = rowFilter; +    } + +    public TreeSet<String> getKeyList() { +        return keyList; +    } + +    public void setKeyList(TreeSet<String> keyList) { +        this.keyList = keyList; +    } + +    public void setVersions(int versions) { +        this.versions = versions; +    } + +    public void setUseSalt(boolean useSalt) { +        this.useSalt = useSalt; +    } + +    public HBaseConstants.SourceMode getSourceMode() { +        return sourceMode; +    } + +    public void setSourceMode(HBaseConstants.SourceMode sourceMode) { +        this.sourceMode = sourceMode; +    } + +    public byte[] getEndRow() { +        return endRow; +    } + +    public void setEndRowInclusive(boolean isInclusive) { +        endRowInclusive = isInclusive; +    } + +    public boolean getEndRowInclusive() { +        return endRowInclusive; +    } + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index 5eafc78..6c28d9f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -30,29 +30,29 @@ import org.apache.hadoop.util.StringUtils;  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseRecordReader_SINGLE implements -    RecordReader<ImmutableBytesWritable, Result> { +public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { -  static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class); +  static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class); -  private byte[] startRow; -  private byte[] endRow;    private byte[] lastSuccessfulRow; -  private TreeSet<String> keyList; -  private SourceMode sourceMode; -  private Filter trrRowFilter;    private ResultScanner scanner; -  private HTable htable; -  private byte[][] trrInputColumns;    private long timestamp;    private int rowcount; -  private boolean logScannerActivity = false; -  private int logPerRowCount = 100; -  private boolean endRowInclusive = true; -  private int versions = 1; -  private boolean useSalt = false; -  /** +    @Override +    public String toString() { +        StringBuffer sbuf = new StringBuffer(); + +        sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]", +                Bytes.toString(startRow), Bytes.toString(endRow), Bytes.toString(lastSuccessfulRow), Bytes.toString(nextKey), endRowInclusive, rowcount)); +        sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ", +                sourceMode, useSalt, versions)); + +        return sbuf.toString(); +    } + + +    /**     * Restart from survivable exceptions by creating a new scanner.     
*      * @param firstRow @@ -96,41 +96,6 @@ public class HBaseRecordReader_SINGLE implements      }    } -  public TreeSet<String> getKeyList() { -    return keyList; -  } - -  public void setKeyList(TreeSet<String> keyList) { -    this.keyList = keyList; -  } - -  public void setVersions(int versions) { -    this.versions = versions; -  } -   -  public void setUseSalt(boolean useSalt) { -    this.useSalt = useSalt; -  } - -  public SourceMode getSourceMode() { -    return sourceMode; -  } - -  public void setSourceMode(SourceMode sourceMode) { -    this.sourceMode = sourceMode; -  } - -  public byte[] getEndRow() { -    return endRow; -  } - -  public void setEndRowInclusive(boolean isInclusive) { -    endRowInclusive = isInclusive; -  } - -  public boolean getEndRowInclusive() { -    return endRowInclusive; -  }    private byte[] nextKey = null;    private Vector<List<KeyValue>> resultVector = null; @@ -157,55 +122,6 @@ public class HBaseRecordReader_SINGLE implements      }    } -  byte[] getStartRow() { -    return this.startRow; -  } - -  /** -   * @param htable -   *          the {@link HTable} to scan. -   */ -  public void setHTable(HTable htable) { -    Configuration conf = htable.getConfiguration(); -    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, -        false); -    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); -    this.htable = htable; -  } - -  /** -   * @param inputColumns -   *          the columns to be placed in {@link Result}. -   */ -  public void setInputColumns(final byte[][] inputColumns) { -    this.trrInputColumns = inputColumns; -  } - -  /** -   * @param startRow -   *          the first row in the split -   */ -  public void setStartRow(final byte[] startRow) { -    this.startRow = startRow; -  } - -  /** -   *  -   * @param endRow -   *          the last row in the split -   */ -  public void setEndRow(final byte[] endRow) { -    this.endRow = endRow; -  } - -  /** -   * @param rowFilter -   *          the {@link Filter} to be used. -   */ -  public void setRowFilter(Filter rowFilter) { -    this.trrRowFilter = rowFilter; -  } -    @Override    public void close() {      if (this.scanner != null) @@ -450,7 +366,6 @@ public class HBaseRecordReader_SINGLE implements                String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// -              System.out.println("+ New Key => " + newKey);                nextKey = (newKey == null || newKey.length() == 0) ? 
null : Bytes                    .toBytes(newKey); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java new file mode 100644 index 0000000..e2b1ec8 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -0,0 +1,124 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { + +	static final Log LOG = LogFactory.getLog(HBaseRecordReaderRegional.class); + + +    private byte[] nextKey = null; +    private Vector<List<KeyValue>> resultVector = null; +    Map<Long, List<KeyValue>> keyValueMap = null; + +    private HBaseTableSplitRegional multiSplit = null; +	private HBaseTableSplitGranular currentSplit = null; + +    private HBaseRecordReaderGranular currentRecordReader = null; + +	public void init(HBaseTableSplitRegional mSplit) throws IOException { +		multiSplit = mSplit; + +		LOG.debug("Creating Multi Split for region location : " +                + multiSplit.getRegionLocation() + " -> " + multiSplit); + +        setNextSplit(); +	} + +	public boolean setNextSplit() throws IOException { +        currentSplit = multiSplit.getNextSplit(); + +        LOG.debug("IN: setNextSplit : " + currentSplit ); + +		if( currentSplit != null ) { +			setSplitValue(currentSplit); +			return true; +		} else { +			return false; +		} +	} + +    private void setRecordReaderParms(HBaseRecordReaderGranular trr, HBaseTableSplitGranular tSplit) throws IOException { +        HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + +        trr.setHTable(htable); +        trr.setInputColumns(trrInputColumns); +        trr.setRowFilter(trrRowFilter); + +        trr.init(); +    } + +	private void setSplitValue(HBaseTableSplitGranular tSplit) throws IOException { +        LOG.debug("IN: setSplitValue : " + tSplit ); + +        if( currentRecordReader != null ) currentRecordReader.close(); + +        currentRecordReader = new HBaseRecordReaderGranular(); +        setRecordReaderParms(currentRecordReader, currentSplit); +	} + +    @Override +    public boolean next(ImmutableBytesWritable ibw, Result result) throws IOException { +        boolean nextFlag = currentRecordReader.next(ibw, result); + +        while(nextFlag == false && multiSplit.hasMoreSplits() ) { +            setNextSplit(); +   
         nextFlag = currentRecordReader.next(ibw, result); +        } + +        return nextFlag; +    } + +    @Override +    public ImmutableBytesWritable createKey() { +        return currentRecordReader.createKey(); +    } + +    @Override +    public Result createValue() { +        return currentRecordReader.createValue(); +    } + +    @Override +    public long getPos() throws IOException { +        return currentRecordReader.getPos(); +    } + +    @Override +    public void close() throws IOException { +        currentRecordReader.close(); +    } + +    @Override +    public float getProgress() throws IOException { +        return currentRecordReader.getProgress(); +    } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java index 6766458..5bdf8cd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -68,7 +68,7 @@ public class HBaseSalter {  		  byte[] originalStartKey, byte[] originalStopKey,   		  byte[] regionStartKey, byte[] regionStopKey,   		  String prefixList) throws IOException { -	LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", +	LOG.debug("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)",  			Bytes.toString(originalStartKey),  			Bytes.toString(originalStopKey),  			Bytes.toString(regionStartKey), @@ -160,7 +160,7 @@ public class HBaseSalter {    }    private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { -      LOG.info("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)", +      LOG.debug("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)",                Bytes.toString(originalKey), prefixList, startPrefix, stopPrefix));        char[] prefixArray = prefixList.toCharArray(); @@ -172,13 +172,13 @@ public class HBaseSalter {  	  SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true); -      LOG.info("".format("Prefix subset (%s)", subSet)); +      LOG.debug("".format("Prefix subset (%s)", subSet));  	  return getAllKeys(originalKey, subSet.toArray(new Byte[]{}));    }    public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) { -    LOG.info("".format("getAllKeys: OKEY (%s) PARRAY (%s)", +    LOG.debug("".format("getAllKeys: OKEY (%s) PARRAY (%s)",                Bytes.toString(originalKey), prefixArray ));  	byte[][] keys = new byte[prefixArray.length][]; @@ -187,12 +187,6 @@ public class HBaseSalter {        keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey));      } -    for(int i = 0; i < keys.length; i ++) { -        for(int j = 0; j < keys[i].length; j++) { -            LOG.info("" + i + " : " + j + " : " + keys[i][j]); -        } -    } -      return keys;    } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index aa446c1..6f04f01 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -12,6 +12,7 @@  package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SplitType;  import cascading.flow.FlowProcess;  import cascading.scheme.Scheme;  import cascading.scheme.SinkCall; @@ -65,6 +66,8 @@ public class HBaseScheme  //  private transient byte[][] fields;    private 
boolean useSalt = false; + +  private SplitType splitType = SplitType.GRANULAR;    /** @@ -279,11 +282,31 @@ public class HBaseScheme    @Override    public void sourceConfInit(FlowProcess<JobConf> process,        Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { -    conf.setInputFormat(HBaseInputFormat.class); -    String columns = getColumns(); -    LOG.debug("sourcing from columns: {}", columns); -    conf.set(HBaseInputFormat.COLUMN_LIST, columns); +      switch(splitType) { +          case GRANULAR: +          { +              conf.setInputFormat(HBaseInputFormatGranular.class); + +              String columns = getColumns(); +              LOG.debug("sourcing from columns: {}", columns); +              conf.set(HBaseInputFormatGranular.COLUMN_LIST, columns); +          } +          break; + +          case REGIONAL: +          { +              conf.setInputFormat(HBaseInputFormatRegional.class); + +              String columns = getColumns(); +              LOG.debug("sourcing from columns: {}", columns); +              conf.set(HBaseInputFormatRegional.COLUMN_LIST, columns); +          } +          break; + +          default: +              LOG.error("Unknown Split Type : " + splitType.toString()); +      }    }    private String getColumns() { @@ -345,4 +368,8 @@ public class HBaseScheme      result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0);      return result;    } + +  public void setInputSplitTye(SplitType sType) { +      this.splitType = sType; +  }  } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java deleted file mode 100644 index 87b8f58..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ /dev/null @@ -1,219 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseTableSplit implements InputSplit, -		Comparable<HBaseTableSplit>, Serializable { - -	private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); - -	private byte[] m_tableName = null; -	private byte[] m_startRow = null; -	private byte[] m_endRow = null; -	private String m_regionLocation = null; -	private TreeSet<String> m_keyList = null; -	private SourceMode m_sourceMode = SourceMode.EMPTY; -	private boolean m_endRowInclusive = true; -	private int m_versions = 1; -	private boolean m_useSalt = false; - -	/** default constructor */ -	public HBaseTableSplit() { -		this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, -				HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); -	} - -	/** -	 * Constructor -	 *  -	 * @param tableName -	 * @param startRow -	 * @param endRow -	 * @param location -	 */ -	public HBaseTableSplit(final byte[] tableName, final byte[] startRow, -			final byte[] endRow, final String location, -			final SourceMode sourceMode, final boolean useSalt) { -		this.m_tableName = tableName; -		this.m_startRow = startRow; -		this.m_endRow = endRow; -		this.m_regionLocation = location; -		this.m_sourceMode = sourceMode; -		this.m_useSalt = useSalt; -	} - -	public HBaseTableSplit(final byte[] 
tableName, -			final TreeSet<String> keyList, int versions, final String location, -			final SourceMode sourceMode, final boolean useSalt) { -		this.m_tableName = tableName; -		this.m_keyList = keyList; -		this.m_versions = versions; -		this.m_sourceMode = sourceMode; -		this.m_regionLocation = location; -		this.m_useSalt = useSalt; -	} - -	/** @return table name */ -	public byte[] getTableName() { -		return this.m_tableName; -	} - -	/** @return starting row key */ -	public byte[] getStartRow() { -		return this.m_startRow; -	} - -	/** @return end row key */ -	public byte[] getEndRow() { -		return this.m_endRow; -	} - -	public boolean getEndRowInclusive() { -		return m_endRowInclusive; -	} - -	public void setEndRowInclusive(boolean isInclusive) { -		m_endRowInclusive = isInclusive; -	} - -	/** @return list of keys to get */ -	public TreeSet<String> getKeyList() { -		return m_keyList; -	} - -	public int getVersions() { -		return m_versions; -	} - -	/** @return get the source mode */ -	public SourceMode getSourceMode() { -		return m_sourceMode; -	} - -	public boolean getUseSalt() { -		return m_useSalt; -	} - -	/** @return the region's hostname */ -	public String getRegionLocation() { -		LOG.debug("REGION GETTER : " + m_regionLocation); - -		return this.m_regionLocation; -	} - -	public String[] getLocations() { -		LOG.debug("REGION ARRAY : " + m_regionLocation); - -		return new String[] { this.m_regionLocation }; -	} - -	@Override -	public long getLength() { -		// Not clear how to obtain this... seems to be used only for sorting -		// splits -		return 0; -	} - -	@Override -	public void readFields(DataInput in) throws IOException { -		LOG.debug("READ ME : " + in.toString()); - -		this.m_tableName = Bytes.readByteArray(in); -		this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); -		this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes -				.readByteArray(in))); -		this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); - -		switch (this.m_sourceMode) { -			case SCAN_RANGE: -				this.m_startRow = Bytes.readByteArray(in); -				this.m_endRow = Bytes.readByteArray(in); -				this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); -			break; - -			case GET_LIST: -				this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); -				this.m_keyList = new TreeSet<String>(); - -				int m = Bytes.toInt(Bytes.readByteArray(in)); - -				for (int i = 0; i < m; i++) { -					this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); -				} -			break; -		} - -		LOG.debug("READ and CREATED : " + this); -	} - -	@Override -	public void write(DataOutput out) throws IOException { -		LOG.debug("WRITE : " + this); - -		Bytes.writeByteArray(out, this.m_tableName); -		Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); -		Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); -		Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); - -		switch (this.m_sourceMode) { -			case SCAN_RANGE: -				Bytes.writeByteArray(out, this.m_startRow); -				Bytes.writeByteArray(out, this.m_endRow); -				Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); -			break; - -			case GET_LIST: -				Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); -				Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); - -				for (String k : this.m_keyList) { -					Bytes.writeByteArray(out, Bytes.toBytes(k)); -				} -			break; -		} - -		LOG.debug("WROTE : " + out.toString()); -	} - -	@Override -	public String toString() { -		return String -				.format( -						"Table Name 
(%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", -						Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, -						Bytes.toString(m_startRow), Bytes.toString(m_endRow), -						(m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, -						m_useSalt); -	} - -	@Override -	public int compareTo(HBaseTableSplit o) { -		switch (m_sourceMode) { -			case SCAN_ALL: -			case SCAN_RANGE: -				return Bytes.compareTo(getStartRow(), o.getStartRow()); - -			case GET_LIST: -				return m_keyList.equals(o.getKeyList()) ? 0 : -1; - -			case EMPTY: -				return 0; - -			default: -				return -1; -		} - -	} -}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java new file mode 100644 index 0000000..2f6e7b5 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -0,0 +1,165 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.TreeSet; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 16:18 + * To change this template use File | Settings | File Templates. + */ +public abstract class HBaseTableSplitBase implements InputSplit, +        Comparable<HBaseTableSplitBase>, Serializable { + +    private final Log LOG = LogFactory.getLog(HBaseTableSplitBase.class); + + +    protected byte[] m_tableName = null; +    protected byte[] m_startRow = null; +    protected byte[] m_endRow = null; +    protected String m_regionLocation = null; +    protected TreeSet<String> m_keyList = null; +    protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY; +    protected boolean m_endRowInclusive = true; +    protected int m_versions = 1; +    protected boolean m_useSalt = false; + + +    /** @return table name */ +    public byte[] getTableName() { +        return this.m_tableName; +    } + +    /** @return starting row key */ +    public byte[] getStartRow() { +        return this.m_startRow; +    } + +    /** @return end row key */ +    public byte[] getEndRow() { +        return this.m_endRow; +    } + +    public boolean getEndRowInclusive() { +        return m_endRowInclusive; +    } + +    public void setEndRowInclusive(boolean isInclusive) { +        m_endRowInclusive = isInclusive; +    } + +    /** @return list of keys to get */ +    public TreeSet<String> getKeyList() { +        return m_keyList; +    } + +    public int getVersions() { +        return m_versions; +    } + +    /** @return get the source mode */ +    public HBaseConstants.SourceMode getSourceMode() { +        return m_sourceMode; +    } + +    public boolean getUseSalt() { +        return m_useSalt; +    } + +    /** @return the region's hostname */ +    public String getRegionLocation() { +        LOG.debug("REGION GETTER : " + m_regionLocation); + +        return this.m_regionLocation; +    } + +    public String[] getLocations() { +        LOG.debug("REGION ARRAY : " + m_regionLocation); + +        return new String[] { this.m_regionLocation }; +    } + + +    public void copy(HBaseTableSplitBase that) { +        this.m_endRow = that.m_endRow; +        this.m_endRowInclusive = that.m_endRowInclusive; +        this.m_keyList = that.m_keyList; +        this.m_sourceMode = that.m_sourceMode; +        this.m_startRow = that.m_startRow; +        this.m_tableName = that.m_tableName; +        this.m_useSalt = that.m_useSalt; +        this.m_versions = that.m_versions; +    } + +    @Override +    public void readFields(DataInput in) throws IOException { +        LOG.debug("READ ME : " + in.toString()); + +        this.m_tableName = Bytes.readByteArray(in); +        this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); +        this.m_sourceMode = 
HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes +                .readByteArray(in))); +        this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); + +        switch (this.m_sourceMode) { +            case SCAN_RANGE: +                this.m_startRow = Bytes.readByteArray(in); +                this.m_endRow = Bytes.readByteArray(in); +                this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); +                break; + +            case GET_LIST: +                this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); +                this.m_keyList = new TreeSet<String>(); + +                int m = Bytes.toInt(Bytes.readByteArray(in)); + +                for (int i = 0; i < m; i++) { +                    this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); +                } +                break; +        } + +        LOG.debug("READ and CREATED : " + this); +    } + +    @Override +    public void write(DataOutput out) throws IOException { +        LOG.debug("WRITE : " + this); + +        Bytes.writeByteArray(out, this.m_tableName); +        Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); +        Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); +        Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); + +        switch (this.m_sourceMode) { +            case SCAN_RANGE: +                Bytes.writeByteArray(out, this.m_startRow); +                Bytes.writeByteArray(out, this.m_endRow); +                Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); +                break; + +            case GET_LIST: +                Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); +                Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + +                for (String k : this.m_keyList) { +                    Bytes.writeByteArray(out, Bytes.toBytes(k)); +                } +                break; +        } + +        LOG.debug("WROTE : " + out.toString()); +    } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java new file mode 100644 index 0000000..4de7153 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -0,0 +1,97 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseTableSplitGranular extends HBaseTableSplitBase { + +	private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class); + +    /** default constructor */ +    public HBaseTableSplitGranular() { +        this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, +                HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false); +    } + +    /** +     * Constructor +     * +     * @param tableName +     * @param startRow +     * @param endRow +     * @param location +     */ +    public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow, +                               final byte[] endRow, final String location, +                               final 
HBaseConstants.SourceMode sourceMode, final boolean useSalt) { +        this.m_tableName = tableName; +        this.m_startRow = startRow; +        this.m_endRow = endRow; +        this.m_regionLocation = location; +        this.m_sourceMode = sourceMode; +        this.m_useSalt = useSalt; +    } + +    public HBaseTableSplitGranular(final byte[] tableName, +                               final TreeSet<String> keyList, int versions, final String location, +                               final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { +        this.m_tableName = tableName; +        this.m_keyList = keyList; +        this.m_versions = versions; +        this.m_sourceMode = sourceMode; +        this.m_regionLocation = location; +        this.m_useSalt = useSalt; +    } + + +	@Override +	public long getLength() { +		// Not clear how to obtain this... seems to be used only for sorting +		// splits +		return 0; +	} + + +	@Override +	public String toString() { +		return String +				.format( +						"Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", +						Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, +						Bytes.toString(m_startRow), Bytes.toString(m_endRow), +						(m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, +						m_useSalt); +	} + +	@Override +	public int compareTo(HBaseTableSplitBase o) { +        if( ! (o instanceof HBaseTableSplitGranular) ) return -1; + +		switch (m_sourceMode) { +			case SCAN_ALL: +			case SCAN_RANGE: +				return Bytes.compareTo(getStartRow(), o.getStartRow()); + +			case GET_LIST: +				return m_keyList.equals(o.getKeyList()) ? 0 : -1; + +			case EMPTY: +				return 0; + +			default: +				return -1; +		} + +	} +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java new file mode 100644 index 0000000..1ebfa3d --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -0,0 +1,127 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Vector; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +public class HBaseTableSplitRegional extends HBaseTableSplitBase { + +	private final Log LOG = LogFactory.getLog(HBaseTableSplitRegional.class); + +	private List<HBaseTableSplitGranular> splits = new Vector<HBaseTableSplitGranular>(); + +	/** default constructor */ +	private HBaseTableSplitRegional() { + +	} + +	public HBaseTableSplitRegional(String regionLocation) { +		this.m_regionLocation = regionLocation; +	} + +	@Override +	public void readFields(DataInput in) throws IOException { +		LOG.debug("REGIONAL READ ME : " + in.toString()); + +        super.readFields(in); + +		int s = Bytes.toInt(Bytes.readByteArray(in)); + +		for (int i = 0; i < s; i++) { +			HBaseTableSplitGranular hbts = new HBaseTableSplitGranular(); +            hbts.readFields(in); + +			splits.add(hbts); +		} + +		LOG.debug("REGIONAL READ and CREATED : " + this); +	} + +	@Override +	public void write(DataOutput out) throws IOException { +		LOG.debug("REGIONAL WRITE : " + this); + +        super.write(out); + +		Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); + +		for (HBaseTableSplitGranular hbts : splits) { +			hbts.write(out); +		} + +		LOG.debug("REGIONAL WROTE : " + out.toString()); +	} + +	@Override +	public String toString() { +		StringBuffer str = new StringBuffer(); +		str.append("HBaseTableSplitRegional : "); + +        str.append(super.toString()); + +        str.append(" GRANULAR = > "); + +		for (HBaseTableSplitGranular hbt : splits) { +			str.append(" [" + hbt.toString() + "]"); +		} + +		return str.toString(); +	} + +	@Override +	public int compareTo(HBaseTableSplitBase o) { +        if( ! (o instanceof HBaseTableSplitRegional) ) return -1; + +		return (splits.size() - ((HBaseTableSplitRegional)o).splits.size()); +	} + +	@Override +	public long getLength() throws IOException { +		return splits.size(); +	} + +	public void addSplit(HBaseTableSplitGranular hbt) throws IOException { +        LOG.debug("ADD Split : " + hbt); + +		if (hbt.getRegionLocation().equals(m_regionLocation)) { +			splits.add(hbt); +            this.copy(hbt); +        } else +			throw new IOException("HBaseTableSplitGranular Region Location " +					+ hbt.getRegionLocation() +					+ " does NOT match MultiSplit Region Location " + m_regionLocation); +	} + +//	public List<HBaseTableSplitGranular> getSplits() { +//		return splits; +//	} + +    public boolean hasMoreSplits() { +        splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator; + +        return splitIterator.hasNext(); +    } + +    private Iterator<HBaseTableSplitGranular> splitIterator = null; + +    public HBaseTableSplitGranular getNextSplit() { +        splitIterator = (splitIterator == null) ? 
splits.listIterator() : splitIterator; + +        if( splitIterator.hasNext() ) { +            return splitIterator.next(); +        } else { +            return null; +        } +    } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 07b5aa7..bfe6670 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,6 +12,8 @@  package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SplitType; +  import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  import cascading.flow.FlowProcess;  import cascading.tap.SinkMode; @@ -63,6 +65,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {    /** Field tableName */    private String tableName; +  private SplitType splitType = SplitType.GRANULAR; +    /**     * Constructor HBaseTap creates a new HBaseTap instance.     * @@ -204,7 +208,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {        return true;      } -    LOG.info("creating hbase table: {}", tableName); +    LOG.info("Creating HBase Table: {}", tableName);      HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); @@ -256,8 +260,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {  //    process.getID();  //      //    super.getFullIdentifier(conf); -     -    HBaseInputFormat.setTableName(conf, tableName); + +    switch(splitType) { +        case GRANULAR: +            HBaseInputFormatGranular.setTableName(conf, tableName); +            break; + +        case REGIONAL: +            HBaseInputFormatRegional.setTableName(conf, tableName); +            break; + +        default: +            LOG.error("Unknown Split Type : " + splitType); +    }      for( SourceConfig sc : sourceConfigList) {        sc.configure(conf); @@ -266,6 +281,10 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {      super.sourceConfInit(process, conf);    } +  public void setInputSplitType(SplitType sType) { +      this.splitType = sType; +  } +    @Override    public boolean equals(Object object) {      if (this == object) { diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 7ff7860..c214e99 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -10,7 +10,7 @@ import com.twitter.scalding.Read  import com.twitter.scalding.Source  import com.twitter.scalding.Write -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}  import cascading.scheme.{NullScheme, Scheme}  import cascading.tap.SinkMode  import cascading.tap.Tap @@ -40,11 +40,14 @@ case class HBaseSource(      versions: Int = 1,      useSalt: Boolean = false,      prefixList: String = null, -    sinkMode: SinkMode = SinkMode.UPDATE +    sinkMode: SinkMode = SinkMode.UPDATE, +    inputSplitType: SplitType = SplitType.GRANULAR    ) extends Source { -     -  override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) -    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + +  val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) +  internalScheme.setInputSplitTye(inputSplitType) + +  override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]    // To enable local mode testing    val 
allFields = keyFields.append(valueFields.toArray) @@ -76,6 +79,8 @@ case class HBaseSource(              }              case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))            } + +          hbt.setInputSplitType(inputSplitType)            hbt.asInstanceOf[Tap[_,_,_]]          } diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index a4e2d7a..d75ff7b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -1,9 +1,9 @@  package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} -import com.twitter.scalding.{IterableSource, Args, TextLine} +import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine}  import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}  import cascading.tuple.Fields  import org.apache.log4j.{Logger, Level} @@ -59,76 +59,221 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio    val quorum = args("quorum")    val sttKey = "01728" -  val stpKey = "01831" +  val stpKey = "03725"    val sttKeyP = "8_01728" -  val stpKeyP = "1_01831" +  val stpKeyP = "5_03725"    val listKey = List("01681", "01456") -  val listKeyP = List("1_01681", "6_01456") -   -//  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -//          sourceMode = SourceMode.SCAN_ALL ).read -//          .fromBytesWritable( TABLE_SCHEMA ) -//  .write(TextLine("saltTesting/ScanAllNoSalt01")) - +  val listKeyP = List("0_01681", "6_01456") +  val noSttKey = "9999990" +  val noStpKey = "9999999" +  val noSttKeyP = "9_9999990" +  val noStpKeyP = "9_9999999" +  val noListKey = List("0123456", "6543210") +  val noListKeyP = List("6_0123456", "0_6543210") + +  val splitType =  if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR + +  val testName01 = "Scan All with NO useSalt" +  val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) +  val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +          sourceMode = SourceMode.SCAN_ALL, +          inputSplitType = splitType ).read +          .fromBytesWritable( TABLE_SCHEMA ) +          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +          .project('testData) +          .write(TextLine("saltTesting/ScanAllNoSalt01")) +          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + +  val testName02 = "Scan All with useSalt=true"    val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,            TABLE_SCHEMA.tail.map((x: Symbol) => "data"),            TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read -//        .fromBytesWritable( TABLE_SCHEMA ) -  .write(TextLine("saltTesting/ScanAllPlusSalt01")) - -//  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -//   
TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read
-//        .fromBytesWritable(TABLE_SCHEMA )
-//  .write(TextLine("saltTesting/ScanRangeNoSalt01"))
-//
-//  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read
-//        .fromBytesWritable(TABLE_SCHEMA )
-//  .write(TextLine("saltTesting/ScanRangePlusSalt01"))
-//
-//  val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-//          sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read
-//          .fromBytesWritable(TABLE_SCHEMA )
-//  .write(TextLine("saltTesting/GetListNoSalt01"))
-//
-//  val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-//          sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read
-//          .fromBytesWritable(TABLE_SCHEMA )
-//  .write(TextLine("saltTesting/GetListPlusSalt01"))
-//
-//  val hbase07 =
-//      new HBaseSource( "_TEST.SALT.03", quorum, 'key,
-//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
-//  .read
-//  .fromBytesWritable( TABLE_SCHEMA )
-//  .write(TextLine("saltTesting/ScanRangePlusSalt10"))
-//  .toBytesWritable( TABLE_SCHEMA )
-//  .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
-//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-//          useSalt = true ))
-//
-//  val hbase08 =
-//      new HBaseSource( "_TEST.SALT.01", quorum, 'key,
-//          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
-//          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
-//          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix )
-//  .read
-//  .fromBytesWritable('*)
-//  .write(TextLine("saltTesting/ScanRangePlusSalt03"))
+          sourceMode = SourceMode.SCAN_ALL, useSalt = true,
+          inputSplitType = splitType).read
+          .fromBytesWritable( TABLE_SCHEMA )
+          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+          .project('testData)
+          .write(TextLine("saltTesting/ScanAllPlusSalt01"))
+          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+  val testName03 = "Scan Range with useSalt=true"
+  val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+  val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix,
+          inputSplitType = splitType).read
+          .fromBytesWritable(TABLE_SCHEMA )
+          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+          .project('testData)
+          .write(TextLine("saltTesting/ScanRangePlusSalt01"))
+          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+  val testName04 = "Scan Range with NO useSalt"
+  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP,
+          inputSplitType = splitType).read
+          .fromBytesWritable(TABLE_SCHEMA )
+          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+          .project('testData)
+          .write(TextLine("saltTesting/ScanRangeNoSalt01"))
+          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+
+  val testName05 = "Get List with useSalt=true"
+  val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x)))
+  val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+          sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true,
+          inputSplitType = splitType).read
+          .fromBytesWritable(TABLE_SCHEMA )
+          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+          .project('testData)
+          .write(TextLine("saltTesting/GetListPlusSalt01"))
+          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+  val testName06 = "Get List with NO useSalt"
+  val hbase06 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+          TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+          sourceMode = SourceMode.GET_LIST, keyList = listKeyP,
+          inputSplitType = splitType).read
+          .fromBytesWritable(TABLE_SCHEMA )
+          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+          .project('testData)
+          .write(TextLine("saltTesting/GetListNoSalt01"))
+          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+  val testName08 = "Scan Range NO RESULT with useSalt=true"
+  val hbase08 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+    TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+    sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKey, stopKey = noStpKey, useSalt = true, prefixList = prefix,
+    inputSplitType = splitType).read
+    .fromBytesWritable(TABLE_SCHEMA )
+    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+    .project('testData)
+    .write(TextLine("saltTesting/ScanRangePlusSaltNoRes01"))
+    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+  val testName09 = "Scan Range NO RESULT with NO useSalt"
+  val hbase09 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+    TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+    sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKeyP, stopKey = noStpKeyP,
+    inputSplitType = splitType).read
+    .fromBytesWritable(TABLE_SCHEMA )
+    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+    .project('testData)
+    .write(TextLine("saltTesting/ScanRangeNoSaltNoRes01"))
+    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+
+  val testName10 = "Get List NO RESULT with useSalt=true"
+  val hbase10 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+    TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+    sourceMode = SourceMode.GET_LIST, keyList = noListKey, useSalt = true,
+    inputSplitType = splitType).read
+    .fromBytesWritable(TABLE_SCHEMA )
+    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+    .project('testData)
+    .write(TextLine("saltTesting/GetListPlusSaltNoRes01"))
+    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+  val testName11 = "Get List NO RESULT with NO useSalt"
+  val hbase11 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+    TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+    TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
+    sourceMode = SourceMode.GET_LIST, keyList = noListKeyP,
+    inputSplitType = splitType).read
+    .fromBytesWritable(TABLE_SCHEMA )
+    .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+    .project('testData)
+    .write(TextLine("saltTesting/GetListNoSaltNoRes01"))
+    .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
+
+
+
+//  (
+////      getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++
+////      getTestResultPipe(getExpectedPipe(list01), hbase02, testName02) ++
+//      getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++
+//      getTestResultPipe(getExpectedPipe(list03), hbase04, testName04) ++
+//      getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++
+//      getTestResultPipe(getExpectedPipe(list05), hbase06, testName06) ++
+//      assertPipeIsEmpty(hbase08, testName08) ++
+//      assertPipeIsEmpty(hbase09, testName09) ++
+//      assertPipeIsEmpty(hbase10, testName10) ++
+//      assertPipeIsEmpty(hbase11, testName11)
+//    ).groupAll { group =>
+//    group.sortBy('testName)
+//  }
+//    .write(Tsv("saltTesting/FinalTestResults"))
+
+  /**
+   * Asserts that the given pipe is empty.
+   *
+   * The pipe is concatenated with a single-row header; if the resulting
+   * size is 1, the original size was 0, i.e. the pipe was empty.
+   *
+   * The outcome is returned as a single-row result Pipe.
+   */
+  def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = {
+    val headerPipe = IterableSource(List(testName), 'testData)
+    val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group =>
+      group.size('size)
+    }
+      .project('size)
+
+    val result =
+      concatenation
+        .mapTo('size -> ('testName, 'result, 'expectedData, 'testData)) { x:String => {
+        if (x == "1") {
+          (testName, "Success", "", "")
+        } else {
+          (testName, "Test Failed", "", "")
+        }
+      }
+      }
+
+    result
+  }
+
+  /**
+   * Receives two pipes and projects the outcome of comparing them.
+   *
+   * expectedPipe should have a column 'expectedData
+   * realHBasePipe should have a column 'testData
+   */
+  def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = {
+    val results = expectedPipe.insert('testName , testName)
+      .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName))
+      .map(('expectedData, 'testData)->'result) { x:(String,String) =>
+      if (x._1.equals(x._2))
+        "Success"
+      else
+        "Test Failed"
+    }
+      .project('testName, 'result, 'expectedData, 'testData)
+    results
+  }
+
+  /**
+   * Builds the expected-result pipe from a list of (key, salted, unsalted) tuples.
+   */
+  def getExpectedPipe ( expectedList: List[(String,String, String)]) : Pipe = {
+      IterableSource(expectedList, TABLE_SCHEMA)
+        .map(('key, 'salted, 'unsalted) -> 'expectedData) {x: (String, String, String) => List(x._1, x._2, x._3)}
+        .project('expectedData)
+        .groupAll(group => group.toList[List[List[String]]]('expectedData -> 'expectedData))
+  }
+
 }
 
 class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions {
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
index a8de7d6..17bc873 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala
@@ -2,6 +2,7 @@ package parallelai.spyglass.hbase.testing
 
 import parallelai.spyglass.base.JobRunner
 import com.twitter.scalding.Args
+import org.apache.log4j.{Level, Logger}
 
 object HBaseSaltTesterRunner extends App {
 
@@ -25,12 +26,18 @@ object HBaseSaltTesterRunner extends App {
   val test = mArgs.getOrElse("test.data", "false").toBoolean
   val delete = mArgs.getOrElse("delete.data", "false").toBoolean
 
+  val isDebug = mArgs.getOrElse("debug", "false").toBoolean
+
+  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
+
+
   if( make ) {
     JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName,
       "--hdfs",
       "--app.conf.path", appPath,
       "--job.lib.path", jobLibPath,
-      "--quorum", quorum
+      "--quorum", quorum,
+      "--debug", isDebug.toString
     ))
   }
 
@@ -39,7 +46,9 @@ object HBaseSaltTesterRunner extends App {
         "--hdfs",
         "--app.conf.path", appPath,
         "--job.lib.path", jobLibPath,
-        "--quorum", quorum
+        "--quorum", quorum,
+        "--debug", isDebug.toString,
+        "--regional", mArgs.getOrElse("regional", "false")
     ))
   }
 
@@ -48,7 +57,8 @@ object HBaseSaltTesterRunner extends App {
       "--hdfs",
       "--app.conf.path", appPath,
      "--job.lib.path", jobLibPath,
-      "--quorum", quorum
+      "--quorum", quorum,
+      "--debug", isDebug.toString
     ))
   }
 }
\ No newline at end of file
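The assertPipeIsEmpty helper added above never materialises the pipe it checks: it concatenates the pipe under test with a one-row header and counts the rows, so a total of 1 means the original pipe contributed nothing. The following is a minimal stand-alone Scalding sketch of the same trick; the job name, the in-memory sources and the output path are illustrative assumptions, not part of this patch.

import com.twitter.scalding._

// Illustrative job (not part of the patch): the concatenate-with-a-header
// emptiness check used by assertPipeIsEmpty above.
class EmptyPipeCheck(args: Args) extends Job(args) {

  // Stand-in for the HBase read under test; an empty list makes the pipe empty.
  val pipeUnderTest = IterableSource(List.empty[String], 'testData).read

  // Single-row header pipe carrying the test name.
  val header = IterableSource(List("EmptyPipeCheck"), 'testData).read

  // Union the two pipes and count the rows: a size of 1 means the pipe
  // under test was empty, anything larger means it was not.
  (pipeUnderTest ++ header)
    .groupAll { _.size('size) }
    .mapTo('size -> ('testName, 'result)) { size: Long =>
      if (size == 1L) ("EmptyPipeCheck", "Success") else ("EmptyPipeCheck", "Test Failed")
    }
    .write(Tsv(args.getOrElse("output", "saltTesting/EmptyPipeCheck")))
}

The helper in the patch returns the verdict as a Pipe instead of writing it directly, so the per-test rows can later be concatenated into the single FinalTestResults output.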

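getTestResultPipe follows a similar pattern for tests that do expect data: the expected and actual sides are each collapsed to a single row holding a grouped list, tagged with the test name, joined on that tag with joinWithTiny, and the two columns are compared to yield Success or Test Failed. Below is a stripped-down sketch of that comparison; the job, the field values and the output path are again assumed for illustration only.

import com.twitter.scalding._
import cascading.pipe.Pipe

// Illustrative job (not part of the patch): the expected-vs-actual comparison
// pattern used by getTestResultPipe above.
class CompareGroupedPipes(args: Args) extends Job(args) {

  val testName = "CompareGroupedPipes"

  // Collapse a small in-memory source to one row whose single column holds
  // the grouped-up list, mirroring how the tester pipes are prepared.
  def singleRow(field: Symbol, rows: List[String]): Pipe =
    IterableSource(rows, field).read
      .groupAll { _.toList[String](field -> field) }

  val expected = singleRow('expectedData, List("a", "b", "c"))
  val actual   = singleRow('testData, List("a", "b", "c"))

  // Tag both rows with the test name, join on it, and compare the columns.
  expected.insert('testName, testName)
    .joinWithTiny('testName -> 'testName, actual.insert('testName, testName))
    .map(('expectedData, 'testData) -> 'result) { x: (String, String) =>
      if (x._1 == x._2) "Success" else "Test Failed"
    }
    .project('testName, 'result, 'expectedData, 'testData)
    .write(Tsv(args.getOrElse("output", "saltTesting/CompareGroupedPipes")))
}

As in getTestResultPipe, the check compares the string form of the grouped 'expectedData and 'testData columns rather than the individual rows.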