diff options
author | Chandan Rajah <crajah@parallelai.com> | 2013-09-04 10:32:07 +0100 |
---|---|---|
committer | Chandan Rajah <crajah@parallelai.com> | 2013-09-04 10:32:07 +0100 |
commit | 3501e241a2313cf49c371630cb6ebe0c3a47e991 (patch) | |
tree | 99b4e48c7590f94a4cbe8acf9ffbc036241ab737 /src | |
parent | 147a423b345ea365c22af48727c83ea4f31b948c (diff) | |
download | SpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.tar.gz SpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.zip |
Extensive changes to the underlying code base.
Fully tested and working support for region level splitting
Reduced number of mappers.
Diffstat (limited to 'src')
21 files changed, 1332 insertions, 1982 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java new file mode 100644 index 0000000..598a988 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java @@ -0,0 +1,53 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 17:25 + * To change this template use File | Settings | File Templates. + */ +public class HBaseConfigUtils { + static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class); + + public static void setRecordReaderParms(HBaseRecordReaderBase trr, HBaseTableSplitBase tSplit) throws IOException { + switch (tSplit.getSourceMode()) { + case SCAN_ALL: + case SCAN_RANGE: { + LOG.debug(String.format( + "For split [%s] we have start key (%s) and stop key (%s)", + tSplit, tSplit.getStartRow(), tSplit.getEndRow())); + + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setEndRowInclusive(tSplit.getEndRowInclusive()); + trr.setUseSalt(tSplit.getUseSalt()); + } + + break; + + case GET_LIST: { + LOG.debug(String.format("For split [%s] we have key list (%s)", + tSplit, tSplit.getKeyList())); + + trr.setKeyList(tSplit.getKeyList()); + trr.setVersions(tSplit.getVersions()); + trr.setUseSalt(tSplit.getUseSalt()); + } + + break; + + default: + throw new IOException("Unknown source mode : " + + tSplit.getSourceMode()); + } + + trr.setSourceMode(tSplit.getSourceMode()); + } + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 25b89cb..5b5e9c3 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -3,21 +3,26 @@ package parallelai.spyglass.hbase; import 
org.apache.hadoop.conf.Configuration; public class HBaseConstants { - - public enum SourceMode { - EMPTY, - SCAN_ALL, - SCAN_RANGE, - GET_LIST; - } - public static final String START_KEY = "hbase.%s.startkey"; - public static final String STOP_KEY = "hbase.%s.stopkey"; - public static final String SOURCE_MODE = "hbase.%s.source.mode"; - public static final String KEY_LIST = "hbase.%s.key.list"; - public static final String VERSIONS = "hbase.%s.versions"; - public static final String USE_SALT = "hbase.%s.use.salt"; - public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; + public enum SourceMode { + EMPTY, + SCAN_ALL, + SCAN_RANGE, + GET_LIST; + } - public static final String SINK_MODE = "hbase.%s.sink.mode"; -} + public enum SplitType { + GRANULAR, + REGIONAL; + } + + public static final String START_KEY = "hbase.%s.startkey"; + public static final String STOP_KEY = "hbase.%s.stopkey"; + public static final String SOURCE_MODE = "hbase.%s.source.mode"; + public static final String KEY_LIST = "hbase.%s.key.list"; + public static final String VERSIONS = "hbase.%s.versions"; + public static final String USE_SALT = "hbase.%s.use.salt"; + public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; + + public static final String SINK_MODE = "hbase.%s.sink.mode"; +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java deleted file mode 100644 index 5a1184f..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ /dev/null @@ -1,645 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; - -import javax.naming.NamingException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseInputFormat implements - InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { - - private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); - - private final String id = UUID.randomUUID().toString(); - - private byte[][] inputColumns; - private 
HTable table; - // private HBaseRecordReader tableRecordReader; - private Filter rowFilter; - // private String tableName = ""; - - private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); - - private String nameServer = null; - - // private Scan scan = null; - - private HBaseMultiInputSplit[] convertToMultiSplitArray( - List<HBaseTableSplit> splits) throws IOException { - - if (splits == null) - throw new IOException("The list of splits is null => " + splits); - - HashMap<String, HBaseMultiInputSplit> regionSplits = new HashMap<String, HBaseMultiInputSplit>(); - - for (HBaseTableSplit hbt : splits) { - HBaseMultiInputSplit mis = null; - if (regionSplits.containsKey(hbt.getRegionLocation())) { - mis = regionSplits.get(hbt.getRegionLocation()); - } else { - regionSplits.put(hbt.getRegionLocation(), new HBaseMultiInputSplit( - hbt.getRegionLocation())); - mis = regionSplits.get(hbt.getRegionLocation()); - } - - mis.addSplit(hbt); - regionSplits.put(hbt.getRegionLocation(), mis); - } - - Collection<HBaseMultiInputSplit> outVals = regionSplits.values(); - - LOG.debug("".format("Returning array of splits : %s", outVals)); - - if (outVals == null) - throw new IOException("The list of multi input splits were null"); - - return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]); - } - - @SuppressWarnings("deprecation") - @Override - public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - if (this.table == null) { - throw new IOException("No table was provided"); - } - - if (this.inputColumns == null || this.inputColumns.length == 0) { - throw new IOException("Expecting at least one column"); - } - - final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); - - if (keys == null || keys.getFirst() == null - || keys.getFirst().length == 0) { - HRegionLocation regLoc = table.getRegionLocation( - HConstants.EMPTY_BYTE_ARRAY, false); - - if (null == regLoc) { - throw new IOException("Expecting at 
least one region."); - } - - final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split( - Addressing.HOSTNAME_PORT_SEPARATOR)[0], - SourceMode.EMPTY, false); - - splits.add(split); - - // TODO: Change to HBaseMultiSplit - return convertToMultiSplitArray(splits); - } - - if (keys.getSecond() == null || keys.getSecond().length == 0) { - throw new IOException("Expecting at least one region."); - } - - if (keys.getFirst().length != keys.getSecond().length) { - throw new IOException("Regions for start and end key do not match"); - } - - byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; - byte[] maxKey = keys.getSecond()[0]; - - LOG.debug(String.format("SETTING min key (%s) and max key (%s)", - Bytes.toString(minKey), Bytes.toString(maxKey))); - - byte[][] regStartKeys = keys.getFirst(); - byte[][] regStopKeys = keys.getSecond(); - String[] regions = new String[regStartKeys.length]; - - for (int i = 0; i < regStartKeys.length; i++) { - minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) - && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] - : minKey; - maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) - && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? 
regStopKeys[i] - : maxKey; - - HServerAddress regionServerAddress = table.getRegionLocation( - keys.getFirst()[i]).getServerAddress(); - InetAddress regionAddress = regionServerAddress.getInetSocketAddress() - .getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " + regionAddress - + " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - // HServerAddress regionServerAddress = - // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); - // InetAddress regionAddress = - // regionServerAddress.getInetSocketAddress().getAddress(); - // - // String regionLocation; - // - // try { - // regionLocation = reverseDNS(regionAddress); - // } catch (NamingException e) { - // LOG.error("Cannot resolve the host name for " + regionAddress + - // " because of " + e); - // regionLocation = regionServerAddress.getHostname(); - // } - - // String regionLocation = - // table.getRegionLocation(keys.getFirst()[i]).getHostname(); - - LOG.debug("***** " + regionLocation); - - if (regionLocation == null || regionLocation.length() == 0) - throw new IOException("The region info for regiosn " + i - + " is null or empty"); - - regions[i] = regionLocation; - - LOG.debug(String.format( - "Region (%s) has start key (%s) and stop key (%s)", regions[i], - Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); - } - - byte[] startRow = HConstants.EMPTY_START_ROW; - byte[] stopRow = HConstants.EMPTY_END_ROW; - - LOG.debug(String.format("Found min key (%s) and max key (%s)", - Bytes.toString(minKey), Bytes.toString(maxKey))); - - LOG.debug("SOURCE MODE is : " + sourceMode); - - switch (sourceMode) { - case SCAN_ALL: - startRow = HConstants.EMPTY_START_ROW; - stopRow = HConstants.EMPTY_END_ROW; - - LOG.info(String.format( - "SCAN ALL: Found start key (%s) and stop key (%s)", - Bytes.toString(startRow), Bytes.toString(stopRow))); - 
break; - - case SCAN_RANGE: - startRow = (startKey != null && startKey.length() != 0) ? Bytes - .toBytes(startKey) : HConstants.EMPTY_START_ROW; - stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes - .toBytes(stopKey) : HConstants.EMPTY_END_ROW; - - LOG.info(String.format( - "SCAN RANGE: Found start key (%s) and stop key (%s)", - Bytes.toString(startRow), Bytes.toString(stopRow))); - break; - } - - switch (sourceMode) { - case EMPTY: - case SCAN_ALL: - case SCAN_RANGE: { - // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : - // startRow; - // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : - // stopRow; - - final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - - if (!useSalt) { - - List<HRegionLocation> validRegions = table.getRegionsInRange( - startRow, stopRow); - - int maxRegions = validRegions.size(); - int currentRegion = 1; - - for (HRegionLocation cRegion : validRegions) { - byte[] rStart = cRegion.getRegionInfo().getStartKey(); - byte[] rStop = cRegion.getRegionInfo().getEndKey(); - - HServerAddress regionServerAddress = cRegion - .getServerAddress(); - InetAddress regionAddress = regionServerAddress - .getInetSocketAddress().getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " - + regionAddress + " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - byte[] sStart = (startRow == HConstants.EMPTY_START_ROW - || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart - : startRow); - byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW - || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? 
rStop - : stopRow); - - LOG.debug(String.format( - "BOOL start (%s) stop (%s) length (%d)", - (startRow == HConstants.EMPTY_START_ROW || (Bytes - .compareTo(startRow, rStart) <= 0)), - (stopRow == HConstants.EMPTY_END_ROW || (Bytes - .compareTo(stopRow, rStop) >= 0)), rStop.length)); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), sStart, sStop, regionLocation, - SourceMode.SCAN_RANGE, useSalt); - - split.setEndRowInclusive(currentRegion == maxRegions); - - currentRegion++; - - LOG.debug(String - .format( - "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", - Bytes.toString(startRow), - Bytes.toString(stopRow), Bytes.toString(rStart), - Bytes.toString(rStop), Bytes.toString(sStart), - Bytes.toString(sStop), cRegion.getHostnamePort(), - split)); - - splits.add(split); - } - } else { - LOG.debug("Using SALT : " + useSalt); - - // Will return the start and the stop key with all possible - // prefixes. - for (int i = 0; i < regions.length; i++) { - Pair<byte[], byte[]>[] intervals = HBaseSalter - .getDistributedIntervals(startRow, stopRow, - regStartKeys[i], regStopKeys[i], prefixList); - - for (Pair<byte[], byte[]> pair : intervals) { - LOG.debug("".format( - "Using SALT, Region (%s) Start (%s) Stop (%s)", - regions[i], Bytes.toString(pair.getFirst()), - Bytes.toString(pair.getSecond()))); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), pair.getFirst(), - pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, - useSalt); - - split.setEndRowInclusive(true); - splits.add(split); - } - } - } - - LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); - - // TODO: Change to HBaseMultiSplit - return convertToMultiSplitArray(splits); - } - - case GET_LIST: { - // if( keyList == null || keyList.size() == 0 ) { - if (keyList == null) { - throw new IOException( - "Source Mode is GET_LIST but key list is EMPTY"); - } - - if (useSalt) { - TreeSet<String> tempKeyList = new 
TreeSet<String>(); - - for (String key : keyList) { - tempKeyList.add(HBaseSalter.addSaltPrefix(key)); - } - - keyList = tempKeyList; - } - - LOG.info("".format("Splitting Key List (%s)", keyList)); - - final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - - for (int i = 0; i < keys.getFirst().length; i++) { - - if (!includeRegionInSplit(keys.getFirst()[i], - keys.getSecond()[i])) { - continue; - } - - LOG.debug(String.format( - "Getting region (%s) subset (%s) to (%s)", regions[i], - Bytes.toString(regStartKeys[i]), - Bytes.toString(regStopKeys[i]))); - - Set<String> regionsSubSet = null; - - if ((regStartKeys[i] == null || regStartKeys[i].length == 0) - && (regStopKeys[i] == null || regStopKeys[i].length == 0)) { - LOG.debug("REGION start is empty"); - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList; - } else if (regStartKeys[i] == null - || regStartKeys[i].length == 0) { - LOG.debug("REGION start is empty"); - regionsSubSet = keyList.headSet( - Bytes.toString(regStopKeys[i]), true); - } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList.tailSet( - Bytes.toString(regStartKeys[i]), true); - } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { - regionsSubSet = keyList.subSet( - Bytes.toString(regStartKeys[i]), true, - Bytes.toString(regStopKeys[i]), true); - } else { - throw new IOException(String.format( - "For REGION (%s) Start Key (%s) > Stop Key(%s)", - regions[i], Bytes.toString(regStartKeys[i]), - Bytes.toString(regStopKeys[i]))); - } - - if (regionsSubSet == null || regionsSubSet.size() == 0) { - LOG.debug("EMPTY: Key is for region " + regions[i] - + " is null"); - - continue; - } - - TreeSet<String> regionKeyList = new TreeSet<String>( - regionsSubSet); - - LOG.debug(String.format("Regions [%s] has key list <%s>", - regions[i], regionKeyList)); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), regionKeyList, 
versions, regions[i], - SourceMode.GET_LIST, useSalt); - splits.add(split); - } - - // if (splits.isEmpty()) { - // LOG.info("GOT EMPTY SPLITS"); - - // throw new IOException( - // "".format("Key List NOT found in any region")); - - // HRegionLocation regLoc = table.getRegionLocation( - // HConstants.EMPTY_BYTE_ARRAY, false); - // - // if (null == regLoc) { - // throw new IOException("Expecting at least one region."); - // } - // - // HBaseTableSplit split = new HBaseTableSplit( - // table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, - // HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostnamePort() - // .split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], - // SourceMode.EMPTY, false); - // - // splits.add(split); - // } - - LOG.info("RETURNED SPLITS: split -> " + splits); - - // TODO: Change to HBaseMultiSplit - return convertToMultiSplitArray(splits); - } - - default: - throw new IOException("Unknown source Mode : " + sourceMode); - } - } - - private String reverseDNS(InetAddress ipAddress) throws NamingException { - String hostName = this.reverseDNSCacheMap.get(ipAddress); - if (hostName == null) { - hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( - ipAddress, this.nameServer)); - this.reverseDNSCacheMap.put(ipAddress, hostName); - } - return hostName; - } - - @Override - public RecordReader<ImmutableBytesWritable, Result> getRecordReader( - InputSplit split, JobConf job, Reporter reporter) throws IOException { - - if (!(split instanceof HBaseMultiInputSplit)) - throw new IOException("Table Split is not type HBaseMultiInputSplit"); - - HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split; - - HBaseRecordReader trr = new HBaseRecordReader(tSplit); - - trr.setHTable(this.table); - trr.setInputColumns(this.inputColumns); - trr.setRowFilter(this.rowFilter); - trr.setUseSalt(useSalt); - - trr.setNextSplit(); - - return trr; - } - - /* Configuration Section */ - - /** - * space delimited list of columns - */ - public static final String COLUMN_LIST = 
"hbase.tablecolumns"; - - /** - * Use this jobconf param to specify the input table - */ - private static final String INPUT_TABLE = "hbase.inputtable"; - - private String startKey = null; - private String stopKey = null; - - private SourceMode sourceMode = SourceMode.EMPTY; - private TreeSet<String> keyList = null; - private int versions = 1; - private boolean useSalt = false; - private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - - public void configure(JobConf job) { - String tableName = getTableName(job); - String colArg = job.get(COLUMN_LIST); - String[] colNames = colArg.split(" "); - byte[][] m_cols = new byte[colNames.length][]; - for (int i = 0; i < m_cols.length; i++) { - m_cols[i] = Bytes.toBytes(colNames[i]); - } - setInputColumns(m_cols); - - try { - setHTable(new HTable(HBaseConfiguration.create(job), tableName)); - } catch (Exception e) { - LOG.error("************* Table could not be created"); - LOG.error(StringUtils.stringifyException(e)); - } - - LOG.debug("Entered : " + this.getClass() + " : configure()"); - - useSalt = job.getBoolean( - String.format(HBaseConstants.USE_SALT, getTableName(job)), false); - prefixList = job.get( - String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), - HBaseSalter.DEFAULT_PREFIX_LIST); - - sourceMode = SourceMode.valueOf(job.get(String.format( - HBaseConstants.SOURCE_MODE, getTableName(job)))); - - LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String - .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job - .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), - sourceMode)); - - switch (sourceMode) { - case SCAN_RANGE: - LOG.info("HIT SCAN_RANGE"); - - startKey = getJobProp(job, - String.format(HBaseConstants.START_KEY, getTableName(job))); - stopKey = getJobProp(job, - String.format(HBaseConstants.STOP_KEY, getTableName(job))); - - LOG.info(String.format("Setting start key (%s) and stop key (%s)", - startKey, stopKey)); - break; - - case GET_LIST: - 
LOG.info("HIT GET_LIST"); - - Collection<String> keys = job.getStringCollection(String.format( - HBaseConstants.KEY_LIST, getTableName(job))); - keyList = new TreeSet<String>(keys); - - versions = job.getInt( - String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - - LOG.debug("GOT KEY LIST : " + keys); - LOG.debug(String.format("SETTING key list (%s)", keyList)); - - break; - - case EMPTY: - LOG.info("HIT EMPTY"); - - sourceMode = SourceMode.SCAN_ALL; - break; - - default: - LOG.info("HIT DEFAULT"); - - break; - } - } - - public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - String tableName = getTableName(job); - - if (tableName == null) { - throw new IOException("expecting one table name"); - } - LOG.debug(String.format("Found Table name [%s]", tableName)); - - // connected to table? - if (getHTable() == null) { - throw new IOException("could not connect to table '" + tableName + "'"); - } - LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); - - // expecting at least one column - String colArg = job.get(COLUMN_LIST); - if (colArg == null || colArg.length() == 0) { - throw new IOException("expecting at least one column"); - } - LOG.debug(String.format("Found Columns [%s]", colArg)); - - LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, - stopKey)); - - if (sourceMode == SourceMode.EMPTY) { - throw new IOException("SourceMode should not be EMPTY"); - } - - if (sourceMode == SourceMode.GET_LIST - && (keyList == null || keyList.size() == 0)) { - throw new IOException("Source mode is GET_LIST bu key list is empty"); - } - } - - /* Getters & Setters */ - private HTable getHTable() { - return this.table; - } - - private void setHTable(HTable ht) { - this.table = ht; - } - - private void setInputColumns(byte[][] ic) { - this.inputColumns = ic; - } - - private void setJobProp(JobConf job, String key, String value) { - if (job.get(key) != null) - throw new 
RuntimeException(String.format( - "Job Conf already has key [%s] with value [%s]", key, - job.get(key))); - job.set(key, value); - } - - private String getJobProp(JobConf job, String key) { - return job.get(key); - } - - public static void setTableName(JobConf job, String tableName) { - // Make sure that table has not been set before - String oldTableName = getTableName(job); - if (oldTableName != null) { - throw new RuntimeException("table name already set to: '" - + oldTableName + "'"); - } - - job.set(INPUT_TABLE, tableName); - } - - public static String getTableName(JobConf job) { - return job.get(INPUT_TABLE); - } - - protected boolean includeRegionInSplit(final byte[] startKey, - final byte[] endKey) { - return true; - } -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java new file mode 100644 index 0000000..2f3047b --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -0,0 +1,172 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.util.StringUtils; + +import java.util.Collection; +import java.util.TreeSet; +import java.util.UUID; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 12:43 + * To change this template use File | Settings | File Templates. 
+ */ +public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { + + private final Log LOG = LogFactory.getLog(HBaseInputFormatBase.class); + + protected final String id = UUID.randomUUID().toString(); + protected byte[][] inputColumns; + protected HTable table; + protected Filter rowFilter; + + public static final String COLUMN_LIST = "hbase.tablecolumns"; + + /** + * Use this jobconf param to specify the input table + */ + protected static final String INPUT_TABLE = "hbase.inputtable"; + + protected String startKey = null; + protected String stopKey = null; + + protected HBaseConstants.SourceMode sourceMode = HBaseConstants.SourceMode.EMPTY; + protected TreeSet<String> keyList = null; + protected int versions = 1; + protected boolean useSalt = false; + protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; + + + + @Override + public void configure(JobConf job) { + String tableName = getTableName(job); + String colArg = job.get(COLUMN_LIST); + String[] colNames = colArg.split(" "); + byte[][] m_cols = new byte[colNames.length][]; + for (int i = 0; i < m_cols.length; i++) { + m_cols[i] = Bytes.toBytes(colNames[i]); + } + setInputColumns(m_cols); + + try { + setHTable(new HTable(HBaseConfiguration.create(job), tableName)); + } catch (Exception e) { + LOG.error("************* Table could not be created"); + LOG.error(StringUtils.stringifyException(e)); + } + + LOG.debug("Entered : " + this.getClass() + " : configure()"); + + useSalt = job.getBoolean( + String.format(HBaseConstants.USE_SALT, getTableName(job)), false); + prefixList = job.get( + String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), + HBaseSalter.DEFAULT_PREFIX_LIST); + + sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format( + HBaseConstants.SOURCE_MODE, getTableName(job)))); + + LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String + .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job + 
.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), + sourceMode)); + + switch (sourceMode) { + case SCAN_RANGE: + LOG.debug("HIT SCAN_RANGE"); + + startKey = getJobProp(job, + String.format(HBaseConstants.START_KEY, getTableName(job))); + stopKey = getJobProp(job, + String.format(HBaseConstants.STOP_KEY, getTableName(job))); + + LOG.debug(String.format("Setting start key (%s) and stop key (%s)", + startKey, stopKey)); + break; + + case GET_LIST: + LOG.debug("HIT GET_LIST"); + + Collection<String> keys = job.getStringCollection(String.format( + HBaseConstants.KEY_LIST, getTableName(job))); + keyList = new TreeSet<String>(keys); + + versions = job.getInt( + String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); + + LOG.debug("GOT KEY LIST : " + keys); + LOG.debug(String.format("SETTING key list (%s)", keyList)); + + break; + + case EMPTY: + LOG.info("HIT EMPTY"); + + sourceMode = HBaseConstants.SourceMode.SCAN_ALL; + break; + + default: + LOG.info("HIT DEFAULT"); + + break; + } + } + + /* Getters & Setters */ + protected HTable getHTable() { + return this.table; + } + + protected void setHTable(HTable ht) { + this.table = ht; + } + + protected void setInputColumns(byte[][] ic) { + this.inputColumns = ic; + } + + protected void setJobProp(JobConf job, String key, String value) { + if (job.get(key) != null) + throw new RuntimeException(String.format( + "Job Conf already has key [%s] with value [%s]", key, + job.get(key))); + job.set(key, value); + } + + protected String getJobProp(JobConf job, String key) { + return job.get(key); + } + + public static void setTableName(JobConf job, String tableName) { + // Make sure that table has not been set before + String oldTableName = getTableName(job); + if (oldTableName != null) { + throw new RuntimeException("table name already set to: '" + + oldTableName + "'"); + } + + job.set(INPUT_TABLE, tableName); + } + + public static String getTableName(JobConf job) { + return job.get(INPUT_TABLE); + } + + 
protected void setParms(HBaseRecordReaderBase trr) { + + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java index 96bfea1..929e9d8 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -37,17 +37,10 @@ import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseInputFormat_SINGLE implements - InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { +public class HBaseInputFormatGranular extends HBaseInputFormatBase { - private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class); + private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class); - private final String id = UUID.randomUUID().toString(); - - private byte[][] inputColumns; - private HTable table; - // private HBaseRecordReader tableRecordReader; - private Filter rowFilter; // private String tableName = ""; private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); @@ -58,7 +51,7 @@ public class HBaseInputFormat_SINGLE implements @SuppressWarnings("deprecation") @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException { if (this.table == null) { throw new IOException("No table was provided"); } @@ -78,8 +71,8 @@ public class HBaseInputFormat_SINGLE implements throw new IOException("Expecting at least one region."); } - List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); - HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), + List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>(1); + HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, 
HConstants.EMPTY_BYTE_ARRAY, regLoc .getHostnamePort().split( Addressing.HOSTNAME_PORT_SEPARATOR)[0], @@ -87,7 +80,7 @@ public class HBaseInputFormat_SINGLE implements splits.add(split); - return splits.toArray(new HBaseTableSplit[splits.size()]); + return splits.toArray(new HBaseTableSplitGranular[splits.size()]); } if (keys.getSecond() == null || keys.getSecond().length == 0) { @@ -173,7 +166,7 @@ public class HBaseInputFormat_SINGLE implements startRow = HConstants.EMPTY_START_ROW; stopRow = HConstants.EMPTY_END_ROW; - LOG.info(String.format( + LOG.debug(String.format( "SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; @@ -184,7 +177,7 @@ public class HBaseInputFormat_SINGLE implements stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes .toBytes(stopKey) : HConstants.EMPTY_END_ROW; - LOG.info(String.format( + LOG.debug(String.format( "SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); break; @@ -199,7 +192,7 @@ public class HBaseInputFormat_SINGLE implements // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? 
maxKey : // stopRow; - List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>(); if (!useSalt) { @@ -240,7 +233,7 @@ public class HBaseInputFormat_SINGLE implements (stopRow == HConstants.EMPTY_END_ROW || (Bytes .compareTo(stopRow, rStop) >= 0)), rStop.length)); - HBaseTableSplit split = new HBaseTableSplit( + HBaseTableSplitGranular split = new HBaseTableSplitGranular( table.getTableName(), sStart, sStop, regionLocation, SourceMode.SCAN_RANGE, useSalt); @@ -270,12 +263,12 @@ public class HBaseInputFormat_SINGLE implements regStartKeys[i], regStopKeys[i], prefixList); for (Pair<byte[], byte[]> pair : intervals) { - LOG.info("".format( + LOG.debug("".format( "Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); - HBaseTableSplit split = new HBaseTableSplit( + HBaseTableSplitGranular split = new HBaseTableSplitGranular( table.getTableName(), pair.getFirst(), pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, useSalt); @@ -286,12 +279,12 @@ public class HBaseInputFormat_SINGLE implements } } - LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); - for (HBaseTableSplit s : splits) { - LOG.info("RETURNED SPLITS: split -> " + s); + LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); + for (HBaseTableSplitGranular s : splits) { + LOG.debug("RETURNED SPLITS: split -> " + s); } - return splits.toArray(new HBaseTableSplit[splits.size()]); + return splits.toArray(new HBaseTableSplitGranular[splits.size()]); } case GET_LIST: { @@ -313,7 +306,7 @@ public class HBaseInputFormat_SINGLE implements LOG.debug("".format("Splitting Key List (%s)", keyList)); - List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + List<HBaseTableSplitGranular> splits = new ArrayList<HBaseTableSplitGranular>(); for (int i = 0; i < keys.getFirst().length; i++) { @@ -325,7 +318,7 @@ public class 
HBaseInputFormat_SINGLE implements LOG.debug(String.format( "Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), - Bytes.toString(regStartKeys[i]))); + Bytes.toString(regStopKeys[i]))); Set<String> regionsSubSet = null; @@ -367,7 +360,7 @@ public class HBaseInputFormat_SINGLE implements LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList)); - HBaseTableSplit split = new HBaseTableSplit( + HBaseTableSplitGranular split = new HBaseTableSplitGranular( table.getTableName(), regionKeyList, versions, regions[i], SourceMode.GET_LIST, useSalt); splits.add(split); @@ -375,7 +368,7 @@ public class HBaseInputFormat_SINGLE implements LOG.debug("RETURNED SPLITS: split -> " + splits); - return splits.toArray(new HBaseTableSplit[splits.size()]); + return splits.toArray(new HBaseTableSplitGranular[splits.size()]); } default: @@ -397,45 +390,17 @@ public class HBaseInputFormat_SINGLE implements public RecordReader<ImmutableBytesWritable, Result> getRecordReader( InputSplit split, JobConf job, Reporter reporter) throws IOException { - if (!(split instanceof HBaseTableSplit)) - throw new IOException("Table Split is not type HBaseTableSplit"); - - HBaseTableSplit tSplit = (HBaseTableSplit) split; - - HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE(); - - switch (tSplit.getSourceMode()) { - case SCAN_ALL: - case SCAN_RANGE: { - LOG.debug(String.format( - "For split [%s] we have start key (%s) and stop key (%s)", - tSplit, tSplit.getStartRow(), tSplit.getEndRow())); - - trr.setStartRow(tSplit.getStartRow()); - trr.setEndRow(tSplit.getEndRow()); - trr.setEndRowInclusive(tSplit.getEndRowInclusive()); - trr.setUseSalt(useSalt); - } + LOG.info("GRANULAR SPLIT -> " + split); - break; + if (!(split instanceof HBaseTableSplitGranular)) + throw new IOException("Table Split is not type HBaseTableSplitGranular"); - case GET_LIST: { - LOG.debug(String.format("For split [%s] we have key list (%s)", - tSplit, 
tSplit.getKeyList())); + HBaseTableSplitGranular tSplit = (HBaseTableSplitGranular) split; - trr.setKeyList(tSplit.getKeyList()); - trr.setVersions(tSplit.getVersions()); - trr.setUseSalt(useSalt); - } + HBaseRecordReaderGranular trr = new HBaseRecordReaderGranular(); - break; - - default: - throw new IOException("Unknown source mode : " - + tSplit.getSourceMode()); - } + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); - trr.setSourceMode(tSplit.getSourceMode()); trr.setHTable(this.table); trr.setInputColumns(this.inputColumns); trr.setRowFilter(this.rowFilter); @@ -450,95 +415,6 @@ public class HBaseInputFormat_SINGLE implements /** * space delimited list of columns */ - public static final String COLUMN_LIST = "hbase.tablecolumns"; - - /** - * Use this jobconf param to specify the input table - */ - private static final String INPUT_TABLE = "hbase.inputtable"; - - private String startKey = null; - private String stopKey = null; - - private SourceMode sourceMode = SourceMode.EMPTY; - private TreeSet<String> keyList = null; - private int versions = 1; - private boolean useSalt = false; - private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - - public void configure(JobConf job) { - String tableName = getTableName(job); - String colArg = job.get(COLUMN_LIST); - String[] colNames = colArg.split(" "); - byte[][] m_cols = new byte[colNames.length][]; - for (int i = 0; i < m_cols.length; i++) { - m_cols[i] = Bytes.toBytes(colNames[i]); - } - setInputColumns(m_cols); - - try { - setHTable(new HTable(HBaseConfiguration.create(job), tableName)); - } catch (Exception e) { - LOG.error("************* Table could not be created"); - LOG.error(StringUtils.stringifyException(e)); - } - - LOG.debug("Entered : " + this.getClass() + " : configure()"); - - useSalt = job.getBoolean( - String.format(HBaseConstants.USE_SALT, getTableName(job)), false); - prefixList = job.get( - String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), - HBaseSalter.DEFAULT_PREFIX_LIST); 
- - sourceMode = SourceMode.valueOf(job.get(String.format( - HBaseConstants.SOURCE_MODE, getTableName(job)))); - - LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String - .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job - .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), - sourceMode)); - - switch (sourceMode) { - case SCAN_RANGE: - LOG.info("HIT SCAN_RANGE"); - - startKey = getJobProp(job, - String.format(HBaseConstants.START_KEY, getTableName(job))); - stopKey = getJobProp(job, - String.format(HBaseConstants.STOP_KEY, getTableName(job))); - - LOG.info(String.format("Setting start key (%s) and stop key (%s)", - startKey, stopKey)); - break; - - case GET_LIST: - LOG.info("HIT GET_LIST"); - - Collection<String> keys = job.getStringCollection(String.format( - HBaseConstants.KEY_LIST, getTableName(job))); - keyList = new TreeSet<String>(keys); - - versions = job.getInt( - String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - - LOG.debug("GOT KEY LIST : " + keys); - LOG.debug(String.format("SETTING key list (%s)", keyList)); - - break; - - case EMPTY: - LOG.info("HIT EMPTY"); - - sourceMode = SourceMode.SCAN_ALL; - break; - - default: - LOG.info("HIT DEFAULT"); - - break; - } - } public void validateInput(JobConf job) throws IOException { // expecting exactly one path @@ -575,45 +451,6 @@ public class HBaseInputFormat_SINGLE implements } } - /* Getters & Setters */ - private HTable getHTable() { - return this.table; - } - - private void setHTable(HTable ht) { - this.table = ht; - } - - private void setInputColumns(byte[][] ic) { - this.inputColumns = ic; - } - - private void setJobProp(JobConf job, String key, String value) { - if (job.get(key) != null) - throw new RuntimeException(String.format( - "Job Conf already has key [%s] with value [%s]", key, - job.get(key))); - job.set(key, value); - } - - private String getJobProp(JobConf job, String key) { - return job.get(key); - } - - public static void 
setTableName(JobConf job, String tableName) { - // Make sure that table has not been set before - String oldTableName = getTableName(job); - if (oldTableName != null) { - throw new RuntimeException("table name already set to: '" - + oldTableName + "'"); - } - - job.set(INPUT_TABLE, tableName); - } - - public static String getTableName(JobConf job) { - return job.get(INPUT_TABLE); - } protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java new file mode 100644 index 0000000..eadb57e --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -0,0 +1,99 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 12:24 + * To change this template use File | Settings | File Templates. + */ +public class HBaseInputFormatRegional extends HBaseInputFormatBase { + private HBaseInputFormatGranular granular = new HBaseInputFormatGranular(); + private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class); + + + @Override + public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException { + granular.configure(job); + HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits); + + HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits ); + + if( splits == null ) throw new IOException("Not sure WTF is going on? 
splits is NULL"); + + LOG.info("GRANULAR => " + gSplits); + LOG.info("REGIONAL => " + splits); + + return splits; + } + + @Override + public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { + if (!(inputSplit instanceof HBaseTableSplitRegional)) + throw new IOException("Table Split is not type HBaseTableSplitRegional"); + + LOG.info("REGIONAL SPLIT -> " + inputSplit); + + HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit; + + HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional(); + + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + + trr.init(tSplit); + + return trr; + } + + private HBaseTableSplitRegional[] convertToMultiSplitArray( + HBaseTableSplitGranular[] splits) throws IOException { + + if (splits == null) + throw new IOException("The list of splits is null => " + splits); + + HashMap<String, HBaseTableSplitRegional> regionSplits = new HashMap<String, HBaseTableSplitRegional>(); + + for (HBaseTableSplitGranular hbt : splits) { + HBaseTableSplitRegional mis = null; + if (regionSplits.containsKey(hbt.getRegionLocation())) { + mis = regionSplits.get(hbt.getRegionLocation()); + } else { + regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional( + hbt.getRegionLocation())); + mis = regionSplits.get(hbt.getRegionLocation()); + } + + mis.addSplit(hbt); + regionSplits.put(hbt.getRegionLocation(), mis); + } + +// for(String region : regionSplits.keySet() ) { +// regionSplits.get(region) +// } + + Collection<HBaseTableSplitRegional> outVals = regionSplits.values(); + + LOG.debug("".format("Returning array of splits : %s", outVals)); + + if (outVals == null) + throw new IOException("The list of multi input splits were null"); + + return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]); + } + +} diff --git 
a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java deleted file mode 100644 index 02e7f7b..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java +++ /dev/null @@ -1,111 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang.SerializationUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -public class HBaseMultiInputSplit implements InputSplit, - Comparable<HBaseMultiInputSplit>, Serializable { - - private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class); - - private List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - - private String regionLocation = null; - - /** default constructor */ - private HBaseMultiInputSplit() { - - } - - public HBaseMultiInputSplit(String regionLocation) { - this.regionLocation = regionLocation; - } - - /** @return the region's hostname */ - public String getRegionLocation() { - LOG.debug("REGION GETTER : " + regionLocation); - - return this.regionLocation; - } - - @Override - public void readFields(DataInput in) throws IOException { - LOG.debug("READ ME : " + in.toString()); - - int s = Bytes.toInt(Bytes.readByteArray(in)); - - for (int i = 0; i < s; i++) { - HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils - .deserialize(Bytes.readByteArray(in)); - splits.add(hbts); - } - - LOG.debug("READ and CREATED : " + this); - } - - @Override - public void write(DataOutput out) throws IOException { - LOG.debug("WRITE : " + this); - - Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); - - for (HBaseTableSplit hbts : splits) { - Bytes.writeByteArray(out, 
SerializationUtils.serialize(hbts)); - } - - LOG.debug("WROTE : " + out.toString()); - } - - @Override - public String toString() { - StringBuffer str = new StringBuffer(); - str.append("HBaseMultiSplit : "); - - for (HBaseTableSplit hbt : splits) { - str.append(" [" + hbt.toString() + "]"); - } - - return str.toString(); - } - - @Override - public int compareTo(HBaseMultiInputSplit o) { - // TODO: Make this comparison better - return (splits.size() - o.splits.size()); - } - - @Override - public long getLength() throws IOException { - return splits.size(); - } - - @Override - public String[] getLocations() throws IOException { - LOG.debug("REGION ARRAY : " + regionLocation); - - return new String[] { this.regionLocation }; - } - - public void addSplit(HBaseTableSplit hbt) throws IOException { - if (hbt.getRegionLocation().equals(regionLocation)) - splits.add(hbt); - else - throw new IOException("HBaseTableSplit Region Location " - + hbt.getRegionLocation() - + " does NOT match MultiSplit Region Location " + regionLocation); - } - - public List<HBaseTableSplit> getSplits() { - return splits; - } -}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java deleted file mode 100644 index 5d7dbdd..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ /dev/null @@ -1,609 +0,0 @@ -package parallelai.spyglass.hbase; - -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.Vector; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader implements - RecordReader<ImmutableBytesWritable, Result> { - - static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); - - private byte[] startRow; - private byte[] endRow; - private byte[] lastSuccessfulRow; - private TreeSet<String> keyList; - private SourceMode sourceMode; - private Filter trrRowFilter; - private ResultScanner scanner; - private HTable htable; - private byte[][] trrInputColumns; - private long timestamp; - private 
int rowcount; - private boolean logScannerActivity = false; - private int logPerRowCount = 100; - private boolean endRowInclusive = true; - private int versions = 1; - private boolean useSalt = false; - - private HBaseMultiInputSplit multiSplit = null; - private List<HBaseTableSplit> allSplits = null; - - private HBaseRecordReader() { - } - - public HBaseRecordReader(HBaseMultiInputSplit mSplit) throws IOException { - multiSplit = mSplit; - - LOG.info("Creatin Multi Split for region location : " - + multiSplit.getRegionLocation()); - - allSplits = multiSplit.getSplits(); - } - - public boolean setNextSplit() throws IOException { - if (allSplits.size() > 0) { - setSplitValue(allSplits.remove(0)); - return true; - } else { - return false; - } - } - - private void setSplitValue(HBaseTableSplit tSplit) throws IOException { - switch (tSplit.getSourceMode()) { - case SCAN_ALL: - case SCAN_RANGE: { - LOG.debug(String.format( - "For split [%s] we have start key (%s) and stop key (%s)", - tSplit, tSplit.getStartRow(), tSplit.getEndRow())); - - setStartRow(tSplit.getStartRow()); - setEndRow(tSplit.getEndRow()); - setEndRowInclusive(tSplit.getEndRowInclusive()); - } - - break; - - case GET_LIST: { - LOG.debug(String.format("For split [%s] we have key list (%s)", - tSplit, tSplit.getKeyList())); - - setKeyList(tSplit.getKeyList()); - setVersions(tSplit.getVersions()); - } - - break; - - case EMPTY: - LOG.info("EMPTY split. Doing nothing."); - break; - - default: - throw new IOException("Unknown source mode : " - + tSplit.getSourceMode()); - } - - setSourceMode(tSplit.getSourceMode()); - - init(); - } - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow - * @throws IOException - */ - private void restartRangeScan(byte[] firstRow) throws IOException { - Scan currentScan; - if ((endRow != null) && (endRow.length > 0)) { - if (trrRowFilter != null) { - Scan scan = new Scan(firstRow, (endRowInclusive ? 
Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) - + ", endRow: " + Bytes.toString(endRow)); - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - TableInputFormat.addColumns(scan, trrInputColumns); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) - + ", no endRow"); - - Scan scan = new Scan(firstRow); - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - if (logScannerActivity) { - LOG.debug("Current scan=" + currentScan.toString()); - timestamp = System.currentTimeMillis(); - rowcount = 0; - } - } - - public TreeSet<String> getKeyList() { - return keyList; - } - - private void setKeyList(TreeSet<String> keyList) { - this.keyList = keyList; - } - - private void setVersions(int versions) { - this.versions = versions; - } - - public void setUseSalt(boolean useSalt) { - this.useSalt = useSalt; - } - - public SourceMode getSourceMode() { - return sourceMode; - } - - private void setSourceMode(SourceMode sourceMode) { - this.sourceMode = sourceMode; - } - - public byte[] getEndRow() { - return endRow; - } - - private void setEndRowInclusive(boolean isInclusive) { - endRowInclusive = isInclusive; - } - - public boolean getEndRowInclusive() { - return endRowInclusive; - } - - private byte[] nextKey = null; - private Vector<List<KeyValue>> resultVector = null; - Map<Long, List<KeyValue>> keyValueMap = null; - - /** - * Build the scanner. Not done in constructor to allow for extension. 
- * - * @throws IOException - */ - private void init() throws IOException { - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - restartRangeScan(startRow); - break; - - case GET_LIST: - nextKey = Bytes.toBytes(keyList.pollFirst()); - break; - - case EMPTY: - LOG.info("EMPTY mode. Do nothing"); - break; - - default: - throw new IOException(" Unknown source mode : " + sourceMode); - } - } - - byte[] getStartRow() { - return this.startRow; - } - - /** - * @param htable - * the {@link HTable} to scan. - */ - public void setHTable(HTable htable) { - Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean( - ScannerCallable.LOG_SCANNER_ACTIVITY, false); - logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); - this.htable = htable; - } - - /** - * @param inputColumns - * the columns to be placed in {@link Result}. - */ - public void setInputColumns(final byte[][] inputColumns) { - this.trrInputColumns = inputColumns; - } - - /** - * @param startRow - * the first row in the split - */ - private void setStartRow(final byte[] startRow) { - this.startRow = startRow; - } - - /** - * - * @param endRow - * the last row in the split - */ - private void setEndRow(final byte[] endRow) { - this.endRow = endRow; - } - - /** - * @param rowFilter - * the {@link Filter} to be used. 
- */ - public void setRowFilter(Filter rowFilter) { - this.trrRowFilter = rowFilter; - } - - @Override - public void close() { - if (this.scanner != null) - this.scanner.close(); - } - - /** - * @return ImmutableBytesWritable - * - * @see org.apache.hadoop.mapred.RecordReader#createKey() - */ - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - - /** - * @return RowResult - * - * @see org.apache.hadoop.mapred.RecordReader#createValue() - */ - @Override - public Result createValue() { - return new Result(); - } - - @Override - public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; - } - - @Override - public float getProgress() { - // Depends on the total number of tuples and getPos - return 0; - } - - /** - * @param key - * HStoreKey as input key. - * @param value - * MapWritable as input value - * @return true if there was more data - * @throws IOException - */ - @Override - public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: { - - Result result; - try { - try { - result = this.scanner.next(); - if (logScannerActivity) { - rowcount++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) - + "ms to process " + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // try to handle all IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.debug("recovered from " - + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," - + " if your mapper has restarted a few other times like this" - + " then you should consider killing this job and investigate" - + " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - 
restartRangeScan(startRow); - } else { - restartRangeScan(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row - } - result = this.scanner.next(); - } - - if (result != null && result.size() > 0) { - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } - return setNextSplit(); - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) - + "ms to process " + rowcount + " rows"); - LOG.debug(ioe); - String lastRow = lastSuccessfulRow == null ? "null" : Bytes - .toStringBinary(lastSuccessfulRow); - LOG.debug("lastSuccessfulRow=" + lastRow); - } - throw ioe; - } - } - - case GET_LIST: { - LOG.debug(String.format("INTO next with GET LIST and Key (%s)", - Bytes.toString(nextKey))); - - if (versions == 1) { - if (nextKey != null) { - LOG.debug(String.format("Processing Key (%s)", - Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result result = this.htable.get(theGet); - - if (result != null && (!result.isEmpty())) { - LOG.debug(String.format( - "Key (%s), Version (%s), Got Result (%s)", - Bytes.toString(nextKey), versions, result)); - - if (keyList != null || !keyList.isEmpty()) { - String newKey = keyList.pollFirst(); - LOG.debug("New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? 
null - : Bytes.toBytes(newKey); - } else { - nextKey = null; - } - - LOG.debug(String.format("=> Picked a new Key (%s)", - Bytes.toString(nextKey))); - - // Write the result - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - LOG.debug(" Key (" + Bytes.toString(nextKey) - + ") return an EMPTY result. Get (" + theGet.getId() - + ")"); // alg0 - - String newKey; - while ((newKey = keyList.pollFirst()) != null) { - LOG.debug("WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - if (nextKey == null) { - LOG.error("BOMB! BOMB! BOMB!"); - continue; - } - - if (!this.htable.exists(new Get(nextKey))) { - LOG.debug(String.format( - "Key (%s) Does not exist in Table (%s)", - Bytes.toString(nextKey), - Bytes.toString(this.htable.getTableName()))); - continue; - } else { - break; - } - } - - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - LOG.debug("Final New Key => " + Bytes.toString(nextKey)); - - return next(key, value); - } - } else { - // Nothig left. 
return false - return setNextSplit(); - } - } else { - if (resultVector != null && resultVector.size() != 0) { - LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", - versions, resultVector)); - - List<KeyValue> resultKeyValue = resultVector - .remove(resultVector.size() - 1); - Result result = new Result(resultKeyValue); - - LOG.debug(String.format("+ Version (%s), Got Result <%s>", - versions, result)); - - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - if (nextKey != null) { - LOG.debug(String.format("+ Processing Key (%s)", - Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result resultAll = this.htable.get(theGet); - - if (resultAll != null && (!resultAll.isEmpty())) { - List<KeyValue> keyValeList = resultAll.list(); - - keyValueMap = new HashMap<Long, List<KeyValue>>(); - - LOG.debug(String.format( - "+ Key (%s) Versions (%s) Val;ute map <%s>", - Bytes.toString(nextKey), versions, keyValueMap)); - - for (KeyValue keyValue : keyValeList) { - long version = keyValue.getTimestamp(); - - if (keyValueMap.containsKey(new Long(version))) { - List<KeyValue> keyValueTempList = keyValueMap - .get(new Long(version)); - if (keyValueTempList == null) { - keyValueTempList = new ArrayList<KeyValue>(); - } - keyValueTempList.add(keyValue); - } else { - List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); - keyValueMap.put(new Long(version), - keyValueTempList); - keyValueTempList.add(keyValue); - } - } - - resultVector = new Vector<List<KeyValue>>(); - resultVector.addAll(keyValueMap.values()); - - List<KeyValue> resultKeyValue = resultVector - .remove(resultVector.size() - 1); - - Result result = new Result(resultKeyValue); - - LOG.debug(String.format( - "+ Version (%s), Got Result (%s)", versions, - result)); - - String newKey = 
keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// - - System.out.println("+ New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } else { - LOG.debug(String.format( - "+ Key (%s) return an EMPTY result. Get (%s)", - Bytes.toString(nextKey), theGet.getId())); // alg0 - - String newKey; - - while ((newKey = keyList.pollFirst()) != null) { - LOG.debug("+ WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - if (nextKey == null) { - LOG.error("+ BOMB! BOMB! BOMB!"); - continue; - } - - if (!this.htable.exists(new Get(nextKey))) { - LOG.debug(String.format( - "+ Key (%s) Does not exist in Table (%s)", - Bytes.toString(nextKey), - Bytes.toString(this.htable.getTableName()))); - continue; - } else { - break; - } - } - - nextKey = (newKey == null || newKey.length() == 0) ? 
null - : Bytes.toBytes(newKey); - - LOG.debug("+ Final New Key => " - + Bytes.toString(nextKey)); - - return next(key, value); - } - - } else { - return setNextSplit(); - } - } - } - } - - case EMPTY: { - LOG.info("GOT an empty Split"); - return setNextSplit(); - } - - default: - throw new IOException("Unknown source mode : " + sourceMode); - } - } -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java new file mode 100644 index 0000000..37858ad --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -0,0 +1,140 @@ +package parallelai.spyglass.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.RecordReader; + +import java.util.TreeSet; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 15:42 + * To change this template use File | Settings | File Templates. 
+ */ +public abstract class HBaseRecordReaderBase implements + RecordReader<ImmutableBytesWritable, Result> { + + protected TreeSet<String> keyList; + protected HBaseConstants.SourceMode sourceMode; + protected boolean endRowInclusive = true; + protected int versions = 1; + protected boolean useSalt = false; + + protected byte[] startRow; + protected byte[] endRow; + + protected HTable htable; + protected byte[][] trrInputColumns; + + protected Filter trrRowFilter; + + protected boolean logScannerActivity = false; + protected int logPerRowCount = 100; + + @Override + public String toString() { + StringBuffer sbuf = new StringBuffer(); + + sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] endRowInc [%s] ", + Bytes.toString(startRow), Bytes.toString(endRow), endRowInclusive)); + sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ", + sourceMode, useSalt, versions)); + + return sbuf.toString(); + } + + byte[] getStartRow() { + return this.startRow; + } + + /** + * @param htable + * the {@link org.apache.hadoop.hbase.client.HTable} to scan. + */ + public void setHTable(HTable htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, + false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); + this.htable = htable; + } + + /** + * @param inputColumns + * the columns to be placed in {@link Result}. + */ + public void setInputColumns(final byte[][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow + * the first row in the split + */ + public void setStartRow(final byte[] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow + * the last row in the split + */ + public void setEndRow(final byte[] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter + * the {@link org.apache.hadoop.hbase.filter.Filter} to be used. 
+ */ + public void setRowFilter(Filter rowFilter) { + this.trrRowFilter = rowFilter; + } + + public TreeSet<String> getKeyList() { + return keyList; + } + + public void setKeyList(TreeSet<String> keyList) { + this.keyList = keyList; + } + + public void setVersions(int versions) { + this.versions = versions; + } + + public void setUseSalt(boolean useSalt) { + this.useSalt = useSalt; + } + + public HBaseConstants.SourceMode getSourceMode() { + return sourceMode; + } + + public void setSourceMode(HBaseConstants.SourceMode sourceMode) { + this.sourceMode = sourceMode; + } + + public byte[] getEndRow() { + return endRow; + } + + public void setEndRowInclusive(boolean isInclusive) { + endRowInclusive = isInclusive; + } + + public boolean getEndRowInclusive() { + return endRowInclusive; + } + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index 5eafc78..6c28d9f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -30,29 +30,29 @@ import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseRecordReader_SINGLE implements - RecordReader<ImmutableBytesWritable, Result> { +public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { - static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class); + static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class); - private byte[] startRow; - private byte[] endRow; private byte[] lastSuccessfulRow; - private TreeSet<String> keyList; - private SourceMode sourceMode; - private Filter trrRowFilter; private ResultScanner scanner; - private HTable htable; - private byte[][] trrInputColumns; private long timestamp; private int rowcount; - private boolean logScannerActivity = false; - private int logPerRowCount = 100; - 
private boolean endRowInclusive = true; - private int versions = 1; - private boolean useSalt = false; - /** + @Override + public String toString() { + StringBuffer sbuf = new StringBuffer(); + + sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]", + Bytes.toString(startRow), Bytes.toString(endRow), Bytes.toString(lastSuccessfulRow), Bytes.toString(nextKey), endRowInclusive, rowcount)); + sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ", + sourceMode, useSalt, versions)); + + return sbuf.toString(); + } + + + /** * Restart from survivable exceptions by creating a new scanner. * * @param firstRow @@ -96,41 +96,6 @@ public class HBaseRecordReader_SINGLE implements } } - public TreeSet<String> getKeyList() { - return keyList; - } - - public void setKeyList(TreeSet<String> keyList) { - this.keyList = keyList; - } - - public void setVersions(int versions) { - this.versions = versions; - } - - public void setUseSalt(boolean useSalt) { - this.useSalt = useSalt; - } - - public SourceMode getSourceMode() { - return sourceMode; - } - - public void setSourceMode(SourceMode sourceMode) { - this.sourceMode = sourceMode; - } - - public byte[] getEndRow() { - return endRow; - } - - public void setEndRowInclusive(boolean isInclusive) { - endRowInclusive = isInclusive; - } - - public boolean getEndRowInclusive() { - return endRowInclusive; - } private byte[] nextKey = null; private Vector<List<KeyValue>> resultVector = null; @@ -157,55 +122,6 @@ public class HBaseRecordReader_SINGLE implements } } - byte[] getStartRow() { - return this.startRow; - } - - /** - * @param htable - * the {@link HTable} to scan. 
- */ - public void setHTable(HTable htable) { - Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, - false); - logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); - this.htable = htable; - } - - /** - * @param inputColumns - * the columns to be placed in {@link Result}. - */ - public void setInputColumns(final byte[][] inputColumns) { - this.trrInputColumns = inputColumns; - } - - /** - * @param startRow - * the first row in the split - */ - public void setStartRow(final byte[] startRow) { - this.startRow = startRow; - } - - /** - * - * @param endRow - * the last row in the split - */ - public void setEndRow(final byte[] endRow) { - this.endRow = endRow; - } - - /** - * @param rowFilter - * the {@link Filter} to be used. - */ - public void setRowFilter(Filter rowFilter) { - this.trrRowFilter = rowFilter; - } - @Override public void close() { if (this.scanner != null) @@ -450,7 +366,6 @@ public class HBaseRecordReader_SINGLE implements String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// - System.out.println("+ New Key => " + newKey); nextKey = (newKey == null || newKey.length() == 0) ? 
null : Bytes .toBytes(newKey); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java new file mode 100644 index 0000000..e2b1ec8 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -0,0 +1,124 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { + + static final Log LOG = LogFactory.getLog(HBaseRecordReaderRegional.class); + + + private byte[] nextKey = null; + private Vector<List<KeyValue>> resultVector = null; + Map<Long, List<KeyValue>> keyValueMap = null; + + private HBaseTableSplitRegional multiSplit = null; + private HBaseTableSplitGranular currentSplit = null; + + private HBaseRecordReaderGranular currentRecordReader = 
null; + + public void init(HBaseTableSplitRegional mSplit) throws IOException { + multiSplit = mSplit; + + LOG.debug("Creating Multi Split for region location : " + + multiSplit.getRegionLocation() + " -> " + multiSplit); + + setNextSplit(); + } + + public boolean setNextSplit() throws IOException { + currentSplit = multiSplit.getNextSplit(); + + LOG.debug("IN: setNextSplit : " + currentSplit ); + + if( currentSplit != null ) { + setSplitValue(currentSplit); + return true; + } else { + return false; + } + } + + private void setRecordReaderParms(HBaseRecordReaderGranular trr, HBaseTableSplitGranular tSplit) throws IOException { + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + + trr.setHTable(htable); + trr.setInputColumns(trrInputColumns); + trr.setRowFilter(trrRowFilter); + + trr.init(); + } + + private void setSplitValue(HBaseTableSplitGranular tSplit) throws IOException { + LOG.debug("IN: setSplitValue : " + tSplit ); + + if( currentRecordReader != null ) currentRecordReader.close(); + + currentRecordReader = new HBaseRecordReaderGranular(); + setRecordReaderParms(currentRecordReader, currentSplit); + } + + @Override + public boolean next(ImmutableBytesWritable ibw, Result result) throws IOException { + boolean nextFlag = currentRecordReader.next(ibw, result); + + while(nextFlag == false && multiSplit.hasMoreSplits() ) { + setNextSplit(); + nextFlag = currentRecordReader.next(ibw, result); + } + + return nextFlag; + } + + @Override + public ImmutableBytesWritable createKey() { + return currentRecordReader.createKey(); + } + + @Override + public Result createValue() { + return currentRecordReader.createValue(); + } + + @Override + public long getPos() throws IOException { + return currentRecordReader.getPos(); + } + + @Override + public void close() throws IOException { + currentRecordReader.close(); + } + + @Override + public float getProgress() throws IOException { + return currentRecordReader.getProgress(); + } +} diff --git 
a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java index 6766458..5bdf8cd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -68,7 +68,7 @@ public class HBaseSalter { byte[] originalStartKey, byte[] originalStopKey, byte[] regionStartKey, byte[] regionStopKey, String prefixList) throws IOException { - LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", + LOG.debug("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", Bytes.toString(originalStartKey), Bytes.toString(originalStopKey), Bytes.toString(regionStartKey), @@ -160,7 +160,7 @@ public class HBaseSalter { } private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { - LOG.info("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)", + LOG.debug("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)", Bytes.toString(originalKey), prefixList, startPrefix, stopPrefix)); char[] prefixArray = prefixList.toCharArray(); @@ -172,13 +172,13 @@ public class HBaseSalter { SortedSet<Byte> subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true); - LOG.info("".format("Prefix subset (%s)", subSet)); + LOG.debug("".format("Prefix subset (%s)", subSet)); return getAllKeys(originalKey, subSet.toArray(new Byte[]{})); } public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) { - LOG.info("".format("getAllKeys: OKEY (%s) PARRAY (%s)", + LOG.debug("".format("getAllKeys: OKEY (%s) PARRAY (%s)", Bytes.toString(originalKey), prefixArray )); byte[][] keys = new byte[prefixArray.length][]; @@ -187,12 +187,6 @@ public class HBaseSalter { keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey)); } - for(int i = 0; i < keys.length; i ++) { - for(int j = 0; j < 
keys[i].length; j++) { - LOG.info("" + i + " : " + j + " : " + keys[i][j]); - } - } - return keys; } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index aa446c1..6f04f01 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -12,6 +12,7 @@ package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SplitType; import cascading.flow.FlowProcess; import cascading.scheme.Scheme; import cascading.scheme.SinkCall; @@ -65,6 +66,8 @@ public class HBaseScheme // private transient byte[][] fields; private boolean useSalt = false; + + private SplitType splitType = SplitType.GRANULAR; /** @@ -279,11 +282,31 @@ public class HBaseScheme @Override public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { - conf.setInputFormat(HBaseInputFormat.class); - String columns = getColumns(); - LOG.debug("sourcing from columns: {}", columns); - conf.set(HBaseInputFormat.COLUMN_LIST, columns); + switch(splitType) { + case GRANULAR: + { + conf.setInputFormat(HBaseInputFormatGranular.class); + + String columns = getColumns(); + LOG.debug("sourcing from columns: {}", columns); + conf.set(HBaseInputFormatGranular.COLUMN_LIST, columns); + } + break; + + case REGIONAL: + { + conf.setInputFormat(HBaseInputFormatRegional.class); + + String columns = getColumns(); + LOG.debug("sourcing from columns: {}", columns); + conf.set(HBaseInputFormatRegional.COLUMN_LIST, columns); + } + break; + + default: + LOG.error("Unknown Split Type : " + splitType.toString()); + } } private String getColumns() { @@ -345,4 +368,8 @@ public class HBaseScheme result = 31 * result + (valueFields != null ? 
Arrays.hashCode(valueFields) : 0); return result; } + + public void setInputSplitTye(SplitType sType) { + this.splitType = sType; + } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java deleted file mode 100644 index 87b8f58..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ /dev/null @@ -1,219 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseTableSplit implements InputSplit, - Comparable<HBaseTableSplit>, Serializable { - - private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); - - private byte[] m_tableName = null; - private byte[] m_startRow = null; - private byte[] m_endRow = null; - private String m_regionLocation = null; - private TreeSet<String> m_keyList = null; - private SourceMode m_sourceMode = SourceMode.EMPTY; - private boolean m_endRowInclusive = true; - private int m_versions = 1; - private boolean m_useSalt = false; - - /** default constructor */ - public HBaseTableSplit() { - this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); - } - - /** - * Constructor - * - * @param tableName - * @param startRow - * @param endRow - * @param location - */ - public HBaseTableSplit(final byte[] tableName, final byte[] startRow, - final byte[] endRow, final String location, - final SourceMode sourceMode, final boolean useSalt) { - this.m_tableName = tableName; - this.m_startRow = startRow; - this.m_endRow = endRow; - 
this.m_regionLocation = location; - this.m_sourceMode = sourceMode; - this.m_useSalt = useSalt; - } - - public HBaseTableSplit(final byte[] tableName, - final TreeSet<String> keyList, int versions, final String location, - final SourceMode sourceMode, final boolean useSalt) { - this.m_tableName = tableName; - this.m_keyList = keyList; - this.m_versions = versions; - this.m_sourceMode = sourceMode; - this.m_regionLocation = location; - this.m_useSalt = useSalt; - } - - /** @return table name */ - public byte[] getTableName() { - return this.m_tableName; - } - - /** @return starting row key */ - public byte[] getStartRow() { - return this.m_startRow; - } - - /** @return end row key */ - public byte[] getEndRow() { - return this.m_endRow; - } - - public boolean getEndRowInclusive() { - return m_endRowInclusive; - } - - public void setEndRowInclusive(boolean isInclusive) { - m_endRowInclusive = isInclusive; - } - - /** @return list of keys to get */ - public TreeSet<String> getKeyList() { - return m_keyList; - } - - public int getVersions() { - return m_versions; - } - - /** @return get the source mode */ - public SourceMode getSourceMode() { - return m_sourceMode; - } - - public boolean getUseSalt() { - return m_useSalt; - } - - /** @return the region's hostname */ - public String getRegionLocation() { - LOG.debug("REGION GETTER : " + m_regionLocation); - - return this.m_regionLocation; - } - - public String[] getLocations() { - LOG.debug("REGION ARRAY : " + m_regionLocation); - - return new String[] { this.m_regionLocation }; - } - - @Override - public long getLength() { - // Not clear how to obtain this... 
seems to be used only for sorting - // splits - return 0; - } - - @Override - public void readFields(DataInput in) throws IOException { - LOG.debug("READ ME : " + in.toString()); - - this.m_tableName = Bytes.readByteArray(in); - this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); - this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes - .readByteArray(in))); - this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); - - switch (this.m_sourceMode) { - case SCAN_RANGE: - this.m_startRow = Bytes.readByteArray(in); - this.m_endRow = Bytes.readByteArray(in); - this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); - break; - - case GET_LIST: - this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); - this.m_keyList = new TreeSet<String>(); - - int m = Bytes.toInt(Bytes.readByteArray(in)); - - for (int i = 0; i < m; i++) { - this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); - } - break; - } - - LOG.debug("READ and CREATED : " + this); - } - - @Override - public void write(DataOutput out) throws IOException { - LOG.debug("WRITE : " + this); - - Bytes.writeByteArray(out, this.m_tableName); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); - - switch (this.m_sourceMode) { - case SCAN_RANGE: - Bytes.writeByteArray(out, this.m_startRow); - Bytes.writeByteArray(out, this.m_endRow); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); - break; - - case GET_LIST: - Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); - - for (String k : this.m_keyList) { - Bytes.writeByteArray(out, Bytes.toBytes(k)); - } - break; - } - - LOG.debug("WROTE : " + out.toString()); - } - - @Override - public String toString() { - return String - .format( - "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) 
Key List Size (%s) Versions (%s) Use Salt (%s)", - Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, - Bytes.toString(m_startRow), Bytes.toString(m_endRow), - (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, - m_useSalt); - } - - @Override - public int compareTo(HBaseTableSplit o) { - switch (m_sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - return Bytes.compareTo(getStartRow(), o.getStartRow()); - - case GET_LIST: - return m_keyList.equals(o.getKeyList()) ? 0 : -1; - - case EMPTY: - return 0; - - default: - return -1; - } - - } -}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java new file mode 100644 index 0000000..2f6e7b5 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -0,0 +1,165 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.TreeSet; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 16:18 + * To change this template use File | Settings | File Templates. + */ +public abstract class HBaseTableSplitBase implements InputSplit, + Comparable<HBaseTableSplitBase>, Serializable { + + private final Log LOG = LogFactory.getLog(HBaseTableSplitBase.class); + + + protected byte[] m_tableName = null; + protected byte[] m_startRow = null; + protected byte[] m_endRow = null; + protected String m_regionLocation = null; + protected TreeSet<String> m_keyList = null; + protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY; + protected boolean m_endRowInclusive = true; + protected int m_versions = 1; + protected boolean m_useSalt = false; + + + /** @return table name */ + public byte[] getTableName() { + return this.m_tableName; + } + + /** @return starting row key */ + public byte[] getStartRow() { + return this.m_startRow; + } + + /** @return end row key */ + public byte[] getEndRow() { + return this.m_endRow; + } + + public boolean getEndRowInclusive() { + return m_endRowInclusive; + } + + public void setEndRowInclusive(boolean isInclusive) { + m_endRowInclusive = isInclusive; + } + + /** @return list of keys to get */ + public 
TreeSet<String> getKeyList() { + return m_keyList; + } + + public int getVersions() { + return m_versions; + } + + /** @return get the source mode */ + public HBaseConstants.SourceMode getSourceMode() { + return m_sourceMode; + } + + public boolean getUseSalt() { + return m_useSalt; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + LOG.debug("REGION GETTER : " + m_regionLocation); + + return this.m_regionLocation; + } + + public String[] getLocations() { + LOG.debug("REGION ARRAY : " + m_regionLocation); + + return new String[] { this.m_regionLocation }; + } + + + public void copy(HBaseTableSplitBase that) { + this.m_endRow = that.m_endRow; + this.m_endRowInclusive = that.m_endRowInclusive; + this.m_keyList = that.m_keyList; + this.m_sourceMode = that.m_sourceMode; + this.m_startRow = that.m_startRow; + this.m_tableName = that.m_tableName; + this.m_useSalt = that.m_useSalt; + this.m_versions = that.m_versions; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.debug("READ ME : " + in.toString()); + + this.m_tableName = Bytes.readByteArray(in); + this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + this.m_sourceMode = HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes + .readByteArray(in))); + this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); + + switch (this.m_sourceMode) { + case SCAN_RANGE: + this.m_startRow = Bytes.readByteArray(in); + this.m_endRow = Bytes.readByteArray(in); + this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); + break; + + case GET_LIST: + this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); + this.m_keyList = new TreeSet<String>(); + + int m = Bytes.toInt(Bytes.readByteArray(in)); + + for (int i = 0; i < m; i++) { + this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); + } + break; + } + + LOG.debug("READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("WRITE : 
" + this); + + Bytes.writeByteArray(out, this.m_tableName); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); + + switch (this.m_sourceMode) { + case SCAN_RANGE: + Bytes.writeByteArray(out, this.m_startRow); + Bytes.writeByteArray(out, this.m_endRow); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); + break; + + case GET_LIST: + Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + + for (String k : this.m_keyList) { + Bytes.writeByteArray(out, Bytes.toBytes(k)); + } + break; + } + + LOG.debug("WROTE : " + out.toString()); + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java new file mode 100644 index 0000000..4de7153 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -0,0 +1,97 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseTableSplitGranular extends HBaseTableSplitBase { + + private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class); + + /** default constructor */ + public HBaseTableSplitGranular() { + this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false); + } + + /** + * Constructor + * + * @param tableName + * @param startRow + * @param endRow + * @param 
location + */ + public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow, + final byte[] endRow, final String location, + final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { + this.m_tableName = tableName; + this.m_startRow = startRow; + this.m_endRow = endRow; + this.m_regionLocation = location; + this.m_sourceMode = sourceMode; + this.m_useSalt = useSalt; + } + + public HBaseTableSplitGranular(final byte[] tableName, + final TreeSet<String> keyList, int versions, final String location, + final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { + this.m_tableName = tableName; + this.m_keyList = keyList; + this.m_versions = versions; + this.m_sourceMode = sourceMode; + this.m_regionLocation = location; + this.m_useSalt = useSalt; + } + + + @Override + public long getLength() { + // Not clear how to obtain this... seems to be used only for sorting + // splits + return 0; + } + + + @Override + public String toString() { + return String + .format( + "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, + Bytes.toString(m_startRow), Bytes.toString(m_endRow), + (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, + m_useSalt); + } + + @Override + public int compareTo(HBaseTableSplitBase o) { + if( ! (o instanceof HBaseTableSplitGranular) ) return -1; + + switch (m_sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + return Bytes.compareTo(getStartRow(), o.getStartRow()); + + case GET_LIST: + return m_keyList.equals(o.getKeyList()) ? 0 : -1; + + case EMPTY: + return 0; + + default: + return -1; + } + + } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java new file mode 100644 index 0000000..1ebfa3d --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -0,0 +1,127 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Vector; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +public class HBaseTableSplitRegional extends HBaseTableSplitBase { + + private final Log LOG = LogFactory.getLog(HBaseTableSplitRegional.class); + + private List<HBaseTableSplitGranular> splits = new Vector<HBaseTableSplitGranular>(); + + /** default constructor */ + private HBaseTableSplitRegional() { + + } + + public HBaseTableSplitRegional(String regionLocation) { + this.m_regionLocation = regionLocation; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.debug("REGIONAL READ ME : " + in.toString()); + + super.readFields(in); + + int s = Bytes.toInt(Bytes.readByteArray(in)); + + for (int i = 0; i < s; i++) { + HBaseTableSplitGranular hbts = new HBaseTableSplitGranular(); + hbts.readFields(in); + + splits.add(hbts); + } + + LOG.debug("REGIONAL READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("REGIONAL WRITE : " + this); + + super.write(out); + + Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); + + for (HBaseTableSplitGranular hbts : splits) { + hbts.write(out); + } + + LOG.debug("REGIONAL WROTE : " + out.toString()); + } + + @Override + public 
String toString() { + StringBuffer str = new StringBuffer(); + str.append("HBaseTableSplitRegional : "); + + str.append(super.toString()); + + str.append(" GRANULAR = > "); + + for (HBaseTableSplitGranular hbt : splits) { + str.append(" [" + hbt.toString() + "]"); + } + + return str.toString(); + } + + @Override + public int compareTo(HBaseTableSplitBase o) { + if( ! (o instanceof HBaseTableSplitRegional) ) return -1; + + return (splits.size() - ((HBaseTableSplitRegional)o).splits.size()); + } + + @Override + public long getLength() throws IOException { + return splits.size(); + } + + public void addSplit(HBaseTableSplitGranular hbt) throws IOException { + LOG.debug("ADD Split : " + hbt); + + if (hbt.getRegionLocation().equals(m_regionLocation)) { + splits.add(hbt); + this.copy(hbt); + } else + throw new IOException("HBaseTableSplitGranular Region Location " + + hbt.getRegionLocation() + + " does NOT match MultiSplit Region Location " + m_regionLocation); + } + +// public List<HBaseTableSplitGranular> getSplits() { +// return splits; +// } + + public boolean hasMoreSplits() { + splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator; + + return splitIterator.hasNext(); + } + + private Iterator<HBaseTableSplitGranular> splitIterator = null; + + public HBaseTableSplitGranular getNextSplit() { + splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator; + + if( splitIterator.hasNext() ) { + return splitIterator.next(); + } else { + return null; + } + } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 07b5aa7..bfe6670 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,6 +12,8 @@ package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SplitType; + import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.flow.FlowProcess; import cascading.tap.SinkMode; @@ -63,6 +65,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { /** Field tableName */ private String tableName; + private SplitType splitType = SplitType.GRANULAR; + /** * Constructor HBaseTap creates a new HBaseTap instance. * @@ -204,7 +208,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { return true; } - LOG.info("creating hbase table: {}", tableName); + LOG.info("Creating HBase Table: {}", tableName); HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); @@ -256,8 +260,19 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { // process.getID(); // // super.getFullIdentifier(conf); - - HBaseInputFormat.setTableName(conf, tableName); + + switch(splitType) { + case GRANULAR: + HBaseInputFormatGranular.setTableName(conf, tableName); + break; + + case REGIONAL: + HBaseInputFormatRegional.setTableName(conf, tableName); + break; + + default: + LOG.error("Unknown Split Type : " + splitType); + } for( SourceConfig sc : sourceConfigList) { sc.configure(conf); @@ -266,6 +281,10 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { super.sourceConfInit(process, conf); } + public void setInputSplitType(SplitType sType) { + this.splitType = sType; + } + @Override public boolean equals(Object object) { if (this == object) { diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala 
b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 7ff7860..c214e99 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -10,7 +10,7 @@ import com.twitter.scalding.Read import com.twitter.scalding.Source import com.twitter.scalding.Write -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} import cascading.scheme.{NullScheme, Scheme} import cascading.tap.SinkMode import cascading.tap.Tap @@ -40,11 +40,14 @@ case class HBaseSource( versions: Int = 1, useSalt: Boolean = false, prefixList: String = null, - sinkMode: SinkMode = SinkMode.UPDATE + sinkMode: SinkMode = SinkMode.UPDATE, + inputSplitType: SplitType = SplitType.GRANULAR ) extends Source { - - override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) + internalScheme.setInputSplitTye(inputSplitType) + + override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] // To enable local mode testing val allFields = keyFields.append(valueFields.toArray) @@ -76,6 +79,8 @@ case class HBaseSource( } case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) } + + hbt.setInputSplitType(inputSplitType) hbt.asInstanceOf[Tap[_,_,_]] } diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index a4e2d7a..d75ff7b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -1,9 +1,9 @@ package parallelai.spyglass.hbase.testing import 
parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} -import com.twitter.scalding.{IterableSource, Args, TextLine} +import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine} import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource} import cascading.tuple.Fields import org.apache.log4j.{Logger, Level} @@ -59,76 +59,221 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val quorum = args("quorum") val sttKey = "01728" - val stpKey = "01831" + val stpKey = "03725" val sttKeyP = "8_01728" - val stpKeyP = "1_01831" + val stpKeyP = "5_03725" val listKey = List("01681", "01456") - val listKeyP = List("1_01681", "6_01456") - -// val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_ALL ).read -// .fromBytesWritable( TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanAllNoSalt01")) - + val listKeyP = List("0_01681", "6_01456") + val noSttKey = "9999990" + val noStpKey = "9999999" + val noSttKeyP = "9_9999990" + val noStpKeyP = "9_9999999" + val noListKey = List("0123456", "6543210") + val noListKeyP = List("6_0123456", "0_6543210") + + val splitType = if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR + + val testName01 = "Scan All with NO useSalt" + val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_ALL, + inputSplitType = splitType ).read + .fromBytesWritable( TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: 
(String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanAllNoSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName02 = "Scan All with useSalt=true" val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), - sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read -// .fromBytesWritable( TABLE_SCHEMA ) - .write(TextLine("saltTesting/ScanAllPlusSalt01")) - -// val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangeNoSalt01")) -// -// val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangePlusSalt01")) -// -// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/GetListNoSalt01")) -// -// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/GetListPlusSalt01")) 
-// -// val hbase07 = -// new HBaseSource( "_TEST.SALT.03", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) -// .read -// .fromBytesWritable( TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangePlusSalt10")) -// .toBytesWritable( TABLE_SCHEMA ) -// .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// useSalt = true )) -// -// val hbase08 = -// new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) -// .read -// .fromBytesWritable('*) -// .write(TextLine("saltTesting/ScanRangePlusSalt03")) + sourceMode = SourceMode.SCAN_ALL, useSalt = true, + inputSplitType = splitType).read + .fromBytesWritable( TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanAllPlusSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName03 = "Scan Range with NO useSalt" + val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 
'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangePlusSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName04 = "Scan Range with useSalt=true" + val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangeNoSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + + val testName05 = "Get List with NO useSalt" + val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListPlusSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName06 = "Get List with useSalt=true" + val hbase06 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = listKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 
'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListNoSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName08 = "Scan Range NO RESULTS" + val hbase08 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKey, stopKey = noStpKey, useSalt = true, prefixList = prefix, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangePlusSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName09 = "Scan Range NO RESULT with useSalt=true" + val hbase09 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKeyP, stopKey = noStpKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangeNoSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + + val testName10 = "Get List NO RESULT" + val hbase10 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = noListKey, useSalt = true, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => 
List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListPlusSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName11 = "Get List NO RESULT with useSalt=true" + val hbase11 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = noListKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListNoSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + + +// ( +//// getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++ +//// getTestResultPipe(getExpectedPipe(list01), hbase02, testName02) ++ +// getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++ +// getTestResultPipe(getExpectedPipe(list03), hbase04, testName03) ++ +// getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++ +// getTestResultPipe(getExpectedPipe(list05), hbase06, testName06) ++ +// assertPipeIsEmpty(hbase08, testName08) ++ +// assertPipeIsEmpty(hbase09, testName09) ++ +// assertPipeIsEmpty(hbase10, testName10) ++ +// assertPipeIsEmpty(hbase11, testName11) +// ).groupAll { group => +// group.sortBy('testName) +// } +// .write(Tsv("saltTesting/FinalTestResults")) + + /** + * We assume the pipe is empty + * + * We concatenate with a header - if the resulting size is 1 + * then the original size was 0 - then the pipe was empty :) + * + * The result is then returned in a Pipe + */ + def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = { + val headerPipe = IterableSource(List(testName), 'testData) + val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group => + group.size('size) + } + .project('size) + 
+ val result = + concatenation + .mapTo('size -> ('testName, 'result, 'expectedData, 'testData)) { x:String => { + if (x == "1") { + (testName, "Success", "", "") + } else { + (testName, "Test Failed", "", "") + } + } + } + + result + } + + /** + * Methods receives 2 pipes - and projects the results of testing + * + * expectedPipe should have a column 'expecteddata + * realHBasePipe should have a column 'hbasedata + */ + def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = { + val results = expectedPipe.insert('testName , testName) + .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName)) + .map(('expectedData, 'testData)->'result) { x:(String,String) => + if (x._1.equals(x._2)) + "Success" + else + "Test Failed" + } + .project('testName, 'result, 'expectedData, 'testData) + results + } + + /** + * + */ + def getExpectedPipe ( expectedList: List[(String,String, String)]) : Pipe = { + IterableSource(expectedList, TABLE_SCHEMA) + .map(('key, 'salted, 'unsalted) -> 'expectedData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('expectedData) + .groupAll(group => group.toList[List[List[String]]]('expectedData -> 'expectedData)) + } + } class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions { diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index a8de7d6..17bc873 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -2,6 +2,7 @@ package parallelai.spyglass.hbase.testing import parallelai.spyglass.base.JobRunner import com.twitter.scalding.Args +import org.apache.log4j.{Level, Logger} object HBaseSaltTesterRunner extends App { @@ -25,12 +26,18 @@ object HBaseSaltTesterRunner extends App { val test = 
mArgs.getOrElse("test.data", "false").toBoolean val delete = mArgs.getOrElse("delete.data", "false").toBoolean + val isDebug = mArgs.getOrElse("debug", "false").toBoolean + + if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } + + if( make ) { JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, - "--quorum", quorum + "--quorum", quorum, + "--debug", isDebug.toString )) } @@ -39,7 +46,9 @@ object HBaseSaltTesterRunner extends App { "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, - "--quorum", quorum + "--quorum", quorum, + "--debug", isDebug.toString, + "--regional", mArgs.getOrElse("regional", "false") )) } @@ -48,7 +57,8 @@ object HBaseSaltTesterRunner extends App { "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, - "--quorum", quorum + "--quorum", quorum, + "--debug", isDebug.toString )) } }
\ No newline at end of file |