From 3501e241a2313cf49c371630cb6ebe0c3a47e991 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Wed, 4 Sep 2013 10:32:07 +0100 Subject: Extensive changes to the underlying code base. Fully tested and working support for region level spliting Reduced number of mappers. --- .../spyglass/hbase/HBaseConfigUtils.java | 53 ++ .../parallelai/spyglass/hbase/HBaseConstants.java | 37 +- .../spyglass/hbase/HBaseInputFormat.java | 645 --------------------- .../spyglass/hbase/HBaseInputFormatBase.java | 172 ++++++ .../spyglass/hbase/HBaseInputFormatGranular.java | 459 +++++++++++++++ .../spyglass/hbase/HBaseInputFormatRegional.java | 99 ++++ .../spyglass/hbase/HBaseInputFormat_SINGLE.java | 622 -------------------- .../spyglass/hbase/HBaseMultiInputSplit.java | 111 ---- .../spyglass/hbase/HBaseRecordReader.java | 609 ------------------- .../spyglass/hbase/HBaseRecordReaderBase.java | 140 +++++ .../spyglass/hbase/HBaseRecordReaderGranular.java | 420 ++++++++++++++ .../spyglass/hbase/HBaseRecordReaderRegional.java | 124 ++++ .../spyglass/hbase/HBaseRecordReader_SINGLE.java | 505 ---------------- .../parallelai/spyglass/hbase/HBaseSalter.java | 14 +- .../parallelai/spyglass/hbase/HBaseScheme.java | 35 +- .../parallelai/spyglass/hbase/HBaseTableSplit.java | 219 ------- .../spyglass/hbase/HBaseTableSplitBase.java | 165 ++++++ .../spyglass/hbase/HBaseTableSplitGranular.java | 97 ++++ .../spyglass/hbase/HBaseTableSplitRegional.java | 127 ++++ .../java/parallelai/spyglass/hbase/HBaseTap.java | 25 +- .../parallelai/spyglass/hbase/HBaseSource.scala | 15 +- .../spyglass/hbase/testing/HBaseSaltTester.scala | 279 ++++++--- .../hbase/testing/HBaseSaltTesterRunner.scala | 16 +- 23 files changed, 2169 insertions(+), 2819 deletions(-) create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java delete mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java delete mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java delete mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java delete mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java delete mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java delete mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java (limited to 'src') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java new file mode 100644 index 0000000..598a988 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java @@ -0,0 +1,53 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 17:25 + * To change this template use File | Settings | File Templates. + */ +public class HBaseConfigUtils { + static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class); + + public static void setRecordReaderParms(HBaseRecordReaderBase trr, HBaseTableSplitBase tSplit) throws IOException { + switch (tSplit.getSourceMode()) { + case SCAN_ALL: + case SCAN_RANGE: { + LOG.debug(String.format( + "For split [%s] we have start key (%s) and stop key (%s)", + tSplit, tSplit.getStartRow(), tSplit.getEndRow())); + + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setEndRowInclusive(tSplit.getEndRowInclusive()); + trr.setUseSalt(tSplit.getUseSalt()); + } + + break; + + case GET_LIST: { + LOG.debug(String.format("For split [%s] we have key list (%s)", + tSplit, tSplit.getKeyList())); + + trr.setKeyList(tSplit.getKeyList()); + trr.setVersions(tSplit.getVersions()); + trr.setUseSalt(tSplit.getUseSalt()); + } + + break; + + default: + throw new IOException("Unknown source mode : " + + tSplit.getSourceMode()); + } + + trr.setSourceMode(tSplit.getSourceMode()); + } + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 25b89cb..5b5e9c3 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -3,21 +3,26 @@ package parallelai.spyglass.hbase; import org.apache.hadoop.conf.Configuration; public class HBaseConstants { - - public enum SourceMode { - EMPTY, - SCAN_ALL, - SCAN_RANGE, - GET_LIST; - } - public static final String START_KEY = "hbase.%s.startkey"; - public static final String STOP_KEY = "hbase.%s.stopkey"; - public static final String SOURCE_MODE = "hbase.%s.source.mode"; - public static final String KEY_LIST = "hbase.%s.key.list"; - public static final String VERSIONS = "hbase.%s.versions"; - public static final String USE_SALT = "hbase.%s.use.salt"; - public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; + public enum SourceMode { + EMPTY, + SCAN_ALL, + SCAN_RANGE, + GET_LIST; + } - public static final String SINK_MODE = "hbase.%s.sink.mode"; -} + public enum SplitType { + GRANULAR, + REGIONAL; + } + + public static final String START_KEY = "hbase.%s.startkey"; + public static final String STOP_KEY = "hbase.%s.stopkey"; + public static final String SOURCE_MODE = "hbase.%s.source.mode"; + public static final String KEY_LIST = "hbase.%s.key.list"; + public static final String VERSIONS = "hbase.%s.versions"; + public static final String USE_SALT = "hbase.%s.use.salt"; + public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; + + public static final String SINK_MODE = "hbase.%s.sink.mode"; +} \ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java deleted file mode 100644 index 5a1184f..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ /dev/null @@ -1,645 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; - -import javax.naming.NamingException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseInputFormat implements - InputFormat, JobConfigurable { - - private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); - - private final String id = UUID.randomUUID().toString(); - - private byte[][] inputColumns; - private HTable table; - // private HBaseRecordReader tableRecordReader; - private Filter rowFilter; - // private String tableName = ""; - - private HashMap reverseDNSCacheMap = new HashMap(); - - private String nameServer = null; - - // private Scan scan = null; - - private HBaseMultiInputSplit[] convertToMultiSplitArray( - List splits) throws IOException { - - if (splits == null) - throw new IOException("The list of splits is null => " + splits); - - HashMap regionSplits = new HashMap(); - - for (HBaseTableSplit hbt : splits) { - HBaseMultiInputSplit mis = null; - if (regionSplits.containsKey(hbt.getRegionLocation())) { - mis = regionSplits.get(hbt.getRegionLocation()); - } else { - regionSplits.put(hbt.getRegionLocation(), new HBaseMultiInputSplit( - hbt.getRegionLocation())); - mis = regionSplits.get(hbt.getRegionLocation()); - } - - mis.addSplit(hbt); - regionSplits.put(hbt.getRegionLocation(), mis); - } - - Collection outVals = regionSplits.values(); - - LOG.debug("".format("Returning array of splits : %s", outVals)); - - if (outVals == null) - throw new IOException("The list of multi input splits were null"); - - return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]); - } - - @SuppressWarnings("deprecation") - @Override - public HBaseMultiInputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - if (this.table == null) { - throw new IOException("No table was provided"); - } - - if (this.inputColumns == null || this.inputColumns.length == 0) { - throw new IOException("Expecting at least one column"); - } - - final Pair keys = table.getStartEndKeys(); - - if (keys == null || keys.getFirst() == null - || keys.getFirst().length == 0) { - HRegionLocation regLoc = table.getRegionLocation( - HConstants.EMPTY_BYTE_ARRAY, false); - - if (null == regLoc) { - throw new IOException("Expecting at least one region."); - } - - final List splits = new ArrayList(); - HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split( - Addressing.HOSTNAME_PORT_SEPARATOR)[0], - SourceMode.EMPTY, false); - - splits.add(split); - - // TODO: Change to HBaseMultiSplit - return convertToMultiSplitArray(splits); - } - - if (keys.getSecond() == null || keys.getSecond().length == 0) { - throw new IOException("Expecting at least one region."); - } - - if (keys.getFirst().length != keys.getSecond().length) { - throw new IOException("Regions for start and end key do not match"); - } - - byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; - byte[] maxKey = keys.getSecond()[0]; - - LOG.debug(String.format("SETTING min key (%s) and max key (%s)", - Bytes.toString(minKey), Bytes.toString(maxKey))); - - byte[][] regStartKeys = keys.getFirst(); - byte[][] regStopKeys = keys.getSecond(); - String[] regions = new String[regStartKeys.length]; - - for (int i = 0; i < regStartKeys.length; i++) { - minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) - && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] - : minKey; - maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) - && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] - : maxKey; - - HServerAddress regionServerAddress = table.getRegionLocation( - keys.getFirst()[i]).getServerAddress(); - InetAddress regionAddress = regionServerAddress.getInetSocketAddress() - .getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " + regionAddress - + " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - // HServerAddress regionServerAddress = - // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); - // InetAddress regionAddress = - // regionServerAddress.getInetSocketAddress().getAddress(); - // - // String regionLocation; - // - // try { - // regionLocation = reverseDNS(regionAddress); - // } catch (NamingException e) { - // LOG.error("Cannot resolve the host name for " + regionAddress + - // " because of " + e); - // regionLocation = regionServerAddress.getHostname(); - // } - - // String regionLocation = - // table.getRegionLocation(keys.getFirst()[i]).getHostname(); - - LOG.debug("***** " + regionLocation); - - if (regionLocation == null || regionLocation.length() == 0) - throw new IOException("The region info for regiosn " + i - + " is null or empty"); - - regions[i] = regionLocation; - - LOG.debug(String.format( - "Region (%s) has start key (%s) and stop key (%s)", regions[i], - Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); - } - - byte[] startRow = HConstants.EMPTY_START_ROW; - byte[] stopRow = HConstants.EMPTY_END_ROW; - - LOG.debug(String.format("Found min key (%s) and max key (%s)", - Bytes.toString(minKey), Bytes.toString(maxKey))); - - LOG.debug("SOURCE MODE is : " + sourceMode); - - switch (sourceMode) { - case SCAN_ALL: - startRow = HConstants.EMPTY_START_ROW; - stopRow = HConstants.EMPTY_END_ROW; - - LOG.info(String.format( - "SCAN ALL: Found start key (%s) and stop key (%s)", - Bytes.toString(startRow), Bytes.toString(stopRow))); - break; - - case SCAN_RANGE: - startRow = (startKey != null && startKey.length() != 0) ? Bytes - .toBytes(startKey) : HConstants.EMPTY_START_ROW; - stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes - .toBytes(stopKey) : HConstants.EMPTY_END_ROW; - - LOG.info(String.format( - "SCAN RANGE: Found start key (%s) and stop key (%s)", - Bytes.toString(startRow), Bytes.toString(stopRow))); - break; - } - - switch (sourceMode) { - case EMPTY: - case SCAN_ALL: - case SCAN_RANGE: { - // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : - // startRow; - // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : - // stopRow; - - final List splits = new ArrayList(); - - if (!useSalt) { - - List validRegions = table.getRegionsInRange( - startRow, stopRow); - - int maxRegions = validRegions.size(); - int currentRegion = 1; - - for (HRegionLocation cRegion : validRegions) { - byte[] rStart = cRegion.getRegionInfo().getStartKey(); - byte[] rStop = cRegion.getRegionInfo().getEndKey(); - - HServerAddress regionServerAddress = cRegion - .getServerAddress(); - InetAddress regionAddress = regionServerAddress - .getInetSocketAddress().getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " - + regionAddress + " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - byte[] sStart = (startRow == HConstants.EMPTY_START_ROW - || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart - : startRow); - byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW - || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop - : stopRow); - - LOG.debug(String.format( - "BOOL start (%s) stop (%s) length (%d)", - (startRow == HConstants.EMPTY_START_ROW || (Bytes - .compareTo(startRow, rStart) <= 0)), - (stopRow == HConstants.EMPTY_END_ROW || (Bytes - .compareTo(stopRow, rStop) >= 0)), rStop.length)); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), sStart, sStop, regionLocation, - SourceMode.SCAN_RANGE, useSalt); - - split.setEndRowInclusive(currentRegion == maxRegions); - - currentRegion++; - - LOG.debug(String - .format( - "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", - Bytes.toString(startRow), - Bytes.toString(stopRow), Bytes.toString(rStart), - Bytes.toString(rStop), Bytes.toString(sStart), - Bytes.toString(sStop), cRegion.getHostnamePort(), - split)); - - splits.add(split); - } - } else { - LOG.debug("Using SALT : " + useSalt); - - // Will return the start and the stop key with all possible - // prefixes. - for (int i = 0; i < regions.length; i++) { - Pair[] intervals = HBaseSalter - .getDistributedIntervals(startRow, stopRow, - regStartKeys[i], regStopKeys[i], prefixList); - - for (Pair pair : intervals) { - LOG.debug("".format( - "Using SALT, Region (%s) Start (%s) Stop (%s)", - regions[i], Bytes.toString(pair.getFirst()), - Bytes.toString(pair.getSecond()))); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), pair.getFirst(), - pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, - useSalt); - - split.setEndRowInclusive(true); - splits.add(split); - } - } - } - - LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); - - // TODO: Change to HBaseMultiSplit - return convertToMultiSplitArray(splits); - } - - case GET_LIST: { - // if( keyList == null || keyList.size() == 0 ) { - if (keyList == null) { - throw new IOException( - "Source Mode is GET_LIST but key list is EMPTY"); - } - - if (useSalt) { - TreeSet tempKeyList = new TreeSet(); - - for (String key : keyList) { - tempKeyList.add(HBaseSalter.addSaltPrefix(key)); - } - - keyList = tempKeyList; - } - - LOG.info("".format("Splitting Key List (%s)", keyList)); - - final List splits = new ArrayList(); - - for (int i = 0; i < keys.getFirst().length; i++) { - - if (!includeRegionInSplit(keys.getFirst()[i], - keys.getSecond()[i])) { - continue; - } - - LOG.debug(String.format( - "Getting region (%s) subset (%s) to (%s)", regions[i], - Bytes.toString(regStartKeys[i]), - Bytes.toString(regStopKeys[i]))); - - Set regionsSubSet = null; - - if ((regStartKeys[i] == null || regStartKeys[i].length == 0) - && (regStopKeys[i] == null || regStopKeys[i].length == 0)) { - LOG.debug("REGION start is empty"); - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList; - } else if (regStartKeys[i] == null - || regStartKeys[i].length == 0) { - LOG.debug("REGION start is empty"); - regionsSubSet = keyList.headSet( - Bytes.toString(regStopKeys[i]), true); - } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList.tailSet( - Bytes.toString(regStartKeys[i]), true); - } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { - regionsSubSet = keyList.subSet( - Bytes.toString(regStartKeys[i]), true, - Bytes.toString(regStopKeys[i]), true); - } else { - throw new IOException(String.format( - "For REGION (%s) Start Key (%s) > Stop Key(%s)", - regions[i], Bytes.toString(regStartKeys[i]), - Bytes.toString(regStopKeys[i]))); - } - - if (regionsSubSet == null || regionsSubSet.size() == 0) { - LOG.debug("EMPTY: Key is for region " + regions[i] - + " is null"); - - continue; - } - - TreeSet regionKeyList = new TreeSet( - regionsSubSet); - - LOG.debug(String.format("Regions [%s] has key list <%s>", - regions[i], regionKeyList)); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), regionKeyList, versions, regions[i], - SourceMode.GET_LIST, useSalt); - splits.add(split); - } - - // if (splits.isEmpty()) { - // LOG.info("GOT EMPTY SPLITS"); - - // throw new IOException( - // "".format("Key List NOT found in any region")); - - // HRegionLocation regLoc = table.getRegionLocation( - // HConstants.EMPTY_BYTE_ARRAY, false); - // - // if (null == regLoc) { - // throw new IOException("Expecting at least one region."); - // } - // - // HBaseTableSplit split = new HBaseTableSplit( - // table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, - // HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostnamePort() - // .split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], - // SourceMode.EMPTY, false); - // - // splits.add(split); - // } - - LOG.info("RETURNED SPLITS: split -> " + splits); - - // TODO: Change to HBaseMultiSplit - return convertToMultiSplitArray(splits); - } - - default: - throw new IOException("Unknown source Mode : " + sourceMode); - } - } - - private String reverseDNS(InetAddress ipAddress) throws NamingException { - String hostName = this.reverseDNSCacheMap.get(ipAddress); - if (hostName == null) { - hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( - ipAddress, this.nameServer)); - this.reverseDNSCacheMap.put(ipAddress, hostName); - } - return hostName; - } - - @Override - public RecordReader getRecordReader( - InputSplit split, JobConf job, Reporter reporter) throws IOException { - - if (!(split instanceof HBaseMultiInputSplit)) - throw new IOException("Table Split is not type HBaseMultiInputSplit"); - - HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split; - - HBaseRecordReader trr = new HBaseRecordReader(tSplit); - - trr.setHTable(this.table); - trr.setInputColumns(this.inputColumns); - trr.setRowFilter(this.rowFilter); - trr.setUseSalt(useSalt); - - trr.setNextSplit(); - - return trr; - } - - /* Configuration Section */ - - /** - * space delimited list of columns - */ - public static final String COLUMN_LIST = "hbase.tablecolumns"; - - /** - * Use this jobconf param to specify the input table - */ - private static final String INPUT_TABLE = "hbase.inputtable"; - - private String startKey = null; - private String stopKey = null; - - private SourceMode sourceMode = SourceMode.EMPTY; - private TreeSet keyList = null; - private int versions = 1; - private boolean useSalt = false; - private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - - public void configure(JobConf job) { - String tableName = getTableName(job); - String colArg = job.get(COLUMN_LIST); - String[] colNames = colArg.split(" "); - byte[][] m_cols = new byte[colNames.length][]; - for (int i = 0; i < m_cols.length; i++) { - m_cols[i] = Bytes.toBytes(colNames[i]); - } - setInputColumns(m_cols); - - try { - setHTable(new HTable(HBaseConfiguration.create(job), tableName)); - } catch (Exception e) { - LOG.error("************* Table could not be created"); - LOG.error(StringUtils.stringifyException(e)); - } - - LOG.debug("Entered : " + this.getClass() + " : configure()"); - - useSalt = job.getBoolean( - String.format(HBaseConstants.USE_SALT, getTableName(job)), false); - prefixList = job.get( - String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), - HBaseSalter.DEFAULT_PREFIX_LIST); - - sourceMode = SourceMode.valueOf(job.get(String.format( - HBaseConstants.SOURCE_MODE, getTableName(job)))); - - LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String - .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job - .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), - sourceMode)); - - switch (sourceMode) { - case SCAN_RANGE: - LOG.info("HIT SCAN_RANGE"); - - startKey = getJobProp(job, - String.format(HBaseConstants.START_KEY, getTableName(job))); - stopKey = getJobProp(job, - String.format(HBaseConstants.STOP_KEY, getTableName(job))); - - LOG.info(String.format("Setting start key (%s) and stop key (%s)", - startKey, stopKey)); - break; - - case GET_LIST: - LOG.info("HIT GET_LIST"); - - Collection keys = job.getStringCollection(String.format( - HBaseConstants.KEY_LIST, getTableName(job))); - keyList = new TreeSet(keys); - - versions = job.getInt( - String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - - LOG.debug("GOT KEY LIST : " + keys); - LOG.debug(String.format("SETTING key list (%s)", keyList)); - - break; - - case EMPTY: - LOG.info("HIT EMPTY"); - - sourceMode = SourceMode.SCAN_ALL; - break; - - default: - LOG.info("HIT DEFAULT"); - - break; - } - } - - public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - String tableName = getTableName(job); - - if (tableName == null) { - throw new IOException("expecting one table name"); - } - LOG.debug(String.format("Found Table name [%s]", tableName)); - - // connected to table? - if (getHTable() == null) { - throw new IOException("could not connect to table '" + tableName + "'"); - } - LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); - - // expecting at least one column - String colArg = job.get(COLUMN_LIST); - if (colArg == null || colArg.length() == 0) { - throw new IOException("expecting at least one column"); - } - LOG.debug(String.format("Found Columns [%s]", colArg)); - - LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, - stopKey)); - - if (sourceMode == SourceMode.EMPTY) { - throw new IOException("SourceMode should not be EMPTY"); - } - - if (sourceMode == SourceMode.GET_LIST - && (keyList == null || keyList.size() == 0)) { - throw new IOException("Source mode is GET_LIST bu key list is empty"); - } - } - - /* Getters & Setters */ - private HTable getHTable() { - return this.table; - } - - private void setHTable(HTable ht) { - this.table = ht; - } - - private void setInputColumns(byte[][] ic) { - this.inputColumns = ic; - } - - private void setJobProp(JobConf job, String key, String value) { - if (job.get(key) != null) - throw new RuntimeException(String.format( - "Job Conf already has key [%s] with value [%s]", key, - job.get(key))); - job.set(key, value); - } - - private String getJobProp(JobConf job, String key) { - return job.get(key); - } - - public static void setTableName(JobConf job, String tableName) { - // Make sure that table has not been set before - String oldTableName = getTableName(job); - if (oldTableName != null) { - throw new RuntimeException("table name already set to: '" - + oldTableName + "'"); - } - - job.set(INPUT_TABLE, tableName); - } - - public static String getTableName(JobConf job) { - return job.get(INPUT_TABLE); - } - - protected boolean includeRegionInSplit(final byte[] startKey, - final byte[] endKey) { - return true; - } -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java new file mode 100644 index 0000000..2f3047b --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -0,0 +1,172 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.util.StringUtils; + +import java.util.Collection; +import java.util.TreeSet; +import java.util.UUID; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 12:43 + * To change this template use File | Settings | File Templates. + */ +public abstract class HBaseInputFormatBase implements InputFormat, JobConfigurable { + + private final Log LOG = LogFactory.getLog(HBaseInputFormatBase.class); + + protected final String id = UUID.randomUUID().toString(); + protected byte[][] inputColumns; + protected HTable table; + protected Filter rowFilter; + + public static final String COLUMN_LIST = "hbase.tablecolumns"; + + /** + * Use this jobconf param to specify the input table + */ + protected static final String INPUT_TABLE = "hbase.inputtable"; + + protected String startKey = null; + protected String stopKey = null; + + protected HBaseConstants.SourceMode sourceMode = HBaseConstants.SourceMode.EMPTY; + protected TreeSet keyList = null; + protected int versions = 1; + protected boolean useSalt = false; + protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; + + + + @Override + public void configure(JobConf job) { + String tableName = getTableName(job); + String colArg = job.get(COLUMN_LIST); + String[] colNames = colArg.split(" "); + byte[][] m_cols = new byte[colNames.length][]; + for (int i = 0; i < m_cols.length; i++) { + m_cols[i] = Bytes.toBytes(colNames[i]); + } + setInputColumns(m_cols); + + try { + setHTable(new HTable(HBaseConfiguration.create(job), tableName)); + } catch (Exception e) { + LOG.error("************* Table could not be created"); + LOG.error(StringUtils.stringifyException(e)); + } + + LOG.debug("Entered : " + this.getClass() + " : configure()"); + + useSalt = job.getBoolean( + String.format(HBaseConstants.USE_SALT, getTableName(job)), false); + prefixList = job.get( + String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), + HBaseSalter.DEFAULT_PREFIX_LIST); + + sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format( + HBaseConstants.SOURCE_MODE, getTableName(job)))); + + LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String + .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job + .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), + sourceMode)); + + switch (sourceMode) { + case SCAN_RANGE: + LOG.debug("HIT SCAN_RANGE"); + + startKey = getJobProp(job, + String.format(HBaseConstants.START_KEY, getTableName(job))); + stopKey = getJobProp(job, + String.format(HBaseConstants.STOP_KEY, getTableName(job))); + + LOG.debug(String.format("Setting start key (%s) and stop key (%s)", + startKey, stopKey)); + break; + + case GET_LIST: + LOG.debug("HIT GET_LIST"); + + Collection keys = job.getStringCollection(String.format( + HBaseConstants.KEY_LIST, getTableName(job))); + keyList = new TreeSet(keys); + + versions = job.getInt( + String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); + + LOG.debug("GOT KEY LIST : " + keys); + LOG.debug(String.format("SETTING key list (%s)", keyList)); + + break; + + case EMPTY: + LOG.info("HIT EMPTY"); + + sourceMode = HBaseConstants.SourceMode.SCAN_ALL; + break; + + default: + LOG.info("HIT DEFAULT"); + + break; + } + } + + /* Getters & Setters */ + protected HTable getHTable() { + return this.table; + } + + protected void setHTable(HTable ht) { + this.table = ht; + } + + protected void setInputColumns(byte[][] ic) { + this.inputColumns = ic; + } + + protected void setJobProp(JobConf job, String key, String value) { + if (job.get(key) != null) + throw new RuntimeException(String.format( + "Job Conf already has key [%s] with value [%s]", key, + job.get(key))); + job.set(key, value); + } + + protected String getJobProp(JobConf job, String key) { + return job.get(key); + } + + public static void setTableName(JobConf job, String tableName) { + // Make sure that table has not been set before + String oldTableName = getTableName(job); + if (oldTableName != null) { + throw new RuntimeException("table name already set to: '" + + oldTableName + "'"); + } + + job.set(INPUT_TABLE, tableName); + } + + public static String getTableName(JobConf job) { + return job.get(INPUT_TABLE); + } + + protected void setParms(HBaseRecordReaderBase trr) { + + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java new file mode 100644 index 0000000..929e9d8 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -0,0 +1,459 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseInputFormatGranular extends HBaseInputFormatBase { + + private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class); + + // private String tableName = ""; + + private HashMap reverseDNSCacheMap = new HashMap(); + + private String nameServer = null; + + // private Scan scan = null; + + @SuppressWarnings("deprecation") + @Override + public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException { + if (this.table == null) { + throw new IOException("No table was provided"); + } + + if (this.inputColumns == null || this.inputColumns.length == 0) { + throw new IOException("Expecting at least one column"); + } + + Pair keys = table.getStartEndKeys(); + + if (keys == null || keys.getFirst() == null + || keys.getFirst().length == 0) { + HRegionLocation regLoc = table.getRegionLocation( + HConstants.EMPTY_BYTE_ARRAY, false); + + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + + List splits = new ArrayList(1); + HBaseTableSplitGranular split = new HBaseTableSplitGranular(table.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc + .getHostnamePort().split( + Addressing.HOSTNAME_PORT_SEPARATOR)[0], + SourceMode.EMPTY, false); + + splits.add(split); + + return splits.toArray(new HBaseTableSplitGranular[splits.size()]); + } + + if (keys.getSecond() == null || keys.getSecond().length == 0) { + throw new IOException("Expecting at least one region."); + } + + if (keys.getFirst().length != keys.getSecond().length) { + throw new IOException("Regions for start and end key do not match"); + } + + byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; + byte[] maxKey = keys.getSecond()[0]; + + LOG.debug(String.format("SETTING min key (%s) and max key (%s)", + Bytes.toString(minKey), Bytes.toString(maxKey))); + + byte[][] regStartKeys = keys.getFirst(); + byte[][] regStopKeys = keys.getSecond(); + String[] regions = new String[regStartKeys.length]; + + for (int i = 0; i < regStartKeys.length; i++) { + minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) + && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] + : minKey; + maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) + && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] + : maxKey; + + HServerAddress regionServerAddress = table.getRegionLocation( + keys.getFirst()[i]).getServerAddress(); + InetAddress regionAddress = regionServerAddress.getInetSocketAddress() + .getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + regionAddress + + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + // HServerAddress regionServerAddress = + // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); + // InetAddress regionAddress = + // regionServerAddress.getInetSocketAddress().getAddress(); + // + // String regionLocation; + // + // try { + // regionLocation = reverseDNS(regionAddress); + // } catch (NamingException e) { + // LOG.error("Cannot resolve the host name for " + regionAddress + + // " because of " + e); + // regionLocation = regionServerAddress.getHostname(); + // } + + // String regionLocation = + // table.getRegionLocation(keys.getFirst()[i]).getHostname(); + + LOG.debug("***** " + regionLocation); + + if (regionLocation == null || regionLocation.length() == 0) + throw new IOException("The region info for regiosn " + i + + " is null or empty"); + + regions[i] = regionLocation; + + LOG.debug(String.format( + "Region (%s) has start key (%s) and stop key (%s)", regions[i], + Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); + } + + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + + LOG.debug(String.format("Found min key (%s) and max key (%s)", + Bytes.toString(minKey), Bytes.toString(maxKey))); + + LOG.debug("SOURCE MODE is : " + sourceMode); + + switch (sourceMode) { + case SCAN_ALL: + startRow = HConstants.EMPTY_START_ROW; + stopRow = HConstants.EMPTY_END_ROW; + + LOG.debug(String.format( + "SCAN ALL: Found start key (%s) and stop key (%s)", + Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + + case SCAN_RANGE: + startRow = (startKey != null && startKey.length() != 0) ? Bytes + .toBytes(startKey) : HConstants.EMPTY_START_ROW; + stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes + .toBytes(stopKey) : HConstants.EMPTY_END_ROW; + + LOG.debug(String.format( + "SCAN RANGE: Found start key (%s) and stop key (%s)", + Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + } + + switch (sourceMode) { + case EMPTY: + case SCAN_ALL: + case SCAN_RANGE: { + // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : + // startRow; + // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : + // stopRow; + + List splits = new ArrayList(); + + if (!useSalt) { + + List validRegions = table.getRegionsInRange( + startRow, stopRow); + + int maxRegions = validRegions.size(); + int currentRegion = 1; + + for (HRegionLocation cRegion : validRegions) { + byte[] rStart = cRegion.getRegionInfo().getStartKey(); + byte[] rStop = cRegion.getRegionInfo().getEndKey(); + + HServerAddress regionServerAddress = cRegion + .getServerAddress(); + InetAddress regionAddress = regionServerAddress + .getInetSocketAddress().getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + + regionAddress + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + byte[] sStart = (startRow == HConstants.EMPTY_START_ROW + || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart + : startRow); + byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW + || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop + : stopRow); + + LOG.debug(String.format( + "BOOL start (%s) stop (%s) length (%d)", + (startRow == HConstants.EMPTY_START_ROW || (Bytes + .compareTo(startRow, rStart) <= 0)), + (stopRow == HConstants.EMPTY_END_ROW || (Bytes + .compareTo(stopRow, rStop) >= 0)), rStop.length)); + + HBaseTableSplitGranular split = new HBaseTableSplitGranular( + table.getTableName(), sStart, sStop, regionLocation, + SourceMode.SCAN_RANGE, useSalt); + + split.setEndRowInclusive(currentRegion == maxRegions); + + currentRegion++; + + LOG.debug(String + .format( + "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", + Bytes.toString(startRow), + Bytes.toString(stopRow), Bytes.toString(rStart), + Bytes.toString(rStop), Bytes.toString(sStart), + Bytes.toString(sStop), cRegion.getHostnamePort(), + split)); + + splits.add(split); + } + } else { + LOG.debug("Using SALT : " + useSalt); + + // Will return the start and the stop key with all possible + // prefixes. + for (int i = 0; i < regions.length; i++) { + Pair[] intervals = HBaseSalter + .getDistributedIntervals(startRow, stopRow, + regStartKeys[i], regStopKeys[i], prefixList); + + for (Pair pair : intervals) { + LOG.debug("".format( + "Using SALT, Region (%s) Start (%s) Stop (%s)", + regions[i], Bytes.toString(pair.getFirst()), + Bytes.toString(pair.getSecond()))); + + HBaseTableSplitGranular split = new HBaseTableSplitGranular( + table.getTableName(), pair.getFirst(), + pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, + useSalt); + + split.setEndRowInclusive(true); + splits.add(split); + } + } + } + + LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); + for (HBaseTableSplitGranular s : splits) { + LOG.debug("RETURNED SPLITS: split -> " + s); + } + + return splits.toArray(new HBaseTableSplitGranular[splits.size()]); + } + + case GET_LIST: { + // if( keyList == null || keyList.size() == 0 ) { + if (keyList == null) { + throw new IOException( + "Source Mode is GET_LIST but key list is EMPTY"); + } + + if (useSalt) { + TreeSet tempKeyList = new TreeSet(); + + for (String key : keyList) { + tempKeyList.add(HBaseSalter.addSaltPrefix(key)); + } + + keyList = tempKeyList; + } + + LOG.debug("".format("Splitting Key List (%s)", keyList)); + + List splits = new ArrayList(); + + for (int i = 0; i < keys.getFirst().length; i++) { + + if (!includeRegionInSplit(keys.getFirst()[i], + keys.getSecond()[i])) { + continue; + } + + LOG.debug(String.format( + "Getting region (%s) subset (%s) to (%s)", regions[i], + Bytes.toString(regStartKeys[i]), + Bytes.toString(regStopKeys[i]))); + + Set regionsSubSet = null; + + if ((regStartKeys[i] == null || regStartKeys[i].length == 0) + && (regStopKeys[i] == null || regStopKeys[i].length == 0)) { + LOG.debug("REGION start is empty"); + LOG.debug("REGION stop is empty"); + regionsSubSet = keyList; + } else if (regStartKeys[i] == null + || regStartKeys[i].length == 0) { + LOG.debug("REGION start is empty"); + regionsSubSet = keyList.headSet( + Bytes.toString(regStopKeys[i]), true); + } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { + LOG.debug("REGION stop is empty"); + regionsSubSet = keyList.tailSet( + Bytes.toString(regStartKeys[i]), true); + } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { + regionsSubSet = keyList.subSet( + Bytes.toString(regStartKeys[i]), true, + Bytes.toString(regStopKeys[i]), true); + } else { + throw new IOException(String.format( + "For REGION (%s) Start Key (%s) > Stop Key(%s)", + regions[i], Bytes.toString(regStartKeys[i]), + Bytes.toString(regStopKeys[i]))); + } + + if (regionsSubSet == null || regionsSubSet.size() == 0) { + LOG.debug("EMPTY: Key is for region " + regions[i] + + " is null"); + + continue; + } + + TreeSet regionKeyList = new TreeSet( + regionsSubSet); + + LOG.debug(String.format("Regions [%s] has key list <%s>", + regions[i], regionKeyList)); + + HBaseTableSplitGranular split = new HBaseTableSplitGranular( + table.getTableName(), regionKeyList, versions, regions[i], + SourceMode.GET_LIST, useSalt); + splits.add(split); + } + + LOG.debug("RETURNED SPLITS: split -> " + splits); + + return splits.toArray(new HBaseTableSplitGranular[splits.size()]); + } + + default: + throw new IOException("Unknown source Mode : " + sourceMode); + } + } + + private String reverseDNS(InetAddress ipAddress) throws NamingException { + String hostName = this.reverseDNSCacheMap.get(ipAddress); + if (hostName == null) { + hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( + ipAddress, this.nameServer)); + this.reverseDNSCacheMap.put(ipAddress, hostName); + } + return hostName; + } + + @Override + public RecordReader getRecordReader( + InputSplit split, JobConf job, Reporter reporter) throws IOException { + + LOG.info("GRANULAR SPLIT -> " + split); + + if (!(split instanceof HBaseTableSplitGranular)) + throw new IOException("Table Split is not type HBaseTableSplitGranular"); + + HBaseTableSplitGranular tSplit = (HBaseTableSplitGranular) split; + + HBaseRecordReaderGranular trr = new HBaseRecordReaderGranular(); + + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + + trr.init(); + + return trr; + } + + /* Configuration Section */ + + /** + * space delimited list of columns + */ + + public void validateInput(JobConf job) throws IOException { + // expecting exactly one path + String tableName = getTableName(job); + + if (tableName == null) { + throw new IOException("expecting one table name"); + } + LOG.debug(String.format("Found Table name [%s]", tableName)); + + // connected to table? + if (getHTable() == null) { + throw new IOException("could not connect to table '" + tableName + "'"); + } + LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); + + // expecting at least one column + String colArg = job.get(COLUMN_LIST); + if (colArg == null || colArg.length() == 0) { + throw new IOException("expecting at least one column"); + } + LOG.debug(String.format("Found Columns [%s]", colArg)); + + LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, + stopKey)); + + if (sourceMode == SourceMode.EMPTY) { + throw new IOException("SourceMode should not be EMPTY"); + } + + if (sourceMode == SourceMode.GET_LIST + && (keyList == null || keyList.size() == 0)) { + throw new IOException("Source mode is GET_LIST bu key list is empty"); + } + } + + + protected boolean includeRegionInSplit(final byte[] startKey, + final byte[] endKey) { + return true; + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java new file mode 100644 index 0000000..eadb57e --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -0,0 +1,99 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 12:24 + * To change this template use File | Settings | File Templates. + */ +public class HBaseInputFormatRegional extends HBaseInputFormatBase { + private HBaseInputFormatGranular granular = new HBaseInputFormatGranular(); + private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class); + + + @Override + public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException { + granular.configure(job); + HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits); + + HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits ); + + if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL"); + + LOG.info("GRANULAR => " + gSplits); + LOG.info("REGIONAL => " + splits); + + return splits; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { + if (!(inputSplit instanceof HBaseTableSplitRegional)) + throw new IOException("Table Split is not type HBaseTableSplitRegional"); + + LOG.info("REGIONAL SPLIT -> " + inputSplit); + + HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit; + + HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional(); + + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + + trr.init(tSplit); + + return trr; + } + + private HBaseTableSplitRegional[] convertToMultiSplitArray( + HBaseTableSplitGranular[] splits) throws IOException { + + if (splits == null) + throw new IOException("The list of splits is null => " + splits); + + HashMap regionSplits = new HashMap(); + + for (HBaseTableSplitGranular hbt : splits) { + HBaseTableSplitRegional mis = null; + if (regionSplits.containsKey(hbt.getRegionLocation())) { + mis = regionSplits.get(hbt.getRegionLocation()); + } else { + regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional( + hbt.getRegionLocation())); + mis = regionSplits.get(hbt.getRegionLocation()); + } + + mis.addSplit(hbt); + regionSplits.put(hbt.getRegionLocation(), mis); + } + +// for(String region : regionSplits.keySet() ) { +// regionSplits.get(region) +// } + + Collection outVals = regionSplits.values(); + + LOG.debug("".format("Returning array of splits : %s", outVals)); + + if (outVals == null) + throw new IOException("The list of multi input splits were null"); + + return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]); + } + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java deleted file mode 100644 index 96bfea1..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat_SINGLE.java +++ /dev/null @@ -1,622 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; - -import javax.naming.NamingException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseInputFormat_SINGLE implements - InputFormat, JobConfigurable { - - private final Log LOG = LogFactory.getLog(HBaseInputFormat_SINGLE.class); - - private final String id = UUID.randomUUID().toString(); - - private byte[][] inputColumns; - private HTable table; - // private HBaseRecordReader tableRecordReader; - private Filter rowFilter; - // private String tableName = ""; - - private HashMap reverseDNSCacheMap = new HashMap(); - - private String nameServer = null; - - // private Scan scan = null; - - @SuppressWarnings("deprecation") - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - if (this.table == null) { - throw new IOException("No table was provided"); - } - - if (this.inputColumns == null || this.inputColumns.length == 0) { - throw new IOException("Expecting at least one column"); - } - - Pair keys = table.getStartEndKeys(); - - if (keys == null || keys.getFirst() == null - || keys.getFirst().length == 0) { - HRegionLocation regLoc = table.getRegionLocation( - HConstants.EMPTY_BYTE_ARRAY, false); - - if (null == regLoc) { - throw new IOException("Expecting at least one region."); - } - - List splits = new ArrayList(1); - HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split( - Addressing.HOSTNAME_PORT_SEPARATOR)[0], - SourceMode.EMPTY, false); - - splits.add(split); - - return splits.toArray(new HBaseTableSplit[splits.size()]); - } - - if (keys.getSecond() == null || keys.getSecond().length == 0) { - throw new IOException("Expecting at least one region."); - } - - if (keys.getFirst().length != keys.getSecond().length) { - throw new IOException("Regions for start and end key do not match"); - } - - byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; - byte[] maxKey = keys.getSecond()[0]; - - LOG.debug(String.format("SETTING min key (%s) and max key (%s)", - Bytes.toString(minKey), Bytes.toString(maxKey))); - - byte[][] regStartKeys = keys.getFirst(); - byte[][] regStopKeys = keys.getSecond(); - String[] regions = new String[regStartKeys.length]; - - for (int i = 0; i < regStartKeys.length; i++) { - minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) - && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] - : minKey; - maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) - && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i] - : maxKey; - - HServerAddress regionServerAddress = table.getRegionLocation( - keys.getFirst()[i]).getServerAddress(); - InetAddress regionAddress = regionServerAddress.getInetSocketAddress() - .getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " + regionAddress - + " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - // HServerAddress regionServerAddress = - // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); - // InetAddress regionAddress = - // regionServerAddress.getInetSocketAddress().getAddress(); - // - // String regionLocation; - // - // try { - // regionLocation = reverseDNS(regionAddress); - // } catch (NamingException e) { - // LOG.error("Cannot resolve the host name for " + regionAddress + - // " because of " + e); - // regionLocation = regionServerAddress.getHostname(); - // } - - // String regionLocation = - // table.getRegionLocation(keys.getFirst()[i]).getHostname(); - - LOG.debug("***** " + regionLocation); - - if (regionLocation == null || regionLocation.length() == 0) - throw new IOException("The region info for regiosn " + i - + " is null or empty"); - - regions[i] = regionLocation; - - LOG.debug(String.format( - "Region (%s) has start key (%s) and stop key (%s)", regions[i], - Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); - } - - byte[] startRow = HConstants.EMPTY_START_ROW; - byte[] stopRow = HConstants.EMPTY_END_ROW; - - LOG.debug(String.format("Found min key (%s) and max key (%s)", - Bytes.toString(minKey), Bytes.toString(maxKey))); - - LOG.debug("SOURCE MODE is : " + sourceMode); - - switch (sourceMode) { - case SCAN_ALL: - startRow = HConstants.EMPTY_START_ROW; - stopRow = HConstants.EMPTY_END_ROW; - - LOG.info(String.format( - "SCAN ALL: Found start key (%s) and stop key (%s)", - Bytes.toString(startRow), Bytes.toString(stopRow))); - break; - - case SCAN_RANGE: - startRow = (startKey != null && startKey.length() != 0) ? Bytes - .toBytes(startKey) : HConstants.EMPTY_START_ROW; - stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes - .toBytes(stopKey) : HConstants.EMPTY_END_ROW; - - LOG.info(String.format( - "SCAN RANGE: Found start key (%s) and stop key (%s)", - Bytes.toString(startRow), Bytes.toString(stopRow))); - break; - } - - switch (sourceMode) { - case EMPTY: - case SCAN_ALL: - case SCAN_RANGE: { - // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : - // startRow; - // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : - // stopRow; - - List splits = new ArrayList(); - - if (!useSalt) { - - List validRegions = table.getRegionsInRange( - startRow, stopRow); - - int maxRegions = validRegions.size(); - int currentRegion = 1; - - for (HRegionLocation cRegion : validRegions) { - byte[] rStart = cRegion.getRegionInfo().getStartKey(); - byte[] rStop = cRegion.getRegionInfo().getEndKey(); - - HServerAddress regionServerAddress = cRegion - .getServerAddress(); - InetAddress regionAddress = regionServerAddress - .getInetSocketAddress().getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " - + regionAddress + " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - byte[] sStart = (startRow == HConstants.EMPTY_START_ROW - || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart - : startRow); - byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW - || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop - : stopRow); - - LOG.debug(String.format( - "BOOL start (%s) stop (%s) length (%d)", - (startRow == HConstants.EMPTY_START_ROW || (Bytes - .compareTo(startRow, rStart) <= 0)), - (stopRow == HConstants.EMPTY_END_ROW || (Bytes - .compareTo(stopRow, rStop) >= 0)), rStop.length)); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), sStart, sStop, regionLocation, - SourceMode.SCAN_RANGE, useSalt); - - split.setEndRowInclusive(currentRegion == maxRegions); - - currentRegion++; - - LOG.debug(String - .format( - "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", - Bytes.toString(startRow), - Bytes.toString(stopRow), Bytes.toString(rStart), - Bytes.toString(rStop), Bytes.toString(sStart), - Bytes.toString(sStop), cRegion.getHostnamePort(), - split)); - - splits.add(split); - } - } else { - LOG.debug("Using SALT : " + useSalt); - - // Will return the start and the stop key with all possible - // prefixes. - for (int i = 0; i < regions.length; i++) { - Pair[] intervals = HBaseSalter - .getDistributedIntervals(startRow, stopRow, - regStartKeys[i], regStopKeys[i], prefixList); - - for (Pair pair : intervals) { - LOG.info("".format( - "Using SALT, Region (%s) Start (%s) Stop (%s)", - regions[i], Bytes.toString(pair.getFirst()), - Bytes.toString(pair.getSecond()))); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), pair.getFirst(), - pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, - useSalt); - - split.setEndRowInclusive(true); - splits.add(split); - } - } - } - - LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); - for (HBaseTableSplit s : splits) { - LOG.info("RETURNED SPLITS: split -> " + s); - } - - return splits.toArray(new HBaseTableSplit[splits.size()]); - } - - case GET_LIST: { - // if( keyList == null || keyList.size() == 0 ) { - if (keyList == null) { - throw new IOException( - "Source Mode is GET_LIST but key list is EMPTY"); - } - - if (useSalt) { - TreeSet tempKeyList = new TreeSet(); - - for (String key : keyList) { - tempKeyList.add(HBaseSalter.addSaltPrefix(key)); - } - - keyList = tempKeyList; - } - - LOG.debug("".format("Splitting Key List (%s)", keyList)); - - List splits = new ArrayList(); - - for (int i = 0; i < keys.getFirst().length; i++) { - - if (!includeRegionInSplit(keys.getFirst()[i], - keys.getSecond()[i])) { - continue; - } - - LOG.debug(String.format( - "Getting region (%s) subset (%s) to (%s)", regions[i], - Bytes.toString(regStartKeys[i]), - Bytes.toString(regStartKeys[i]))); - - Set regionsSubSet = null; - - if ((regStartKeys[i] == null || regStartKeys[i].length == 0) - && (regStopKeys[i] == null || regStopKeys[i].length == 0)) { - LOG.debug("REGION start is empty"); - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList; - } else if (regStartKeys[i] == null - || regStartKeys[i].length == 0) { - LOG.debug("REGION start is empty"); - regionsSubSet = keyList.headSet( - Bytes.toString(regStopKeys[i]), true); - } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList.tailSet( - Bytes.toString(regStartKeys[i]), true); - } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { - regionsSubSet = keyList.subSet( - Bytes.toString(regStartKeys[i]), true, - Bytes.toString(regStopKeys[i]), true); - } else { - throw new IOException(String.format( - "For REGION (%s) Start Key (%s) > Stop Key(%s)", - regions[i], Bytes.toString(regStartKeys[i]), - Bytes.toString(regStopKeys[i]))); - } - - if (regionsSubSet == null || regionsSubSet.size() == 0) { - LOG.debug("EMPTY: Key is for region " + regions[i] - + " is null"); - - continue; - } - - TreeSet regionKeyList = new TreeSet( - regionsSubSet); - - LOG.debug(String.format("Regions [%s] has key list <%s>", - regions[i], regionKeyList)); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), regionKeyList, versions, regions[i], - SourceMode.GET_LIST, useSalt); - splits.add(split); - } - - LOG.debug("RETURNED SPLITS: split -> " + splits); - - return splits.toArray(new HBaseTableSplit[splits.size()]); - } - - default: - throw new IOException("Unknown source Mode : " + sourceMode); - } - } - - private String reverseDNS(InetAddress ipAddress) throws NamingException { - String hostName = this.reverseDNSCacheMap.get(ipAddress); - if (hostName == null) { - hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( - ipAddress, this.nameServer)); - this.reverseDNSCacheMap.put(ipAddress, hostName); - } - return hostName; - } - - @Override - public RecordReader getRecordReader( - InputSplit split, JobConf job, Reporter reporter) throws IOException { - - if (!(split instanceof HBaseTableSplit)) - throw new IOException("Table Split is not type HBaseTableSplit"); - - HBaseTableSplit tSplit = (HBaseTableSplit) split; - - HBaseRecordReader_SINGLE trr = new HBaseRecordReader_SINGLE(); - - switch (tSplit.getSourceMode()) { - case SCAN_ALL: - case SCAN_RANGE: { - LOG.debug(String.format( - "For split [%s] we have start key (%s) and stop key (%s)", - tSplit, tSplit.getStartRow(), tSplit.getEndRow())); - - trr.setStartRow(tSplit.getStartRow()); - trr.setEndRow(tSplit.getEndRow()); - trr.setEndRowInclusive(tSplit.getEndRowInclusive()); - trr.setUseSalt(useSalt); - } - - break; - - case GET_LIST: { - LOG.debug(String.format("For split [%s] we have key list (%s)", - tSplit, tSplit.getKeyList())); - - trr.setKeyList(tSplit.getKeyList()); - trr.setVersions(tSplit.getVersions()); - trr.setUseSalt(useSalt); - } - - break; - - default: - throw new IOException("Unknown source mode : " - + tSplit.getSourceMode()); - } - - trr.setSourceMode(tSplit.getSourceMode()); - trr.setHTable(this.table); - trr.setInputColumns(this.inputColumns); - trr.setRowFilter(this.rowFilter); - - trr.init(); - - return trr; - } - - /* Configuration Section */ - - /** - * space delimited list of columns - */ - public static final String COLUMN_LIST = "hbase.tablecolumns"; - - /** - * Use this jobconf param to specify the input table - */ - private static final String INPUT_TABLE = "hbase.inputtable"; - - private String startKey = null; - private String stopKey = null; - - private SourceMode sourceMode = SourceMode.EMPTY; - private TreeSet keyList = null; - private int versions = 1; - private boolean useSalt = false; - private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - - public void configure(JobConf job) { - String tableName = getTableName(job); - String colArg = job.get(COLUMN_LIST); - String[] colNames = colArg.split(" "); - byte[][] m_cols = new byte[colNames.length][]; - for (int i = 0; i < m_cols.length; i++) { - m_cols[i] = Bytes.toBytes(colNames[i]); - } - setInputColumns(m_cols); - - try { - setHTable(new HTable(HBaseConfiguration.create(job), tableName)); - } catch (Exception e) { - LOG.error("************* Table could not be created"); - LOG.error(StringUtils.stringifyException(e)); - } - - LOG.debug("Entered : " + this.getClass() + " : configure()"); - - useSalt = job.getBoolean( - String.format(HBaseConstants.USE_SALT, getTableName(job)), false); - prefixList = job.get( - String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), - HBaseSalter.DEFAULT_PREFIX_LIST); - - sourceMode = SourceMode.valueOf(job.get(String.format( - HBaseConstants.SOURCE_MODE, getTableName(job)))); - - LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String - .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job - .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), - sourceMode)); - - switch (sourceMode) { - case SCAN_RANGE: - LOG.info("HIT SCAN_RANGE"); - - startKey = getJobProp(job, - String.format(HBaseConstants.START_KEY, getTableName(job))); - stopKey = getJobProp(job, - String.format(HBaseConstants.STOP_KEY, getTableName(job))); - - LOG.info(String.format("Setting start key (%s) and stop key (%s)", - startKey, stopKey)); - break; - - case GET_LIST: - LOG.info("HIT GET_LIST"); - - Collection keys = job.getStringCollection(String.format( - HBaseConstants.KEY_LIST, getTableName(job))); - keyList = new TreeSet(keys); - - versions = job.getInt( - String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - - LOG.debug("GOT KEY LIST : " + keys); - LOG.debug(String.format("SETTING key list (%s)", keyList)); - - break; - - case EMPTY: - LOG.info("HIT EMPTY"); - - sourceMode = SourceMode.SCAN_ALL; - break; - - default: - LOG.info("HIT DEFAULT"); - - break; - } - } - - public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - String tableName = getTableName(job); - - if (tableName == null) { - throw new IOException("expecting one table name"); - } - LOG.debug(String.format("Found Table name [%s]", tableName)); - - // connected to table? - if (getHTable() == null) { - throw new IOException("could not connect to table '" + tableName + "'"); - } - LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); - - // expecting at least one column - String colArg = job.get(COLUMN_LIST); - if (colArg == null || colArg.length() == 0) { - throw new IOException("expecting at least one column"); - } - LOG.debug(String.format("Found Columns [%s]", colArg)); - - LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, - stopKey)); - - if (sourceMode == SourceMode.EMPTY) { - throw new IOException("SourceMode should not be EMPTY"); - } - - if (sourceMode == SourceMode.GET_LIST - && (keyList == null || keyList.size() == 0)) { - throw new IOException("Source mode is GET_LIST bu key list is empty"); - } - } - - /* Getters & Setters */ - private HTable getHTable() { - return this.table; - } - - private void setHTable(HTable ht) { - this.table = ht; - } - - private void setInputColumns(byte[][] ic) { - this.inputColumns = ic; - } - - private void setJobProp(JobConf job, String key, String value) { - if (job.get(key) != null) - throw new RuntimeException(String.format( - "Job Conf already has key [%s] with value [%s]", key, - job.get(key))); - job.set(key, value); - } - - private String getJobProp(JobConf job, String key) { - return job.get(key); - } - - public static void setTableName(JobConf job, String tableName) { - // Make sure that table has not been set before - String oldTableName = getTableName(job); - if (oldTableName != null) { - throw new RuntimeException("table name already set to: '" - + oldTableName + "'"); - } - - job.set(INPUT_TABLE, tableName); - } - - public static String getTableName(JobConf job) { - return job.get(INPUT_TABLE); - } - - protected boolean includeRegionInSplit(final byte[] startKey, - final byte[] endKey) { - return true; - } -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java deleted file mode 100644 index 02e7f7b..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java +++ /dev/null @@ -1,111 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang.SerializationUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -public class HBaseMultiInputSplit implements InputSplit, - Comparable, Serializable { - - private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class); - - private List splits = new ArrayList(); - - private String regionLocation = null; - - /** default constructor */ - private HBaseMultiInputSplit() { - - } - - public HBaseMultiInputSplit(String regionLocation) { - this.regionLocation = regionLocation; - } - - /** @return the region's hostname */ - public String getRegionLocation() { - LOG.debug("REGION GETTER : " + regionLocation); - - return this.regionLocation; - } - - @Override - public void readFields(DataInput in) throws IOException { - LOG.debug("READ ME : " + in.toString()); - - int s = Bytes.toInt(Bytes.readByteArray(in)); - - for (int i = 0; i < s; i++) { - HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils - .deserialize(Bytes.readByteArray(in)); - splits.add(hbts); - } - - LOG.debug("READ and CREATED : " + this); - } - - @Override - public void write(DataOutput out) throws IOException { - LOG.debug("WRITE : " + this); - - Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); - - for (HBaseTableSplit hbts : splits) { - Bytes.writeByteArray(out, SerializationUtils.serialize(hbts)); - } - - LOG.debug("WROTE : " + out.toString()); - } - - @Override - public String toString() { - StringBuffer str = new StringBuffer(); - str.append("HBaseMultiSplit : "); - - for (HBaseTableSplit hbt : splits) { - str.append(" [" + hbt.toString() + "]"); - } - - return str.toString(); - } - - @Override - public int compareTo(HBaseMultiInputSplit o) { - // TODO: Make this comparison better - return (splits.size() - o.splits.size()); - } - - @Override - public long getLength() throws IOException { - return splits.size(); - } - - @Override - public String[] getLocations() throws IOException { - LOG.debug("REGION ARRAY : " + regionLocation); - - return new String[] { this.regionLocation }; - } - - public void addSplit(HBaseTableSplit hbt) throws IOException { - if (hbt.getRegionLocation().equals(regionLocation)) - splits.add(hbt); - else - throw new IOException("HBaseTableSplit Region Location " - + hbt.getRegionLocation() - + " does NOT match MultiSplit Region Location " + regionLocation); - } - - public List getSplits() { - return splits; - } -} \ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java deleted file mode 100644 index 5d7dbdd..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ /dev/null @@ -1,609 +0,0 @@ -package parallelai.spyglass.hbase; - -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.Vector; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader implements - RecordReader { - - static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); - - private byte[] startRow; - private byte[] endRow; - private byte[] lastSuccessfulRow; - private TreeSet keyList; - private SourceMode sourceMode; - private Filter trrRowFilter; - private ResultScanner scanner; - private HTable htable; - private byte[][] trrInputColumns; - private long timestamp; - private int rowcount; - private boolean logScannerActivity = false; - private int logPerRowCount = 100; - private boolean endRowInclusive = true; - private int versions = 1; - private boolean useSalt = false; - - private HBaseMultiInputSplit multiSplit = null; - private List allSplits = null; - - private HBaseRecordReader() { - } - - public HBaseRecordReader(HBaseMultiInputSplit mSplit) throws IOException { - multiSplit = mSplit; - - LOG.info("Creatin Multi Split for region location : " - + multiSplit.getRegionLocation()); - - allSplits = multiSplit.getSplits(); - } - - public boolean setNextSplit() throws IOException { - if (allSplits.size() > 0) { - setSplitValue(allSplits.remove(0)); - return true; - } else { - return false; - } - } - - private void setSplitValue(HBaseTableSplit tSplit) throws IOException { - switch (tSplit.getSourceMode()) { - case SCAN_ALL: - case SCAN_RANGE: { - LOG.debug(String.format( - "For split [%s] we have start key (%s) and stop key (%s)", - tSplit, tSplit.getStartRow(), tSplit.getEndRow())); - - setStartRow(tSplit.getStartRow()); - setEndRow(tSplit.getEndRow()); - setEndRowInclusive(tSplit.getEndRowInclusive()); - } - - break; - - case GET_LIST: { - LOG.debug(String.format("For split [%s] we have key list (%s)", - tSplit, tSplit.getKeyList())); - - setKeyList(tSplit.getKeyList()); - setVersions(tSplit.getVersions()); - } - - break; - - case EMPTY: - LOG.info("EMPTY split. Doing nothing."); - break; - - default: - throw new IOException("Unknown source mode : " - + tSplit.getSourceMode()); - } - - setSourceMode(tSplit.getSourceMode()); - - init(); - } - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow - * @throws IOException - */ - private void restartRangeScan(byte[] firstRow) throws IOException { - Scan currentScan; - if ((endRow != null) && (endRow.length > 0)) { - if (trrRowFilter != null) { - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) - + ", endRow: " + Bytes.toString(endRow)); - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - TableInputFormat.addColumns(scan, trrInputColumns); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) - + ", no endRow"); - - Scan scan = new Scan(firstRow); - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - if (logScannerActivity) { - LOG.debug("Current scan=" + currentScan.toString()); - timestamp = System.currentTimeMillis(); - rowcount = 0; - } - } - - public TreeSet getKeyList() { - return keyList; - } - - private void setKeyList(TreeSet keyList) { - this.keyList = keyList; - } - - private void setVersions(int versions) { - this.versions = versions; - } - - public void setUseSalt(boolean useSalt) { - this.useSalt = useSalt; - } - - public SourceMode getSourceMode() { - return sourceMode; - } - - private void setSourceMode(SourceMode sourceMode) { - this.sourceMode = sourceMode; - } - - public byte[] getEndRow() { - return endRow; - } - - private void setEndRowInclusive(boolean isInclusive) { - endRowInclusive = isInclusive; - } - - public boolean getEndRowInclusive() { - return endRowInclusive; - } - - private byte[] nextKey = null; - private Vector> resultVector = null; - Map> keyValueMap = null; - - /** - * Build the scanner. Not done in constructor to allow for extension. - * - * @throws IOException - */ - private void init() throws IOException { - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - restartRangeScan(startRow); - break; - - case GET_LIST: - nextKey = Bytes.toBytes(keyList.pollFirst()); - break; - - case EMPTY: - LOG.info("EMPTY mode. Do nothing"); - break; - - default: - throw new IOException(" Unknown source mode : " + sourceMode); - } - } - - byte[] getStartRow() { - return this.startRow; - } - - /** - * @param htable - * the {@link HTable} to scan. - */ - public void setHTable(HTable htable) { - Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean( - ScannerCallable.LOG_SCANNER_ACTIVITY, false); - logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); - this.htable = htable; - } - - /** - * @param inputColumns - * the columns to be placed in {@link Result}. - */ - public void setInputColumns(final byte[][] inputColumns) { - this.trrInputColumns = inputColumns; - } - - /** - * @param startRow - * the first row in the split - */ - private void setStartRow(final byte[] startRow) { - this.startRow = startRow; - } - - /** - * - * @param endRow - * the last row in the split - */ - private void setEndRow(final byte[] endRow) { - this.endRow = endRow; - } - - /** - * @param rowFilter - * the {@link Filter} to be used. - */ - public void setRowFilter(Filter rowFilter) { - this.trrRowFilter = rowFilter; - } - - @Override - public void close() { - if (this.scanner != null) - this.scanner.close(); - } - - /** - * @return ImmutableBytesWritable - * - * @see org.apache.hadoop.mapred.RecordReader#createKey() - */ - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - - /** - * @return RowResult - * - * @see org.apache.hadoop.mapred.RecordReader#createValue() - */ - @Override - public Result createValue() { - return new Result(); - } - - @Override - public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; - } - - @Override - public float getProgress() { - // Depends on the total number of tuples and getPos - return 0; - } - - /** - * @param key - * HStoreKey as input key. - * @param value - * MapWritable as input value - * @return true if there was more data - * @throws IOException - */ - @Override - public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: { - - Result result; - try { - try { - result = this.scanner.next(); - if (logScannerActivity) { - rowcount++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) - + "ms to process " + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // try to handle all IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.debug("recovered from " - + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," - + " if your mapper has restarted a few other times like this" - + " then you should consider killing this job and investigate" - + " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - restartRangeScan(startRow); - } else { - restartRangeScan(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row - } - result = this.scanner.next(); - } - - if (result != null && result.size() > 0) { - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } - return setNextSplit(); - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) - + "ms to process " + rowcount + " rows"); - LOG.debug(ioe); - String lastRow = lastSuccessfulRow == null ? "null" : Bytes - .toStringBinary(lastSuccessfulRow); - LOG.debug("lastSuccessfulRow=" + lastRow); - } - throw ioe; - } - } - - case GET_LIST: { - LOG.debug(String.format("INTO next with GET LIST and Key (%s)", - Bytes.toString(nextKey))); - - if (versions == 1) { - if (nextKey != null) { - LOG.debug(String.format("Processing Key (%s)", - Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result result = this.htable.get(theGet); - - if (result != null && (!result.isEmpty())) { - LOG.debug(String.format( - "Key (%s), Version (%s), Got Result (%s)", - Bytes.toString(nextKey), versions, result)); - - if (keyList != null || !keyList.isEmpty()) { - String newKey = keyList.pollFirst(); - LOG.debug("New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - } else { - nextKey = null; - } - - LOG.debug(String.format("=> Picked a new Key (%s)", - Bytes.toString(nextKey))); - - // Write the result - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - LOG.debug(" Key (" + Bytes.toString(nextKey) - + ") return an EMPTY result. Get (" + theGet.getId() - + ")"); // alg0 - - String newKey; - while ((newKey = keyList.pollFirst()) != null) { - LOG.debug("WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - if (nextKey == null) { - LOG.error("BOMB! BOMB! BOMB!"); - continue; - } - - if (!this.htable.exists(new Get(nextKey))) { - LOG.debug(String.format( - "Key (%s) Does not exist in Table (%s)", - Bytes.toString(nextKey), - Bytes.toString(this.htable.getTableName()))); - continue; - } else { - break; - } - } - - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - LOG.debug("Final New Key => " + Bytes.toString(nextKey)); - - return next(key, value); - } - } else { - // Nothig left. return false - return setNextSplit(); - } - } else { - if (resultVector != null && resultVector.size() != 0) { - LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", - versions, resultVector)); - - List resultKeyValue = resultVector - .remove(resultVector.size() - 1); - Result result = new Result(resultKeyValue); - - LOG.debug(String.format("+ Version (%s), Got Result <%s>", - versions, result)); - - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - if (nextKey != null) { - LOG.debug(String.format("+ Processing Key (%s)", - Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result resultAll = this.htable.get(theGet); - - if (resultAll != null && (!resultAll.isEmpty())) { - List keyValeList = resultAll.list(); - - keyValueMap = new HashMap>(); - - LOG.debug(String.format( - "+ Key (%s) Versions (%s) Val;ute map <%s>", - Bytes.toString(nextKey), versions, keyValueMap)); - - for (KeyValue keyValue : keyValeList) { - long version = keyValue.getTimestamp(); - - if (keyValueMap.containsKey(new Long(version))) { - List keyValueTempList = keyValueMap - .get(new Long(version)); - if (keyValueTempList == null) { - keyValueTempList = new ArrayList(); - } - keyValueTempList.add(keyValue); - } else { - List keyValueTempList = new ArrayList(); - keyValueMap.put(new Long(version), - keyValueTempList); - keyValueTempList.add(keyValue); - } - } - - resultVector = new Vector>(); - resultVector.addAll(keyValueMap.values()); - - List resultKeyValue = resultVector - .remove(resultVector.size() - 1); - - Result result = new Result(resultKeyValue); - - LOG.debug(String.format( - "+ Version (%s), Got Result (%s)", versions, - result)); - - String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// - - System.out.println("+ New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - if (useSalt) { - key.set(HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } else { - LOG.debug(String.format( - "+ Key (%s) return an EMPTY result. Get (%s)", - Bytes.toString(nextKey), theGet.getId())); // alg0 - - String newKey; - - while ((newKey = keyList.pollFirst()) != null) { - LOG.debug("+ WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - if (nextKey == null) { - LOG.error("+ BOMB! BOMB! BOMB!"); - continue; - } - - if (!this.htable.exists(new Get(nextKey))) { - LOG.debug(String.format( - "+ Key (%s) Does not exist in Table (%s)", - Bytes.toString(nextKey), - Bytes.toString(this.htable.getTableName()))); - continue; - } else { - break; - } - } - - nextKey = (newKey == null || newKey.length() == 0) ? null - : Bytes.toBytes(newKey); - - LOG.debug("+ Final New Key => " - + Bytes.toString(nextKey)); - - return next(key, value); - } - - } else { - return setNextSplit(); - } - } - } - } - - case EMPTY: { - LOG.info("GOT an empty Split"); - return setNextSplit(); - } - - default: - throw new IOException("Unknown source mode : " + sourceMode); - } - } -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java new file mode 100644 index 0000000..37858ad --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -0,0 +1,140 @@ +package parallelai.spyglass.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.RecordReader; + +import java.util.TreeSet; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 15:42 + * To change this template use File | Settings | File Templates. + */ +public abstract class HBaseRecordReaderBase implements + RecordReader { + + protected TreeSet keyList; + protected HBaseConstants.SourceMode sourceMode; + protected boolean endRowInclusive = true; + protected int versions = 1; + protected boolean useSalt = false; + + protected byte[] startRow; + protected byte[] endRow; + + protected HTable htable; + protected byte[][] trrInputColumns; + + protected Filter trrRowFilter; + + protected boolean logScannerActivity = false; + protected int logPerRowCount = 100; + + @Override + public String toString() { + StringBuffer sbuf = new StringBuffer(); + + sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] endRowInc [%s] ", + Bytes.toString(startRow), Bytes.toString(endRow), endRowInclusive)); + sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ", + sourceMode, useSalt, versions)); + + return sbuf.toString(); + } + + byte[] getStartRow() { + return this.startRow; + } + + /** + * @param htable + * the {@link org.apache.hadoop.hbase.client.HTable} to scan. + */ + public void setHTable(HTable htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, + false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); + this.htable = htable; + } + + /** + * @param inputColumns + * the columns to be placed in {@link Result}. + */ + public void setInputColumns(final byte[][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow + * the first row in the split + */ + public void setStartRow(final byte[] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow + * the last row in the split + */ + public void setEndRow(final byte[] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter + * the {@link org.apache.hadoop.hbase.filter.Filter} to be used. + */ + public void setRowFilter(Filter rowFilter) { + this.trrRowFilter = rowFilter; + } + + public TreeSet getKeyList() { + return keyList; + } + + public void setKeyList(TreeSet keyList) { + this.keyList = keyList; + } + + public void setVersions(int versions) { + this.versions = versions; + } + + public void setUseSalt(boolean useSalt) { + this.useSalt = useSalt; + } + + public HBaseConstants.SourceMode getSourceMode() { + return sourceMode; + } + + public void setSourceMode(HBaseConstants.SourceMode sourceMode) { + this.sourceMode = sourceMode; + } + + public byte[] getEndRow() { + return endRow; + } + + public void setEndRowInclusive(boolean isInclusive) { + endRowInclusive = isInclusive; + } + + public boolean getEndRowInclusive() { + return endRowInclusive; + } + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java new file mode 100644 index 0000000..6c28d9f --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -0,0 +1,420 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { + + static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class); + + private byte[] lastSuccessfulRow; + private ResultScanner scanner; + private long timestamp; + private int rowcount; + + @Override + public String toString() { + StringBuffer sbuf = new StringBuffer(); + + sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]", + Bytes.toString(startRow), Bytes.toString(endRow), Bytes.toString(lastSuccessfulRow), Bytes.toString(nextKey), endRowInclusive, rowcount)); + sbuf.append("".format(" sourceMode [%s] salt [%s] versions [%s] ", + sourceMode, useSalt, versions)); + + return sbuf.toString(); + } + + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow + * @throws IOException + */ + public void restartRangeScan(byte[] firstRow) throws IOException { + Scan currentScan; + if ((endRow != null) && (endRow.length > 0)) { + if (trrRowFilter != null) { + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + scan.setCacheBlocks(false); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } else { + LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) + + ", endRow: " + Bytes.toString(endRow)); + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + TableInputFormat.addColumns(scan, trrInputColumns); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + } else { + LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + + ", no endRow"); + + Scan scan = new Scan(firstRow); + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + if (logScannerActivity) { + LOG.debug("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; + } + } + + + private byte[] nextKey = null; + private Vector> resultVector = null; + Map> keyValueMap = null; + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + restartRangeScan(startRow); + break; + + case GET_LIST: + nextKey = Bytes.toBytes(keyList.pollFirst()); + break; + + default: + throw new IOException(" Unknown source mode : " + sourceMode); + } + } + + @Override + public void close() { + if (this.scanner != null) + this.scanner.close(); + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + /** + * @return RowResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + @Override + public Result createValue() { + return new Result(); + } + + @Override + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + @Override + public float getProgress() { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * @param key + * HStoreKey as input key. + * @param value + * MapWritable as input value + * @return true if there was more data + * @throws IOException + */ + @Override + public boolean next(ImmutableBytesWritable key, Result value) + throws IOException { + + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: { + + Result result; + try { + try { + result = this.scanner.next(); + if (logScannerActivity) { + rowcount++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + timestamp = now; + rowcount = 0; + } + } + } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restartRangeScan(startRow); + } else { + restartRangeScan(lastSuccessfulRow); + this.scanner.next(); // skip presumed already mapped row + } + result = this.scanner.next(); + } + + if (result != null && result.size() > 0) { + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.debug("Mapper took " + (now - timestamp) + "ms to process " + + rowcount + " rows"); + LOG.debug(ioe); + String lastRow = lastSuccessfulRow == null ? "null" : Bytes + .toStringBinary(lastSuccessfulRow); + LOG.debug("lastSuccessfulRow=" + lastRow); + } + throw ioe; + } + } + + case GET_LIST: { + LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); + + if (versions == 1) { + if (nextKey != null) { + LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); + + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result result = this.htable.get(theGet); + + if (result != null && (! result.isEmpty()) ) { + LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); + + if (keyList != null || !keyList.isEmpty()) { + String newKey = keyList.pollFirst(); + LOG.debug("New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + } else { + nextKey = null; + } + + LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); + + // Write the result + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + + return true; + } else { + LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 + + String newKey; + while((newKey = keyList.pollFirst()) != null) { + LOG.debug("WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); + } + } else { + // Nothig left. return false + return false; + } + } else { + if (resultVector != null && resultVector.size() != 0) { + LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); + + List resultKeyValue = resultVector.remove(resultVector.size() - 1); + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + + return true; + } else { + if (nextKey != null) { + LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); + + Get theGet = new Get(nextKey); + theGet.setMaxVersions(versions); + + Result resultAll = this.htable.get(theGet); + + if( resultAll != null && (! resultAll.isEmpty())) { + List keyValeList = resultAll.list(); + + keyValueMap = new HashMap>(); + + LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); + + for (KeyValue keyValue : keyValeList) { + long version = keyValue.getTimestamp(); + + if (keyValueMap.containsKey(new Long(version))) { + List keyValueTempList = keyValueMap.get(new Long( + version)); + if (keyValueTempList == null) { + keyValueTempList = new ArrayList(); + } + keyValueTempList.add(keyValue); + } else { + List keyValueTempList = new ArrayList(); + keyValueMap.put(new Long(version), keyValueTempList); + keyValueTempList.add(keyValue); + } + } + + resultVector = new Vector>(); + resultVector.addAll(keyValueMap.values()); + + List resultKeyValue = resultVector.remove(resultVector.size() - 1); + + Result result = new Result(resultKeyValue); + + LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); + + String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( useSalt) { + key.set( HBaseSalter.delSaltPrefix(result.getRow())); + } else { + key.set(result.getRow()); + } + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } else { + LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 + + String newKey; + + while( (newKey = keyList.pollFirst()) != null ) { + LOG.debug("+ WHILE NEXT Key => " + newKey); + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + if( nextKey == null ) { + LOG.error("+ BOMB! BOMB! BOMB!"); + continue; + } + + if( ! this.htable.exists( new Get(nextKey) ) ) { + LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); + continue; + } else { break; } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes + .toBytes(newKey); + + LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); + + return next(key, value); + } + + } else { + return false; + } + } + } + } + default: + throw new IOException("Unknown source mode : " + sourceMode); + } + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java new file mode 100644 index 0000000..e2b1ec8 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -0,0 +1,124 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { + + static final Log LOG = LogFactory.getLog(HBaseRecordReaderRegional.class); + + + private byte[] nextKey = null; + private Vector> resultVector = null; + Map> keyValueMap = null; + + private HBaseTableSplitRegional multiSplit = null; + private HBaseTableSplitGranular currentSplit = null; + + private HBaseRecordReaderGranular currentRecordReader = null; + + public void init(HBaseTableSplitRegional mSplit) throws IOException { + multiSplit = mSplit; + + LOG.debug("Creating Multi Split for region location : " + + multiSplit.getRegionLocation() + " -> " + multiSplit); + + setNextSplit(); + } + + public boolean setNextSplit() throws IOException { + currentSplit = multiSplit.getNextSplit(); + + LOG.debug("IN: setNextSplit : " + currentSplit ); + + if( currentSplit != null ) { + setSplitValue(currentSplit); + return true; + } else { + return false; + } + } + + private void setRecordReaderParms(HBaseRecordReaderGranular trr, HBaseTableSplitGranular tSplit) throws IOException { + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + + trr.setHTable(htable); + trr.setInputColumns(trrInputColumns); + trr.setRowFilter(trrRowFilter); + + trr.init(); + } + + private void setSplitValue(HBaseTableSplitGranular tSplit) throws IOException { + LOG.debug("IN: setSplitValue : " + tSplit ); + + if( currentRecordReader != null ) currentRecordReader.close(); + + currentRecordReader = new HBaseRecordReaderGranular(); + setRecordReaderParms(currentRecordReader, currentSplit); + } + + @Override + public boolean next(ImmutableBytesWritable ibw, Result result) throws IOException { + boolean nextFlag = currentRecordReader.next(ibw, result); + + while(nextFlag == false && multiSplit.hasMoreSplits() ) { + setNextSplit(); + nextFlag = currentRecordReader.next(ibw, result); + } + + return nextFlag; + } + + @Override + public ImmutableBytesWritable createKey() { + return currentRecordReader.createKey(); + } + + @Override + public Result createValue() { + return currentRecordReader.createValue(); + } + + @Override + public long getPos() throws IOException { + return currentRecordReader.getPos(); + } + + @Override + public void close() throws IOException { + currentRecordReader.close(); + } + + @Override + public float getProgress() throws IOException { + return currentRecordReader.getProgress(); + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java deleted file mode 100644 index 5eafc78..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader_SINGLE.java +++ /dev/null @@ -1,505 +0,0 @@ -package parallelai.spyglass.hbase; - -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.Vector; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseRecordReader_SINGLE implements - RecordReader { - - static final Log LOG = LogFactory.getLog(HBaseRecordReader_SINGLE.class); - - private byte[] startRow; - private byte[] endRow; - private byte[] lastSuccessfulRow; - private TreeSet keyList; - private SourceMode sourceMode; - private Filter trrRowFilter; - private ResultScanner scanner; - private HTable htable; - private byte[][] trrInputColumns; - private long timestamp; - private int rowcount; - private boolean logScannerActivity = false; - private int logPerRowCount = 100; - private boolean endRowInclusive = true; - private int versions = 1; - private boolean useSalt = false; - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow - * @throws IOException - */ - public void restartRangeScan(byte[] firstRow) throws IOException { - Scan currentScan; - if ((endRow != null) && (endRow.length > 0)) { - if (trrRowFilter != null) { - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) - + ", endRow: " + Bytes.toString(endRow)); - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - TableInputFormat.addColumns(scan, trrInputColumns); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) - + ", no endRow"); - - Scan scan = new Scan(firstRow); - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - if (logScannerActivity) { - LOG.debug("Current scan=" + currentScan.toString()); - timestamp = System.currentTimeMillis(); - rowcount = 0; - } - } - - public TreeSet getKeyList() { - return keyList; - } - - public void setKeyList(TreeSet keyList) { - this.keyList = keyList; - } - - public void setVersions(int versions) { - this.versions = versions; - } - - public void setUseSalt(boolean useSalt) { - this.useSalt = useSalt; - } - - public SourceMode getSourceMode() { - return sourceMode; - } - - public void setSourceMode(SourceMode sourceMode) { - this.sourceMode = sourceMode; - } - - public byte[] getEndRow() { - return endRow; - } - - public void setEndRowInclusive(boolean isInclusive) { - endRowInclusive = isInclusive; - } - - public boolean getEndRowInclusive() { - return endRowInclusive; - } - - private byte[] nextKey = null; - private Vector> resultVector = null; - Map> keyValueMap = null; - - /** - * Build the scanner. Not done in constructor to allow for extension. - * - * @throws IOException - */ - public void init() throws IOException { - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - restartRangeScan(startRow); - break; - - case GET_LIST: - nextKey = Bytes.toBytes(keyList.pollFirst()); - break; - - default: - throw new IOException(" Unknown source mode : " + sourceMode); - } - } - - byte[] getStartRow() { - return this.startRow; - } - - /** - * @param htable - * the {@link HTable} to scan. - */ - public void setHTable(HTable htable) { - Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, - false); - logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); - this.htable = htable; - } - - /** - * @param inputColumns - * the columns to be placed in {@link Result}. - */ - public void setInputColumns(final byte[][] inputColumns) { - this.trrInputColumns = inputColumns; - } - - /** - * @param startRow - * the first row in the split - */ - public void setStartRow(final byte[] startRow) { - this.startRow = startRow; - } - - /** - * - * @param endRow - * the last row in the split - */ - public void setEndRow(final byte[] endRow) { - this.endRow = endRow; - } - - /** - * @param rowFilter - * the {@link Filter} to be used. - */ - public void setRowFilter(Filter rowFilter) { - this.trrRowFilter = rowFilter; - } - - @Override - public void close() { - if (this.scanner != null) - this.scanner.close(); - } - - /** - * @return ImmutableBytesWritable - * - * @see org.apache.hadoop.mapred.RecordReader#createKey() - */ - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - - /** - * @return RowResult - * - * @see org.apache.hadoop.mapred.RecordReader#createValue() - */ - @Override - public Result createValue() { - return new Result(); - } - - @Override - public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; - } - - @Override - public float getProgress() { - // Depends on the total number of tuples and getPos - return 0; - } - - /** - * @param key - * HStoreKey as input key. - * @param value - * MapWritable as input value - * @return true if there was more data - * @throws IOException - */ - @Override - public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: { - - Result result; - try { - try { - result = this.scanner.next(); - if (logScannerActivity) { - rowcount++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) + "ms to process " - + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // try to handle all IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," - + " if your mapper has restarted a few other times like this" - + " then you should consider killing this job and investigate" - + " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - restartRangeScan(startRow); - } else { - restartRangeScan(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row - } - result = this.scanner.next(); - } - - if (result != null && result.size() > 0) { - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } - return false; - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) + "ms to process " - + rowcount + " rows"); - LOG.debug(ioe); - String lastRow = lastSuccessfulRow == null ? "null" : Bytes - .toStringBinary(lastSuccessfulRow); - LOG.debug("lastSuccessfulRow=" + lastRow); - } - throw ioe; - } - } - - case GET_LIST: { - LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); - - if (versions == 1) { - if (nextKey != null) { - LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result result = this.htable.get(theGet); - - if (result != null && (! result.isEmpty()) ) { - LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); - - if (keyList != null || !keyList.isEmpty()) { - String newKey = keyList.pollFirst(); - LOG.debug("New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - } else { - nextKey = null; - } - - LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); - - // Write the result - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 - - String newKey; - while((newKey = keyList.pollFirst()) != null) { - LOG.debug("WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - if( nextKey == null ) { - LOG.error("BOMB! BOMB! BOMB!"); - continue; - } - - if( ! this.htable.exists( new Get(nextKey) ) ) { - LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); - continue; - } else { break; } - } - - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - LOG.debug("Final New Key => " + Bytes.toString(nextKey)); - - return next(key, value); - } - } else { - // Nothig left. return false - return false; - } - } else { - if (resultVector != null && resultVector.size() != 0) { - LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); - - List resultKeyValue = resultVector.remove(resultVector.size() - 1); - Result result = new Result(resultKeyValue); - - LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); - - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - if (nextKey != null) { - LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result resultAll = this.htable.get(theGet); - - if( resultAll != null && (! resultAll.isEmpty())) { - List keyValeList = resultAll.list(); - - keyValueMap = new HashMap>(); - - LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); - - for (KeyValue keyValue : keyValeList) { - long version = keyValue.getTimestamp(); - - if (keyValueMap.containsKey(new Long(version))) { - List keyValueTempList = keyValueMap.get(new Long( - version)); - if (keyValueTempList == null) { - keyValueTempList = new ArrayList(); - } - keyValueTempList.add(keyValue); - } else { - List keyValueTempList = new ArrayList(); - keyValueMap.put(new Long(version), keyValueTempList); - keyValueTempList.add(keyValue); - } - } - - resultVector = new Vector>(); - resultVector.addAll(keyValueMap.values()); - - List resultKeyValue = resultVector.remove(resultVector.size() - 1); - - Result result = new Result(resultKeyValue); - - LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); - - String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// - - System.out.println("+ New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } else { - LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 - - String newKey; - - while( (newKey = keyList.pollFirst()) != null ) { - LOG.debug("+ WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - if( nextKey == null ) { - LOG.error("+ BOMB! BOMB! BOMB!"); - continue; - } - - if( ! this.htable.exists( new Get(nextKey) ) ) { - LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); - continue; - } else { break; } - } - - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); - - return next(key, value); - } - - } else { - return false; - } - } - } - } - default: - throw new IOException("Unknown source mode : " + sourceMode); - } - } -} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java index 6766458..5bdf8cd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -68,7 +68,7 @@ public class HBaseSalter { byte[] originalStartKey, byte[] originalStopKey, byte[] regionStartKey, byte[] regionStopKey, String prefixList) throws IOException { - LOG.info("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", + LOG.debug("".format("OSRT: (%s) OSTP: (%s) RSRT: (%s) RSTP: (%s) PRFX: (%s)", Bytes.toString(originalStartKey), Bytes.toString(originalStopKey), Bytes.toString(regionStartKey), @@ -160,7 +160,7 @@ public class HBaseSalter { } private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { - LOG.info("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)", + LOG.debug("".format("getAllKeysWithStartStop: OKEY (%s) PLIST (%s) PSRT (%s) PSTP (%s)", Bytes.toString(originalKey), prefixList, startPrefix, stopPrefix)); char[] prefixArray = prefixList.toCharArray(); @@ -172,13 +172,13 @@ public class HBaseSalter { SortedSet subSet = prefixSet.subSet(startPrefix, true, stopPrefix, true); - LOG.info("".format("Prefix subset (%s)", subSet)); + LOG.debug("".format("Prefix subset (%s)", subSet)); return getAllKeys(originalKey, subSet.toArray(new Byte[]{})); } public static byte[][] getAllKeys(byte[] originalKey, Byte [] prefixArray) { - LOG.info("".format("getAllKeys: OKEY (%s) PARRAY (%s)", + LOG.debug("".format("getAllKeys: OKEY (%s) PARRAY (%s)", Bytes.toString(originalKey), prefixArray )); byte[][] keys = new byte[prefixArray.length][]; @@ -187,12 +187,6 @@ public class HBaseSalter { keys[i] = Bytes.add(new byte[] {prefixArray[i].byteValue()}, Bytes.add( Bytes.toBytes("_"), originalKey)); } - for(int i = 0; i < keys.length; i ++) { - for(int j = 0; j < keys[i].length; j++) { - LOG.info("" + i + " : " + j + " : " + keys[i][j]); - } - } - return keys; } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index aa446c1..6f04f01 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -12,6 +12,7 @@ package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SplitType; import cascading.flow.FlowProcess; import cascading.scheme.Scheme; import cascading.scheme.SinkCall; @@ -65,6 +66,8 @@ public class HBaseScheme // private transient byte[][] fields; private boolean useSalt = false; + + private SplitType splitType = SplitType.GRANULAR; /** @@ -279,11 +282,31 @@ public class HBaseScheme @Override public void sourceConfInit(FlowProcess process, Tap tap, JobConf conf) { - conf.setInputFormat(HBaseInputFormat.class); - String columns = getColumns(); - LOG.debug("sourcing from columns: {}", columns); - conf.set(HBaseInputFormat.COLUMN_LIST, columns); + switch(splitType) { + case GRANULAR: + { + conf.setInputFormat(HBaseInputFormatGranular.class); + + String columns = getColumns(); + LOG.debug("sourcing from columns: {}", columns); + conf.set(HBaseInputFormatGranular.COLUMN_LIST, columns); + } + break; + + case REGIONAL: + { + conf.setInputFormat(HBaseInputFormatRegional.class); + + String columns = getColumns(); + LOG.debug("sourcing from columns: {}", columns); + conf.set(HBaseInputFormatRegional.COLUMN_LIST, columns); + } + break; + + default: + LOG.error("Unknown Split Type : " + splitType.toString()); + } } private String getColumns() { @@ -345,4 +368,8 @@ public class HBaseScheme result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0); return result; } + + public void setInputSplitTye(SplitType sType) { + this.splitType = sType; + } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java deleted file mode 100644 index 87b8f58..0000000 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ /dev/null @@ -1,219 +0,0 @@ -package parallelai.spyglass.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; - -public class HBaseTableSplit implements InputSplit, - Comparable, Serializable { - - private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); - - private byte[] m_tableName = null; - private byte[] m_startRow = null; - private byte[] m_endRow = null; - private String m_regionLocation = null; - private TreeSet m_keyList = null; - private SourceMode m_sourceMode = SourceMode.EMPTY; - private boolean m_endRowInclusive = true; - private int m_versions = 1; - private boolean m_useSalt = false; - - /** default constructor */ - public HBaseTableSplit() { - this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); - } - - /** - * Constructor - * - * @param tableName - * @param startRow - * @param endRow - * @param location - */ - public HBaseTableSplit(final byte[] tableName, final byte[] startRow, - final byte[] endRow, final String location, - final SourceMode sourceMode, final boolean useSalt) { - this.m_tableName = tableName; - this.m_startRow = startRow; - this.m_endRow = endRow; - this.m_regionLocation = location; - this.m_sourceMode = sourceMode; - this.m_useSalt = useSalt; - } - - public HBaseTableSplit(final byte[] tableName, - final TreeSet keyList, int versions, final String location, - final SourceMode sourceMode, final boolean useSalt) { - this.m_tableName = tableName; - this.m_keyList = keyList; - this.m_versions = versions; - this.m_sourceMode = sourceMode; - this.m_regionLocation = location; - this.m_useSalt = useSalt; - } - - /** @return table name */ - public byte[] getTableName() { - return this.m_tableName; - } - - /** @return starting row key */ - public byte[] getStartRow() { - return this.m_startRow; - } - - /** @return end row key */ - public byte[] getEndRow() { - return this.m_endRow; - } - - public boolean getEndRowInclusive() { - return m_endRowInclusive; - } - - public void setEndRowInclusive(boolean isInclusive) { - m_endRowInclusive = isInclusive; - } - - /** @return list of keys to get */ - public TreeSet getKeyList() { - return m_keyList; - } - - public int getVersions() { - return m_versions; - } - - /** @return get the source mode */ - public SourceMode getSourceMode() { - return m_sourceMode; - } - - public boolean getUseSalt() { - return m_useSalt; - } - - /** @return the region's hostname */ - public String getRegionLocation() { - LOG.debug("REGION GETTER : " + m_regionLocation); - - return this.m_regionLocation; - } - - public String[] getLocations() { - LOG.debug("REGION ARRAY : " + m_regionLocation); - - return new String[] { this.m_regionLocation }; - } - - @Override - public long getLength() { - // Not clear how to obtain this... seems to be used only for sorting - // splits - return 0; - } - - @Override - public void readFields(DataInput in) throws IOException { - LOG.debug("READ ME : " + in.toString()); - - this.m_tableName = Bytes.readByteArray(in); - this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); - this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes - .readByteArray(in))); - this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); - - switch (this.m_sourceMode) { - case SCAN_RANGE: - this.m_startRow = Bytes.readByteArray(in); - this.m_endRow = Bytes.readByteArray(in); - this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); - break; - - case GET_LIST: - this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); - this.m_keyList = new TreeSet(); - - int m = Bytes.toInt(Bytes.readByteArray(in)); - - for (int i = 0; i < m; i++) { - this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); - } - break; - } - - LOG.debug("READ and CREATED : " + this); - } - - @Override - public void write(DataOutput out) throws IOException { - LOG.debug("WRITE : " + this); - - Bytes.writeByteArray(out, this.m_tableName); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); - - switch (this.m_sourceMode) { - case SCAN_RANGE: - Bytes.writeByteArray(out, this.m_startRow); - Bytes.writeByteArray(out, this.m_endRow); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); - break; - - case GET_LIST: - Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); - - for (String k : this.m_keyList) { - Bytes.writeByteArray(out, Bytes.toBytes(k)); - } - break; - } - - LOG.debug("WROTE : " + out.toString()); - } - - @Override - public String toString() { - return String - .format( - "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", - Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, - Bytes.toString(m_startRow), Bytes.toString(m_endRow), - (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, - m_useSalt); - } - - @Override - public int compareTo(HBaseTableSplit o) { - switch (m_sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - return Bytes.compareTo(getStartRow(), o.getStartRow()); - - case GET_LIST: - return m_keyList.equals(o.getKeyList()) ? 0 : -1; - - case EMPTY: - return 0; - - default: - return -1; - } - - } -} \ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java new file mode 100644 index 0000000..2f6e7b5 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -0,0 +1,165 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.TreeSet; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 16:18 + * To change this template use File | Settings | File Templates. + */ +public abstract class HBaseTableSplitBase implements InputSplit, + Comparable, Serializable { + + private final Log LOG = LogFactory.getLog(HBaseTableSplitBase.class); + + + protected byte[] m_tableName = null; + protected byte[] m_startRow = null; + protected byte[] m_endRow = null; + protected String m_regionLocation = null; + protected TreeSet m_keyList = null; + protected HBaseConstants.SourceMode m_sourceMode = HBaseConstants.SourceMode.EMPTY; + protected boolean m_endRowInclusive = true; + protected int m_versions = 1; + protected boolean m_useSalt = false; + + + /** @return table name */ + public byte[] getTableName() { + return this.m_tableName; + } + + /** @return starting row key */ + public byte[] getStartRow() { + return this.m_startRow; + } + + /** @return end row key */ + public byte[] getEndRow() { + return this.m_endRow; + } + + public boolean getEndRowInclusive() { + return m_endRowInclusive; + } + + public void setEndRowInclusive(boolean isInclusive) { + m_endRowInclusive = isInclusive; + } + + /** @return list of keys to get */ + public TreeSet getKeyList() { + return m_keyList; + } + + public int getVersions() { + return m_versions; + } + + /** @return get the source mode */ + public HBaseConstants.SourceMode getSourceMode() { + return m_sourceMode; + } + + public boolean getUseSalt() { + return m_useSalt; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + LOG.debug("REGION GETTER : " + m_regionLocation); + + return this.m_regionLocation; + } + + public String[] getLocations() { + LOG.debug("REGION ARRAY : " + m_regionLocation); + + return new String[] { this.m_regionLocation }; + } + + + public void copy(HBaseTableSplitBase that) { + this.m_endRow = that.m_endRow; + this.m_endRowInclusive = that.m_endRowInclusive; + this.m_keyList = that.m_keyList; + this.m_sourceMode = that.m_sourceMode; + this.m_startRow = that.m_startRow; + this.m_tableName = that.m_tableName; + this.m_useSalt = that.m_useSalt; + this.m_versions = that.m_versions; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.debug("READ ME : " + in.toString()); + + this.m_tableName = Bytes.readByteArray(in); + this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + this.m_sourceMode = HBaseConstants.SourceMode.valueOf(Bytes.toString(Bytes + .readByteArray(in))); + this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); + + switch (this.m_sourceMode) { + case SCAN_RANGE: + this.m_startRow = Bytes.readByteArray(in); + this.m_endRow = Bytes.readByteArray(in); + this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); + break; + + case GET_LIST: + this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); + this.m_keyList = new TreeSet(); + + int m = Bytes.toInt(Bytes.readByteArray(in)); + + for (int i = 0; i < m; i++) { + this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); + } + break; + } + + LOG.debug("READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("WRITE : " + this); + + Bytes.writeByteArray(out, this.m_tableName); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); + + switch (this.m_sourceMode) { + case SCAN_RANGE: + Bytes.writeByteArray(out, this.m_startRow); + Bytes.writeByteArray(out, this.m_endRow); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); + break; + + case GET_LIST: + Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + + for (String k : this.m_keyList) { + Bytes.writeByteArray(out, Bytes.toBytes(k)); + } + break; + } + + LOG.debug("WROTE : " + out.toString()); + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java new file mode 100644 index 0000000..4de7153 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -0,0 +1,97 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseTableSplitGranular extends HBaseTableSplitBase { + + private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class); + + /** default constructor */ + public HBaseTableSplitGranular() { + this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, "", HBaseConstants.SourceMode.EMPTY, false); + } + + /** + * Constructor + * + * @param tableName + * @param startRow + * @param endRow + * @param location + */ + public HBaseTableSplitGranular(final byte[] tableName, final byte[] startRow, + final byte[] endRow, final String location, + final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { + this.m_tableName = tableName; + this.m_startRow = startRow; + this.m_endRow = endRow; + this.m_regionLocation = location; + this.m_sourceMode = sourceMode; + this.m_useSalt = useSalt; + } + + public HBaseTableSplitGranular(final byte[] tableName, + final TreeSet keyList, int versions, final String location, + final HBaseConstants.SourceMode sourceMode, final boolean useSalt) { + this.m_tableName = tableName; + this.m_keyList = keyList; + this.m_versions = versions; + this.m_sourceMode = sourceMode; + this.m_regionLocation = location; + this.m_useSalt = useSalt; + } + + + @Override + public long getLength() { + // Not clear how to obtain this... seems to be used only for sorting + // splits + return 0; + } + + + @Override + public String toString() { + return String + .format( + "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, + Bytes.toString(m_startRow), Bytes.toString(m_endRow), + (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, + m_useSalt); + } + + @Override + public int compareTo(HBaseTableSplitBase o) { + if( ! (o instanceof HBaseTableSplitGranular) ) return -1; + + switch (m_sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + return Bytes.compareTo(getStartRow(), o.getStartRow()); + + case GET_LIST: + return m_keyList.equals(o.getKeyList()) ? 0 : -1; + + case EMPTY: + return 0; + + default: + return -1; + } + + } +} \ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java new file mode 100644 index 0000000..1ebfa3d --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -0,0 +1,127 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Vector; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +public class HBaseTableSplitRegional extends HBaseTableSplitBase { + + private final Log LOG = LogFactory.getLog(HBaseTableSplitRegional.class); + + private List splits = new Vector(); + + /** default constructor */ + private HBaseTableSplitRegional() { + + } + + public HBaseTableSplitRegional(String regionLocation) { + this.m_regionLocation = regionLocation; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.debug("REGIONAL READ ME : " + in.toString()); + + super.readFields(in); + + int s = Bytes.toInt(Bytes.readByteArray(in)); + + for (int i = 0; i < s; i++) { + HBaseTableSplitGranular hbts = new HBaseTableSplitGranular(); + hbts.readFields(in); + + splits.add(hbts); + } + + LOG.debug("REGIONAL READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("REGIONAL WRITE : " + this); + + super.write(out); + + Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); + + for (HBaseTableSplitGranular hbts : splits) { + hbts.write(out); + } + + LOG.debug("REGIONAL WROTE : " + out.toString()); + } + + @Override + public String toString() { + StringBuffer str = new StringBuffer(); + str.append("HBaseTableSplitRegional : "); + + str.append(super.toString()); + + str.append(" GRANULAR = > "); + + for (HBaseTableSplitGranular hbt : splits) { + str.append(" [" + hbt.toString() + "]"); + } + + return str.toString(); + } + + @Override + public int compareTo(HBaseTableSplitBase o) { + if( ! (o instanceof HBaseTableSplitRegional) ) return -1; + + return (splits.size() - ((HBaseTableSplitRegional)o).splits.size()); + } + + @Override + public long getLength() throws IOException { + return splits.size(); + } + + public void addSplit(HBaseTableSplitGranular hbt) throws IOException { + LOG.debug("ADD Split : " + hbt); + + if (hbt.getRegionLocation().equals(m_regionLocation)) { + splits.add(hbt); + this.copy(hbt); + } else + throw new IOException("HBaseTableSplitGranular Region Location " + + hbt.getRegionLocation() + + " does NOT match MultiSplit Region Location " + m_regionLocation); + } + +// public List getSplits() { +// return splits; +// } + + public boolean hasMoreSplits() { + splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator; + + return splitIterator.hasNext(); + } + + private Iterator splitIterator = null; + + public HBaseTableSplitGranular getNextSplit() { + splitIterator = (splitIterator == null) ? splits.listIterator() : splitIterator; + + if( splitIterator.hasNext() ) { + return splitIterator.next(); + } else { + return null; + } + } +} \ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 07b5aa7..bfe6670 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,6 +12,8 @@ package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SplitType; + import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.flow.FlowProcess; import cascading.tap.SinkMode; @@ -63,6 +65,8 @@ public class HBaseTap extends Tap { /** Field tableName */ private String tableName; + private SplitType splitType = SplitType.GRANULAR; + /** * Constructor HBaseTap creates a new HBaseTap instance. * @@ -204,7 +208,7 @@ public class HBaseTap extends Tap { return true; } - LOG.info("creating hbase table: {}", tableName); + LOG.info("Creating HBase Table: {}", tableName); HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); @@ -256,8 +260,19 @@ public class HBaseTap extends Tap { // process.getID(); // // super.getFullIdentifier(conf); - - HBaseInputFormat.setTableName(conf, tableName); + + switch(splitType) { + case GRANULAR: + HBaseInputFormatGranular.setTableName(conf, tableName); + break; + + case REGIONAL: + HBaseInputFormatRegional.setTableName(conf, tableName); + break; + + default: + LOG.error("Unknown Split Type : " + splitType); + } for( SourceConfig sc : sourceConfigList) { sc.configure(conf); @@ -266,6 +281,10 @@ public class HBaseTap extends Tap { super.sourceConfInit(process, conf); } + public void setInputSplitType(SplitType sType) { + this.splitType = sType; + } + @Override public boolean equals(Object object) { if (this == object) { diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 7ff7860..c214e99 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -10,7 +10,7 @@ import com.twitter.scalding.Read import com.twitter.scalding.Source import com.twitter.scalding.Write -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} import cascading.scheme.{NullScheme, Scheme} import cascading.tap.SinkMode import cascading.tap.Tap @@ -40,11 +40,14 @@ case class HBaseSource( versions: Int = 1, useSalt: Boolean = false, prefixList: String = null, - sinkMode: SinkMode = SinkMode.UPDATE + sinkMode: SinkMode = SinkMode.UPDATE, + inputSplitType: SplitType = SplitType.GRANULAR ) extends Source { - - override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) + internalScheme.setInputSplitTye(inputSplitType) + + override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] // To enable local mode testing val allFields = keyFields.append(valueFields.toArray) @@ -76,6 +79,8 @@ case class HBaseSource( } case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) } + + hbt.setInputSplitType(inputSplitType) hbt.asInstanceOf[Tap[_,_,_]] } diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index a4e2d7a..d75ff7b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -1,9 +1,9 @@ package parallelai.spyglass.hbase.testing import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} -import com.twitter.scalding.{IterableSource, Args, TextLine} +import com.twitter.scalding.{Tsv, IterableSource, Args, TextLine} import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource} import cascading.tuple.Fields import org.apache.log4j.{Logger, Level} @@ -59,76 +59,221 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val quorum = args("quorum") val sttKey = "01728" - val stpKey = "01831" + val stpKey = "03725" val sttKeyP = "8_01728" - val stpKeyP = "1_01831" + val stpKeyP = "5_03725" val listKey = List("01681", "01456") - val listKeyP = List("1_01681", "6_01456") - -// val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_ALL ).read -// .fromBytesWritable( TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanAllNoSalt01")) - + val listKeyP = List("0_01681", "6_01456") + val noSttKey = "9999990" + val noStpKey = "9999999" + val noSttKeyP = "9_9999990" + val noStpKeyP = "9_9999999" + val noListKey = List("0123456", "6543210") + val noListKeyP = List("6_0123456", "0_6543210") + + val splitType = if(args.getOrElse("regional", "true").toBoolean) SplitType.REGIONAL else SplitType.GRANULAR + + val testName01 = "Scan All with NO useSalt" + val list01 = (00000 to 99999).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_ALL, + inputSplitType = splitType ).read + .fromBytesWritable( TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanAllNoSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName02 = "Scan All with useSalt=true" val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data"), TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), - sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read -// .fromBytesWritable( TABLE_SCHEMA ) - .write(TextLine("saltTesting/ScanAllPlusSalt01")) - -// val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP ).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangeNoSalt01")) -// -// val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true ).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangePlusSalt01")) -// -// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.GET_LIST, keyList = listKeyP ).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/GetListNoSalt01")) -// -// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true).read -// .fromBytesWritable(TABLE_SCHEMA ) -// .write(TextLine("saltTesting/GetListPlusSalt01")) -// -// val hbase07 = -// new HBaseSource( "_TEST.SALT.03", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) -// .read -// .fromBytesWritable( TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangePlusSalt10")) -// .toBytesWritable( TABLE_SCHEMA ) -// .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// useSalt = true )) -// -// val hbase08 = -// new HBaseSource( "_TEST.SALT.01", quorum, 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -// sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix ) -// .read -// .fromBytesWritable('*) -// .write(TextLine("saltTesting/ScanRangePlusSalt03")) + sourceMode = SourceMode.SCAN_ALL, useSalt = true, + inputSplitType = splitType).read + .fromBytesWritable( TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanAllPlusSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName03 = "Scan Range with NO useSalt" + val list03 = (sttKey.toInt to stpKey.toInt).toList.map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = sttKey, stopKey = stpKey, useSalt = true, prefixList = prefix, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangePlusSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName04 = "Scan Range with useSalt=true" + val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangeNoSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + + val testName05 = "Get List with NO useSalt" + val list05 = listKey.map(x => x.toInt).map(x => ("" + (x%10) + "_" + "%05d".format(x), "" + (x%10) + "_" + "%05d".format(x), "%05d".format(x))) + val hbase05 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = listKey, useSalt = true, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListPlusSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName06 = "Get List with useSalt=true" + val hbase06 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = listKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListNoSalt01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName08 = "Scan Range NO RESULTS" + val hbase08 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKey, stopKey = noStpKey, useSalt = true, prefixList = prefix, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangePlusSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName09 = "Scan Range NO RESULT with useSalt=true" + val hbase09 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.SCAN_RANGE, startKey = noSttKeyP, stopKey = noStpKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/ScanRangeNoSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + + val testName10 = "Get List NO RESULT" + val hbase10 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = noListKey, useSalt = true, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListPlusSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + val testName11 = "Get List NO RESULT with useSalt=true" + val hbase11 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), + sourceMode = SourceMode.GET_LIST, keyList = noListKeyP, + inputSplitType = splitType).read + .fromBytesWritable(TABLE_SCHEMA ) + .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('testData) + .write(TextLine("saltTesting/GetListNoSaltNoRes01")) + .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData)) + + + +// ( +//// getTestResultPipe(getExpectedPipe(list01), hbase01, testName01) ++ +//// getTestResultPipe(getExpectedPipe(list01), hbase02, testName02) ++ +// getTestResultPipe(getExpectedPipe(list03), hbase03, testName03) ++ +// getTestResultPipe(getExpectedPipe(list03), hbase04, testName03) ++ +// getTestResultPipe(getExpectedPipe(list05), hbase05, testName05) ++ +// getTestResultPipe(getExpectedPipe(list05), hbase06, testName06) ++ +// assertPipeIsEmpty(hbase08, testName08) ++ +// assertPipeIsEmpty(hbase09, testName09) ++ +// assertPipeIsEmpty(hbase10, testName10) ++ +// assertPipeIsEmpty(hbase11, testName11) +// ).groupAll { group => +// group.sortBy('testName) +// } +// .write(Tsv("saltTesting/FinalTestResults")) + + /** + * We assume the pipe is empty + * + * We concatenate with a header - if the resulting size is 1 + * then the original size was 0 - then the pipe was empty :) + * + * The result is then returned in a Pipe + */ + def assertPipeIsEmpty ( hbasePipe : Pipe , testName:String) : Pipe = { + val headerPipe = IterableSource(List(testName), 'testData) + val concatenation = ( hbasePipe ++ headerPipe ).groupAll{ group => + group.size('size) + } + .project('size) + + val result = + concatenation + .mapTo('size -> ('testName, 'result, 'expectedData, 'testData)) { x:String => { + if (x == "1") { + (testName, "Success", "", "") + } else { + (testName, "Test Failed", "", "") + } + } + } + + result + } + + /** + * Methods receives 2 pipes - and projects the results of testing + * + * expectedPipe should have a column 'expecteddata + * realHBasePipe should have a column 'hbasedata + */ + def getTestResultPipe ( expectedPipe:Pipe , realHBasePipe:Pipe, testName: String ): Pipe = { + val results = expectedPipe.insert('testName , testName) + .joinWithTiny('testName -> 'testName, realHBasePipe.insert('testName , testName)) + .map(('expectedData, 'testData)->'result) { x:(String,String) => + if (x._1.equals(x._2)) + "Success" + else + "Test Failed" + } + .project('testName, 'result, 'expectedData, 'testData) + results + } + + /** + * + */ + def getExpectedPipe ( expectedList: List[(String,String, String)]) : Pipe = { + IterableSource(expectedList, TABLE_SCHEMA) + .map(('key, 'salted, 'unsalted) -> 'expectedData) {x: (String, String, String) => List(x._1, x._2, x._3)} + .project('expectedData) + .groupAll(group => group.toList[List[List[String]]]('expectedData -> 'expectedData)) + } + } class HBaseSaltTestShutdown (args: Args) extends JobBase(args) with HBasePipeConversions { diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index a8de7d6..17bc873 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -2,6 +2,7 @@ package parallelai.spyglass.hbase.testing import parallelai.spyglass.base.JobRunner import com.twitter.scalding.Args +import org.apache.log4j.{Level, Logger} object HBaseSaltTesterRunner extends App { @@ -25,12 +26,18 @@ object HBaseSaltTesterRunner extends App { val test = mArgs.getOrElse("test.data", "false").toBoolean val delete = mArgs.getOrElse("delete.data", "false").toBoolean + val isDebug = mArgs.getOrElse("debug", "false").toBoolean + + if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) } + + if( make ) { JobRunner.main(Array(classOf[HBaseSaltTestSetup].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, - "--quorum", quorum + "--quorum", quorum, + "--debug", isDebug.toString )) } @@ -39,7 +46,9 @@ object HBaseSaltTesterRunner extends App { "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, - "--quorum", quorum + "--quorum", quorum, + "--debug", isDebug.toString, + "--regional", mArgs.getOrElse("regional", "false") )) } @@ -48,7 +57,8 @@ object HBaseSaltTesterRunner extends App { "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, - "--quorum", quorum + "--quorum", quorum, + "--debug", isDebug.toString )) } } \ No newline at end of file -- cgit v1.2.3