author | Chandan Rajah <crajah@parallelai.com> | 2013-08-07 11:29:57 +0100
committer | Chandan Rajah <crajah@parallelai.com> | 2013-08-07 11:29:57 +0100
commit | 2af55565c99297ddbce5ec9bb29a8e650a2a968f (patch)
tree | e17b9fccf8ecc72e4a84810869992440dc9c16db
parent | 6a2492a8a0a9a977664e7bfe3e0eb048de1a74b4 (diff)
parent | de38789a032b586b0e278a81dedb5c8fb6d43e02 (diff)
download | SpyGlass-2af55565c99297ddbce5ec9bb29a8e650a2a968f.tar.gz, SpyGlass-2af55565c99297ddbce5ec9bb29a8e650a2a968f.zip
Merged changes from koertkuipers/SCALA_2.9.3_2.4.0
-rw-r--r-- | LICENSE | 177
-rw-r--r-- | NOTICE | 18
-rw-r--r-- | pom.xml | 2
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java | 1123
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java | 111
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java | 1048
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseScheme.java | 8
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java | 392
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala | 28
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala | 152
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala | 63
11 files changed, 1819 insertions, 1303 deletions
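The substance of this merge is the new HBaseMultiInputSplit: the reworked HBaseInputFormat.getSplits() no longer returns one HBaseTableSplit per region, but groups every table split hosted on the same region server into a single multi-split (convertToMultiSplitArray in the diff below), and HBaseRecordReader then works through the grouped splits one after another via setNextSplit(). A minimal sketch of that grouping step, using simplified stand-in types (TableSplit, MultiSplit) rather than the real HBase classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the per-region-server grouping performed by
// convertToMultiSplitArray() in the diff below. TableSplit and MultiSplit
// are simplified stand-ins for HBaseTableSplit and HBaseMultiInputSplit,
// not the real Hadoop/HBase types.
public class SplitGrouping {

    static class TableSplit {
        final String regionLocation;    // host serving this key range
        final String startKey, stopKey; // key range covered by the split

        TableSplit(String regionLocation, String startKey, String stopKey) {
            this.regionLocation = regionLocation;
            this.startKey = startKey;
            this.stopKey = stopKey;
        }
    }

    static class MultiSplit {
        final String regionLocation;
        final List<TableSplit> splits = new ArrayList<TableSplit>();

        MultiSplit(String regionLocation) {
            this.regionLocation = regionLocation;
        }

        // Mirrors HBaseMultiInputSplit.addSplit(): only splits hosted on
        // the same region server may be grouped together.
        void addSplit(TableSplit s) {
            if (!s.regionLocation.equals(regionLocation))
                throw new IllegalArgumentException("region location mismatch");
            splits.add(s);
        }
    }

    // One MultiSplit per region server, each holding every TableSplit
    // hosted there; a single record reader then iterates them in sequence.
    static MultiSplit[] groupByRegion(List<TableSplit> tableSplits) {
        Map<String, MultiSplit> byRegion = new HashMap<String, MultiSplit>();
        for (TableSplit s : tableSplits) {
            MultiSplit ms = byRegion.get(s.regionLocation);
            if (ms == null) {
                ms = new MultiSplit(s.regionLocation);
                byRegion.put(s.regionLocation, ms);
            }
            ms.addSplit(s);
        }
        return byRegion.values().toArray(new MultiSplit[0]);
    }

    public static void main(String[] args) {
        List<TableSplit> splits = new ArrayList<TableSplit>();
        splits.add(new TableSplit("rs1.example.com", "a", "m"));
        splits.add(new TableSplit("rs1.example.com", "m", "z"));
        splits.add(new TableSplit("rs2.example.com", "0", "9"));

        // Expect two multi-splits: rs1.example.com with two key ranges,
        // rs2.example.com with one.
        for (MultiSplit ms : groupByRegion(splits))
            System.out.println(ms.regionLocation + " -> "
                    + ms.splits.size() + " split(s)");
    }
}

Grouping by region server keeps all scans destined for one host inside one map task, preserving data locality while reducing the number of task launches; the real classes additionally implement Writable serialization and salted key-range handling, which this sketch omits.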
@@ -0,0 +1,177 @@ (new file LICENSE: the full text of the Apache License, Version 2.0, January 2004, http://www.apache.org/licenses/, added verbatim. Sections 1 through 9 cover definitions, the copyright and patent license grants, redistribution conditions, submission of contributions, trademarks, the warranty disclaimer, limitation of liability, and accepting warranty or additional liability.)
+ + END OF TERMS AND CONDITIONS + @@ -0,0 +1,18 @@ +SpyGlass is a Scalding Source and Cascading Tap framework +Copyright 2013 Parallel AI LTD + +Third Party Dependencies: + +Scalding 2.x +Apache Public License 2.0 +https://github.com/twitter/scalding + +Cascading 2.0 +Apache Public License 2.0 +http://www.cascading.org + +Hadoop 0.20.2 +Apache Public License 2.0 +http://hadoop.apache.org + + @@ -62,7 +62,7 @@ <name>Cascading and Scalding wrapper for HBase with advanced features</name> <groupId>parallelai</groupId> <artifactId>parallelai.spyglass</artifactId> - <version>${scala.version}_2.3.0</version> + <version>${scala.version}_2.4.0</version> <packaging>jar</packaging> <distributionManagement> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java index aabdc5e..8e121bc 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -6,7 +6,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; import java.util.UUID; @@ -17,10 +16,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.filter.Filter; @@ -40,519 +37,609 @@ import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseInputFormat - implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { - - private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); - - private final String id = UUID.randomUUID().toString(); - - private byte [][] inputColumns; - private HTable table; -// private HBaseRecordReader tableRecordReader; - private Filter rowFilter; -// private String tableName = ""; - - private HashMap<InetAddress, String> reverseDNSCacheMap = - new HashMap<InetAddress, String>(); - - private String nameServer = null; - -// private Scan scan = null; - - - @SuppressWarnings("deprecation") - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - if (this.table == null) { - throw new IOException("No table was provided"); - } - - if (this.inputColumns == null || this.inputColumns.length == 0) { - throw new IOException("Expecting at least one column"); - } - - Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); - - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); - - if (null == regLoc) { - throw new IOException("Expecting at least one region."); - } - - List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); - HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false); - - splits.add(split); - - return splits.toArray(new HBaseTableSplit[splits.size()]); - } - - if( keys.getSecond() == null || 
keys.getSecond().length == 0) { - throw new IOException("Expecting at least one region."); - } - - if( keys.getFirst().length != keys.getSecond().length ) { - throw new IOException("Regions for start and end key do not match"); - } - - byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; - byte[] maxKey = keys.getSecond()[0]; - - LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); - - byte [][] regStartKeys = keys.getFirst(); - byte [][] regStopKeys = keys.getSecond(); - String [] regions = new String[regStartKeys.length]; - - for( int i = 0; i < regStartKeys.length; i++ ) { - minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0 ) && (Bytes.compareTo(regStartKeys[i], minKey) < 0 ) ? regStartKeys[i] : minKey; - maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0 ) ? regStopKeys[i] : maxKey; - - HServerAddress regionServerAddress = - table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); - InetAddress regionAddress = - regionServerAddress.getInetSocketAddress().getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " + regionAddress + - " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - -// HServerAddress regionServerAddress = table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); -// InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); -// -// String regionLocation; -// -// try { -// regionLocation = reverseDNS(regionAddress); -// } catch (NamingException e) { -// LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e); -// regionLocation = regionServerAddress.getHostname(); -// } - -// String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getHostname(); - - LOG.debug( "***** " + regionLocation ); - - if( regionLocation == null || regionLocation.length() == 0 ) - throw new IOException( "The region info for regiosn " + i + " is null or empty"); - - regions[i] = regionLocation; - - LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); - } - - byte[] startRow = HConstants.EMPTY_START_ROW; - byte[] stopRow = HConstants.EMPTY_END_ROW; - - LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); - - LOG.debug("SOURCE MODE is : " + sourceMode); - - switch( sourceMode ) { - case SCAN_ALL: - startRow = HConstants.EMPTY_START_ROW; - stopRow = HConstants.EMPTY_END_ROW; - - LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); - break; - - case SCAN_RANGE: - startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ; - stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; - - LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); - break; - } - - switch( sourceMode ) { - case EMPTY: - case SCAN_ALL: - case SCAN_RANGE: - { -// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : startRow; -// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? 
maxKey : stopRow; - - List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - - if( ! useSalt ) { - - List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); - - int maxRegions = validRegions.size(); - int currentRegion = 1; - - for( HRegionLocation cRegion : validRegions ) { - byte [] rStart = cRegion.getRegionInfo().getStartKey(); - byte [] rStop = cRegion.getRegionInfo().getEndKey(); - - HServerAddress regionServerAddress = cRegion.getServerAddress(); - InetAddress regionAddress = - regionServerAddress.getInetSocketAddress().getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.error("Cannot resolve the host name for " + regionAddress + - " because of " + e); - regionLocation = regionServerAddress.getHostname(); - } - - byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); - byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow); - - LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)", - (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), - (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), - rStop.length - )); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), - sStart, - sStop, - regionLocation, - SourceMode.SCAN_RANGE, useSalt - ); - - split.setEndRowInclusive( currentRegion == maxRegions ); - - currentRegion ++; - - LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", - Bytes.toString(startRow), Bytes.toString(stopRow), - Bytes.toString(rStart), Bytes.toString(rStop), - Bytes.toString(sStart), - Bytes.toString(sStop), - cRegion.getHostnamePort(), split) ); - - splits.add(split); - } - } else { - LOG.debug("Using SALT : " + useSalt ); - - // Will return the start and the stop key with all possible prefixes. - for( int i = 0; i < regions.length; i++ ) { - Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList ); - - for( Pair<byte[], byte[]> pair : intervals ) { - LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond()))); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), - pair.getFirst(), - pair.getSecond(), - regions[i], - SourceMode.SCAN_RANGE, useSalt - ); - - split.setEndRowInclusive(true); - splits.add(split); - } - } - } - - LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size()); - for( HBaseTableSplit s: splits) { - LOG.info("RETURNED SPLITS: split -> " + s); - } - - return splits.toArray(new HBaseTableSplit[splits.size()]); - } - - case GET_LIST: - { -// if( keyList == null || keyList.size() == 0 ) { - if( keyList == null ) { - throw new IOException("Source Mode is GET_LIST but key list is EMPTY"); - } - - if( useSalt ) { - TreeSet<String> tempKeyList = new TreeSet<String>(); - - for(String key: keyList) { - tempKeyList.add(HBaseSalter.addSaltPrefix(key)); - } - - keyList = tempKeyList; - } - - LOG.debug("".format("Splitting Key List (%s)", keyList)); - - List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); - - for (int i = 0; i < keys.getFirst().length; i++) { - - if ( ! 
includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; - } - - LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); - - Set<String> regionsSubSet = null; - - if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { - LOG.debug("REGION start is empty"); - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList; - } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { - LOG.debug("REGION start is empty"); - regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true); - } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { - LOG.debug("REGION stop is empty"); - regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true); - } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) { - regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); - } else { - throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)", - regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); - } - - if( regionsSubSet == null || regionsSubSet.size() == 0) { - LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null"); - - continue; - } - - TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); - - LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); - - HBaseTableSplit split = new HBaseTableSplit( - table.getTableName(), regionKeyList, versions, - regions[i], - SourceMode.GET_LIST, useSalt); - splits.add(split); - } - - LOG.debug("RETURNED SPLITS: split -> " + splits); - - return splits.toArray(new HBaseTableSplit[splits.size()]); - } - - default: - throw new IOException("Unknown source Mode : " + sourceMode ); - } - } - - private String reverseDNS(InetAddress ipAddress) throws NamingException { - String hostName = this.reverseDNSCacheMap.get(ipAddress); - if (hostName == null) { - hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); - this.reverseDNSCacheMap.put(ipAddress, hostName); - } - return hostName; - } - - - @Override - public RecordReader<ImmutableBytesWritable, Result> getRecordReader( - InputSplit split, JobConf job, Reporter reporter) throws IOException { - - if( ! 
(split instanceof HBaseTableSplit ) ) - throw new IOException("Table Split is not type HBaseTableSplit"); - - HBaseTableSplit tSplit = (HBaseTableSplit) split; - - HBaseRecordReader trr = new HBaseRecordReader(); - - switch( tSplit.getSourceMode() ) { - case SCAN_ALL: - case SCAN_RANGE: - { - LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); - - trr.setStartRow(tSplit.getStartRow()); - trr.setEndRow(tSplit.getEndRow()); - trr.setEndRowInclusive(tSplit.getEndRowInclusive()); - trr.setUseSalt(useSalt); - } - - break; - - case GET_LIST: - { - LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); - - trr.setKeyList(tSplit.getKeyList()); - trr.setVersions(tSplit.getVersions()); - trr.setUseSalt(useSalt); - } - - break; - - default: - throw new IOException( "Unknown source mode : " + tSplit.getSourceMode() ); - } - - trr.setSourceMode(tSplit.getSourceMode()); - trr.setHTable(this.table); - trr.setInputColumns(this.inputColumns); - trr.setRowFilter(this.rowFilter); - - trr.init(); - - return trr; - } - - - - /* Configuration Section */ - - /** - * space delimited list of columns - */ - public static final String COLUMN_LIST = "hbase.tablecolumns"; - - /** - * Use this jobconf param to specify the input table - */ - private static final String INPUT_TABLE = "hbase.inputtable"; - - private String startKey = null; - private String stopKey = null; - - private SourceMode sourceMode = SourceMode.EMPTY; - private TreeSet<String> keyList = null; - private int versions = 1; - private boolean useSalt = false; - private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - - public void configure(JobConf job) { - String tableName = getTableName(job); - String colArg = job.get(COLUMN_LIST); - String[] colNames = colArg.split(" "); - byte [][] m_cols = new byte[colNames.length][]; - for (int i = 0; i < m_cols.length; i++) { - m_cols[i] = Bytes.toBytes(colNames[i]); - } - setInputColumns(m_cols); - - try { - setHTable(new HTable(HBaseConfiguration.create(job), tableName)); - } catch (Exception e) { - LOG.error( "************* Table could not be created" ); - LOG.error(StringUtils.stringifyException(e)); - } - - LOG.debug("Entered : " + this.getClass() + " : configure()" ); - - useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false); - prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST); - - sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; - - LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally", - String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode )); - - switch( sourceMode ) { - case SCAN_RANGE: - LOG.info("HIT SCAN_RANGE"); - - startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job) ) ); - stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job) ) ); - - LOG.info(String.format("Setting start key (%s) and stop key (%s)", startKey, stopKey) ); - break; - - case GET_LIST: - LOG.info("HIT GET_LIST"); - - Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job))); - keyList = new TreeSet<String> (keys); - - versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); - - LOG.debug( "GOT KEY 
LIST : " + keys ); - LOG.debug(String.format("SETTING key list (%s)", keyList) ); - - break; - - case EMPTY: - LOG.info("HIT EMPTY"); - - sourceMode = SourceMode.SCAN_ALL; - break; - - default: - LOG.info("HIT DEFAULT"); - - break; - } - } - - public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - String tableName = getTableName(job); - - if (tableName == null) { - throw new IOException("expecting one table name"); - } - LOG.debug(String.format("Found Table name [%s]", tableName)); - - - // connected to table? - if (getHTable() == null) { - throw new IOException("could not connect to table '" + - tableName + "'"); - } - LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); - - // expecting at least one column - String colArg = job.get(COLUMN_LIST); - if (colArg == null || colArg.length() == 0) { - throw new IOException("expecting at least one column"); - } - LOG.debug(String.format("Found Columns [%s]", colArg)); - - LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey)); - - if( sourceMode == SourceMode.EMPTY ) { - throw new IOException("SourceMode should not be EMPTY"); - } - - if( sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0) ) { - throw new IOException( "Source mode is GET_LIST bu key list is empty"); - } - } - - - /* Getters & Setters */ - private HTable getHTable() { return this.table; } - private void setHTable(HTable ht) { this.table = ht; } - private void setInputColumns( byte [][] ic ) { this.inputColumns = ic; } - - - private void setJobProp( JobConf job, String key, String value) { - if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); - job.set(key, value); - } - - private String getJobProp( JobConf job, String key ) { return job.get(key); } - - public static void setTableName(JobConf job, String tableName) { - // Make sure that table has not been set before - String oldTableName = getTableName(job); - if(oldTableName != null) { - throw new RuntimeException("table name already set to: '" - + oldTableName + "'"); - } - - job.set(INPUT_TABLE, tableName); - } - - public static String getTableName(JobConf job) { - return job.get(INPUT_TABLE); - } - - protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { - return true; - } +public class HBaseInputFormat implements + InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { + + private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); + + private final String id = UUID.randomUUID().toString(); + + private byte[][] inputColumns; + private HTable table; + // private HBaseRecordReader tableRecordReader; + private Filter rowFilter; + // private String tableName = ""; + + private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); + + private String nameServer = null; + + // private Scan scan = null; + + private HBaseMultiInputSplit[] convertToMultiSplitArray( + List<HBaseTableSplit> splits) throws IOException { + + if (splits == null) + throw new IOException("The list of splits is null => " + splits); + + HashMap<String, HBaseMultiInputSplit> regionSplits = new HashMap<String, HBaseMultiInputSplit>(); + + for (HBaseTableSplit hbt : splits) { + HBaseMultiInputSplit mis = null; + if (regionSplits.containsKey(hbt.getRegionLocation())) { + mis = regionSplits.get(hbt.getRegionLocation()); + } else { + regionSplits.put(hbt.getRegionLocation(), new 
HBaseMultiInputSplit( + hbt.getRegionLocation())); + mis = regionSplits.get(hbt.getRegionLocation()); + } + + mis.addSplit(hbt); + regionSplits.put(hbt.getRegionLocation(), mis); + } + + Collection<HBaseMultiInputSplit> outVals = regionSplits.values(); + + LOG.debug("".format("Returning array of splits : %s", outVals)); + + if (outVals == null) + throw new IOException("The list of multi input splits were null"); + + return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]); + } + + @SuppressWarnings("deprecation") + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + if (this.table == null) { + throw new IOException("No table was provided"); + } + + if (this.inputColumns == null || this.inputColumns.length == 0) { + throw new IOException("Expecting at least one column"); + } + + final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + + if (keys == null || keys.getFirst() == null + || keys.getFirst().length == 0) { + HRegionLocation regLoc = table.getRegionLocation( + HConstants.EMPTY_BYTE_ARRAY, false); + + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + + final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc + .getHostnamePort().split( + Addressing.HOSTNAME_PORT_SEPARATOR)[0], + SourceMode.EMPTY, false); + + splits.add(split); + + // TODO: Change to HBaseMultiSplit + return convertToMultiSplitArray(splits); + } + + if (keys.getSecond() == null || keys.getSecond().length == 0) { + throw new IOException("Expecting at least one region."); + } + + if (keys.getFirst().length != keys.getSecond().length) { + throw new IOException("Regions for start and end key do not match"); + } + + byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; + byte[] maxKey = keys.getSecond()[0]; + + LOG.debug(String.format("SETTING min key (%s) and max key (%s)", + Bytes.toString(minKey), Bytes.toString(maxKey))); + + byte[][] regStartKeys = keys.getFirst(); + byte[][] regStopKeys = keys.getSecond(); + String[] regions = new String[regStartKeys.length]; + + for (int i = 0; i < regStartKeys.length; i++) { + minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0) + && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i] + : minKey; + maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) + && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? 
regStopKeys[i] + : maxKey; + + HServerAddress regionServerAddress = table.getRegionLocation( + keys.getFirst()[i]).getServerAddress(); + InetAddress regionAddress = regionServerAddress.getInetSocketAddress() + .getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + regionAddress + + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + // HServerAddress regionServerAddress = + // table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); + // InetAddress regionAddress = + // regionServerAddress.getInetSocketAddress().getAddress(); + // + // String regionLocation; + // + // try { + // regionLocation = reverseDNS(regionAddress); + // } catch (NamingException e) { + // LOG.error("Cannot resolve the host name for " + regionAddress + + // " because of " + e); + // regionLocation = regionServerAddress.getHostname(); + // } + + // String regionLocation = + // table.getRegionLocation(keys.getFirst()[i]).getHostname(); + + LOG.debug("***** " + regionLocation); + + if (regionLocation == null || regionLocation.length() == 0) + throw new IOException("The region info for regiosn " + i + + " is null or empty"); + + regions[i] = regionLocation; + + LOG.debug(String.format( + "Region (%s) has start key (%s) and stop key (%s)", regions[i], + Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); + } + + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + + LOG.debug(String.format("Found min key (%s) and max key (%s)", + Bytes.toString(minKey), Bytes.toString(maxKey))); + + LOG.debug("SOURCE MODE is : " + sourceMode); + + switch (sourceMode) { + case SCAN_ALL: + startRow = HConstants.EMPTY_START_ROW; + stopRow = HConstants.EMPTY_END_ROW; + + LOG.info(String.format( + "SCAN ALL: Found start key (%s) and stop key (%s)", + Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + + case SCAN_RANGE: + startRow = (startKey != null && startKey.length() != 0) ? Bytes + .toBytes(startKey) : HConstants.EMPTY_START_ROW; + stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes + .toBytes(stopKey) : HConstants.EMPTY_END_ROW; + + LOG.info(String.format( + "SCAN RANGE: Found start key (%s) and stop key (%s)", + Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + } + + switch (sourceMode) { + case EMPTY: + case SCAN_ALL: + case SCAN_RANGE: { + // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : + // startRow; + // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? 
maxKey : + // stopRow; + + final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + if (!useSalt) { + + List<HRegionLocation> validRegions = table.getRegionsInRange( + startRow, stopRow); + + int maxRegions = validRegions.size(); + int currentRegion = 1; + + for (HRegionLocation cRegion : validRegions) { + byte[] rStart = cRegion.getRegionInfo().getStartKey(); + byte[] rStop = cRegion.getRegionInfo().getEndKey(); + + HServerAddress regionServerAddress = cRegion + .getServerAddress(); + InetAddress regionAddress = regionServerAddress + .getInetSocketAddress().getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + + regionAddress + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + byte[] sStart = (startRow == HConstants.EMPTY_START_ROW + || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart + : startRow); + byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW + || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop + : stopRow); + + LOG.debug(String.format( + "BOOL start (%s) stop (%s) length (%d)", + (startRow == HConstants.EMPTY_START_ROW || (Bytes + .compareTo(startRow, rStart) <= 0)), + (stopRow == HConstants.EMPTY_END_ROW || (Bytes + .compareTo(stopRow, rStop) >= 0)), rStop.length)); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), sStart, sStop, regionLocation, + SourceMode.SCAN_RANGE, useSalt); + + split.setEndRowInclusive(currentRegion == maxRegions); + + currentRegion++; + + LOG.debug(String + .format( + "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", + Bytes.toString(startRow), + Bytes.toString(stopRow), Bytes.toString(rStart), + Bytes.toString(rStop), Bytes.toString(sStart), + Bytes.toString(sStop), cRegion.getHostnamePort(), + split)); + + splits.add(split); + } + } else { + LOG.debug("Using SALT : " + useSalt); + + // Will return the start and the stop key with all possible + // prefixes. 
+ for (int i = 0; i < regions.length; i++) { + Pair<byte[], byte[]>[] intervals = HBaseSalter + .getDistributedIntervals(startRow, stopRow, + regStartKeys[i], regStopKeys[i], prefixList); + + for (Pair<byte[], byte[]> pair : intervals) { + LOG.debug("".format( + "Using SALT, Region (%s) Start (%s) Stop (%s)", + regions[i], Bytes.toString(pair.getFirst()), + Bytes.toString(pair.getSecond()))); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), pair.getFirst(), + pair.getSecond(), regions[i], SourceMode.SCAN_RANGE, + useSalt); + + split.setEndRowInclusive(true); + splits.add(split); + } + } + } + + LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size()); + + // TODO: Change to HBaseMultiSplit + return convertToMultiSplitArray(splits); + } + + case GET_LIST: { + // if( keyList == null || keyList.size() == 0 ) { + if (keyList == null) { + throw new IOException( + "Source Mode is GET_LIST but key list is EMPTY"); + } + + if (useSalt) { + TreeSet<String> tempKeyList = new TreeSet<String>(); + + for (String key : keyList) { + tempKeyList.add(HBaseSalter.addSaltPrefix(key)); + } + + keyList = tempKeyList; + } + + LOG.info("".format("Splitting Key List (%s)", keyList)); + + final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + for (int i = 0; i < keys.getFirst().length; i++) { + + if (!includeRegionInSplit(keys.getFirst()[i], + keys.getSecond()[i])) { + continue; + } + + LOG.debug(String.format( + "Getting region (%s) subset (%s) to (%s)", regions[i], + Bytes.toString(regStartKeys[i]), + Bytes.toString(regStopKeys[i]))); + + Set<String> regionsSubSet = null; + + if ((regStartKeys[i] == null || regStartKeys[i].length == 0) + && (regStopKeys[i] == null || regStopKeys[i].length == 0)) { + LOG.debug("REGION start is empty"); + LOG.debug("REGION stop is empty"); + regionsSubSet = keyList; + } else if (regStartKeys[i] == null + || regStartKeys[i].length == 0) { + LOG.debug("REGION start is empty"); + regionsSubSet = keyList.headSet( + Bytes.toString(regStopKeys[i]), true); + } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) { + LOG.debug("REGION stop is empty"); + regionsSubSet = keyList.tailSet( + Bytes.toString(regStartKeys[i]), true); + } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) { + regionsSubSet = keyList.subSet( + Bytes.toString(regStartKeys[i]), true, + Bytes.toString(regStopKeys[i]), true); + } else { + throw new IOException(String.format( + "For REGION (%s) Start Key (%s) > Stop Key(%s)", + regions[i], Bytes.toString(regStartKeys[i]), + Bytes.toString(regStopKeys[i]))); + } + + if (regionsSubSet == null || regionsSubSet.size() == 0) { + LOG.debug("EMPTY: Key is for region " + regions[i] + + " is null"); + + continue; + } + + TreeSet<String> regionKeyList = new TreeSet<String>( + regionsSubSet); + + LOG.debug(String.format("Regions [%s] has key list <%s>", + regions[i], regionKeyList)); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), regionKeyList, versions, regions[i], + SourceMode.GET_LIST, useSalt); + splits.add(split); + } + + // if (splits.isEmpty()) { + // LOG.info("GOT EMPTY SPLITS"); + + // throw new IOException( + // "".format("Key List NOT found in any region")); + + // HRegionLocation regLoc = table.getRegionLocation( + // HConstants.EMPTY_BYTE_ARRAY, false); + // + // if (null == regLoc) { + // throw new IOException("Expecting at least one region."); + // } + // + // HBaseTableSplit split = new HBaseTableSplit( + // table.getTableName(), HConstants.EMPTY_BYTE_ARRAY, + // 
HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostnamePort() + // .split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], + // SourceMode.EMPTY, false); + // + // splits.add(split); + // } + + LOG.info("RETURNED SPLITS: split -> " + splits); + + // TODO: Change to HBaseMultiSplit + return convertToMultiSplitArray(splits); + } + + default: + throw new IOException("Unknown source Mode : " + sourceMode); + } + } + + private String reverseDNS(InetAddress ipAddress) throws NamingException { + String hostName = this.reverseDNSCacheMap.get(ipAddress); + if (hostName == null) { + hostName = Strings.domainNamePointerToHostName(DNS.reverseDns( + ipAddress, this.nameServer)); + this.reverseDNSCacheMap.put(ipAddress, hostName); + } + return hostName; + } + + @Override + public RecordReader<ImmutableBytesWritable, Result> getRecordReader( + InputSplit split, JobConf job, Reporter reporter) throws IOException { + + if (!(split instanceof HBaseMultiInputSplit)) + throw new IOException("Table Split is not type HBaseMultiInputSplit"); + + HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split; + + HBaseRecordReader trr = new HBaseRecordReader(tSplit); + + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + trr.setUseSalt(useSalt); + + trr.setNextSplit(); + + return trr; + } + + /* Configuration Section */ + + /** + * space delimited list of columns + */ + public static final String COLUMN_LIST = "hbase.tablecolumns"; + + /** + * Use this jobconf param to specify the input table + */ + private static final String INPUT_TABLE = "hbase.inputtable"; + + private String startKey = null; + private String stopKey = null; + + private SourceMode sourceMode = SourceMode.EMPTY; + private TreeSet<String> keyList = null; + private int versions = 1; + private boolean useSalt = false; + private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; + + public void configure(JobConf job) { + String tableName = getTableName(job); + String colArg = job.get(COLUMN_LIST); + String[] colNames = colArg.split(" "); + byte[][] m_cols = new byte[colNames.length][]; + for (int i = 0; i < m_cols.length; i++) { + m_cols[i] = Bytes.toBytes(colNames[i]); + } + setInputColumns(m_cols); + + try { + setHTable(new HTable(HBaseConfiguration.create(job), tableName)); + } catch (Exception e) { + LOG.error("************* Table could not be created"); + LOG.error(StringUtils.stringifyException(e)); + } + + LOG.debug("Entered : " + this.getClass() + " : configure()"); + + useSalt = job.getBoolean( + String.format(HBaseConstants.USE_SALT, getTableName(job)), false); + prefixList = job.get( + String.format(HBaseConstants.SALT_PREFIX, getTableName(job)), + HBaseSalter.DEFAULT_PREFIX_LIST); + + sourceMode = SourceMode.valueOf(job.get(String.format( + HBaseConstants.SOURCE_MODE, getTableName(job)))); + + LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String + .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job + .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), + sourceMode)); + + switch (sourceMode) { + case SCAN_RANGE: + LOG.info("HIT SCAN_RANGE"); + + startKey = getJobProp(job, + String.format(HBaseConstants.START_KEY, getTableName(job))); + stopKey = getJobProp(job, + String.format(HBaseConstants.STOP_KEY, getTableName(job))); + + LOG.info(String.format("Setting start key (%s) and stop key (%s)", + startKey, stopKey)); + break; + + case GET_LIST: + LOG.info("HIT GET_LIST"); + + Collection<String> keys = job.getStringCollection(String.format( + 
HBaseConstants.KEY_LIST, getTableName(job))); + keyList = new TreeSet<String>(keys); + + versions = job.getInt( + String.format(HBaseConstants.VERSIONS, getTableName(job)), 1); + + LOG.debug("GOT KEY LIST : " + keys); + LOG.debug(String.format("SETTING key list (%s)", keyList)); + + break; + + case EMPTY: + LOG.info("HIT EMPTY"); + + sourceMode = SourceMode.SCAN_ALL; + break; + + default: + LOG.info("HIT DEFAULT"); + + break; + } + } + + public void validateInput(JobConf job) throws IOException { + // expecting exactly one path + String tableName = getTableName(job); + + if (tableName == null) { + throw new IOException("expecting one table name"); + } + LOG.debug(String.format("Found Table name [%s]", tableName)); + + // connected to table? + if (getHTable() == null) { + throw new IOException("could not connect to table '" + tableName + "'"); + } + LOG.debug(String.format("Found Table [%s]", getHTable().getTableName())); + + // expecting at least one column + String colArg = job.get(COLUMN_LIST); + if (colArg == null || colArg.length() == 0) { + throw new IOException("expecting at least one column"); + } + LOG.debug(String.format("Found Columns [%s]", colArg)); + + LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, + stopKey)); + + if (sourceMode == SourceMode.EMPTY) { + throw new IOException("SourceMode should not be EMPTY"); + } + + if (sourceMode == SourceMode.GET_LIST + && (keyList == null || keyList.size() == 0)) { + throw new IOException("Source mode is GET_LIST bu key list is empty"); + } + } + + /* Getters & Setters */ + private HTable getHTable() { + return this.table; + } + + private void setHTable(HTable ht) { + this.table = ht; + } + + private void setInputColumns(byte[][] ic) { + this.inputColumns = ic; + } + + private void setJobProp(JobConf job, String key, String value) { + if (job.get(key) != null) + throw new RuntimeException(String.format( + "Job Conf already has key [%s] with value [%s]", key, + job.get(key))); + job.set(key, value); + } + + private String getJobProp(JobConf job, String key) { + return job.get(key); + } + + public static void setTableName(JobConf job, String tableName) { + // Make sure that table has not been set before + String oldTableName = getTableName(job); + if (oldTableName != null) { + throw new RuntimeException("table name already set to: '" + + oldTableName + "'"); + } + + job.set(INPUT_TABLE, tableName); + } + + public static String getTableName(JobConf job) { + return job.get(INPUT_TABLE); + } + + protected boolean includeRegionInSplit(final byte[] startKey, + final byte[] endKey) { + return true; + } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java new file mode 100644 index 0000000..02e7f7b --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java @@ -0,0 +1,111 @@ +package parallelai.spyglass.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +public class HBaseMultiInputSplit implements InputSplit, + Comparable<HBaseMultiInputSplit>, Serializable { + + private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class); + + private 
List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + private String regionLocation = null; + + /** default constructor */ + private HBaseMultiInputSplit() { + + } + + public HBaseMultiInputSplit(String regionLocation) { + this.regionLocation = regionLocation; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + LOG.debug("REGION GETTER : " + regionLocation); + + return this.regionLocation; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.debug("READ ME : " + in.toString()); + + int s = Bytes.toInt(Bytes.readByteArray(in)); + + for (int i = 0; i < s; i++) { + HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils + .deserialize(Bytes.readByteArray(in)); + splits.add(hbts); + } + + LOG.debug("READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("WRITE : " + this); + + Bytes.writeByteArray(out, Bytes.toBytes(splits.size())); + + for (HBaseTableSplit hbts : splits) { + Bytes.writeByteArray(out, SerializationUtils.serialize(hbts)); + } + + LOG.debug("WROTE : " + out.toString()); + } + + @Override + public String toString() { + StringBuffer str = new StringBuffer(); + str.append("HBaseMultiSplit : "); + + for (HBaseTableSplit hbt : splits) { + str.append(" [" + hbt.toString() + "]"); + } + + return str.toString(); + } + + @Override + public int compareTo(HBaseMultiInputSplit o) { + // TODO: Make this comparison better + return (splits.size() - o.splits.size()); + } + + @Override + public long getLength() throws IOException { + return splits.size(); + } + + @Override + public String[] getLocations() throws IOException { + LOG.debug("REGION ARRAY : " + regionLocation); + + return new String[] { this.regionLocation }; + } + + public void addSplit(HBaseTableSplit hbt) throws IOException { + if (hbt.getRegionLocation().equals(regionLocation)) + splits.add(hbt); + else + throw new IOException("HBaseTableSplit Region Location " + + hbt.getRegionLocation() + + " does NOT match MultiSplit Region Location " + regionLocation); + } + + public List<HBaseTableSplit> getSplits() { + return splits; + } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java index d22ed71..5d7dbdd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.StringUtils; @@ -31,475 +31,579 @@ import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseRecordReader implements - RecordReader<ImmutableBytesWritable, Result> { - - static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); - - private byte[] startRow; - private byte[] endRow; - private byte[] lastSuccessfulRow; - private TreeSet<String> keyList; - private SourceMode sourceMode; - private Filter trrRowFilter; - private ResultScanner scanner; - private HTable htable; - private byte[][] trrInputColumns; - private long timestamp; - private int rowcount; - private boolean logScannerActivity = false; - private int logPerRowCount = 100; - private boolean endRowInclusive = true; - private int versions = 1; - private boolean useSalt = false; - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow - * @throws IOException - */ - public void restartRangeScan(byte[] firstRow) throws IOException { - Scan currentScan; - if ((endRow != null) && (endRow.length > 0)) { - if (trrRowFilter != null) { - Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) - + ", endRow: " + Bytes.toString(endRow)); - Scan scan = new Scan(firstRow, (endRowInclusive ? 
Bytes.add(endRow, - new byte[] { 0 }) : endRow)); - TableInputFormat.addColumns(scan, trrInputColumns); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - } else { - LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) - + ", no endRow"); - - Scan scan = new Scan(firstRow); - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - if (logScannerActivity) { - LOG.debug("Current scan=" + currentScan.toString()); - timestamp = System.currentTimeMillis(); - rowcount = 0; - } - } - - public TreeSet<String> getKeyList() { - return keyList; - } - - public void setKeyList(TreeSet<String> keyList) { - this.keyList = keyList; - } - - public void setVersions(int versions) { - this.versions = versions; - } - - public void setUseSalt(boolean useSalt) { - this.useSalt = useSalt; - } - - public SourceMode getSourceMode() { - return sourceMode; - } - - public void setSourceMode(SourceMode sourceMode) { - this.sourceMode = sourceMode; - } - - public byte[] getEndRow() { - return endRow; - } - - public void setEndRowInclusive(boolean isInclusive) { - endRowInclusive = isInclusive; - } - - public boolean getEndRowInclusive() { - return endRowInclusive; - } - - private byte[] nextKey = null; - private Vector<List<KeyValue>> resultVector = null; - Map<Long, List<KeyValue>> keyValueMap = null; - - /** - * Build the scanner. Not done in constructor to allow for extension. - * - * @throws IOException - */ - public void init() throws IOException { - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - restartRangeScan(startRow); - break; - - case GET_LIST: - nextKey = Bytes.toBytes(keyList.pollFirst()); - break; - - default: - throw new IOException(" Unknown source mode : " + sourceMode); - } - } - - byte[] getStartRow() { - return this.startRow; - } - - /** - * @param htable - * the {@link HTable} to scan. - */ - public void setHTable(HTable htable) { - Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, - false); - logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); - this.htable = htable; - } - - /** - * @param inputColumns - * the columns to be placed in {@link Result}. - */ - public void setInputColumns(final byte[][] inputColumns) { - this.trrInputColumns = inputColumns; - } - - /** - * @param startRow - * the first row in the split - */ - public void setStartRow(final byte[] startRow) { - this.startRow = startRow; - } - - /** - * - * @param endRow - * the last row in the split - */ - public void setEndRow(final byte[] endRow) { - this.endRow = endRow; - } - - /** - * @param rowFilter - * the {@link Filter} to be used. - */ - public void setRowFilter(Filter rowFilter) { - this.trrRowFilter = rowFilter; - } - - @Override - public void close() { - if (this.scanner != null) - this.scanner.close(); - } - - /** - * @return ImmutableBytesWritable - * - * @see org.apache.hadoop.mapred.RecordReader#createKey() - */ - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - - /** - * @return RowResult - * - * @see org.apache.hadoop.mapred.RecordReader#createValue() - */ - @Override - public Result createValue() { - return new Result(); - } - - @Override - public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... 
- return 0; - } - - @Override - public float getProgress() { - // Depends on the total number of tuples and getPos - return 0; - } - - /** - * @param key - * HStoreKey as input key. - * @param value - * MapWritable as input value - * @return true if there was more data - * @throws IOException - */ - @Override - public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - - switch (sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: { - - Result result; - try { - try { - result = this.scanner.next(); - if (logScannerActivity) { - rowcount++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) + "ms to process " - + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // try to handle all IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," - + " if your mapper has restarted a few other times like this" - + " then you should consider killing this job and investigate" - + " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - restartRangeScan(startRow); - } else { - restartRangeScan(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row - } - result = this.scanner.next(); - } - - if (result != null && result.size() > 0) { - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } - return false; - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.debug("Mapper took " + (now - timestamp) + "ms to process " - + rowcount + " rows"); - LOG.debug(ioe); - String lastRow = lastSuccessfulRow == null ? "null" : Bytes - .toStringBinary(lastSuccessfulRow); - LOG.debug("lastSuccessfulRow=" + lastRow); - } - throw ioe; - } - } - - case GET_LIST: { - LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey))); - - if (versions == 1) { - if (nextKey != null) { - LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result result = this.htable.get(theGet); - - if (result != null && (! result.isEmpty()) ) { - LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) ); - - if (keyList != null || !keyList.isEmpty()) { - String newKey = keyList.pollFirst(); - LOG.debug("New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - } else { - nextKey = null; - } - - LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey))); - - // Write the result - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0 - - String newKey; - while((newKey = keyList.pollFirst()) != null) { - LOG.debug("WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? 
null : Bytes - .toBytes(newKey); - - if( nextKey == null ) { - LOG.error("BOMB! BOMB! BOMB!"); - continue; - } - - if( ! this.htable.exists( new Get(nextKey) ) ) { - LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); - continue; - } else { break; } - } - - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - LOG.debug("Final New Key => " + Bytes.toString(nextKey)); - - return next(key, value); - } - } else { - // Nothig left. return false - return false; - } - } else { - if (resultVector != null && resultVector.size() != 0) { - LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) ); - - List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); - Result result = new Result(resultKeyValue); - - LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) ); - - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - - return true; - } else { - if (nextKey != null) { - LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey))); - - Get theGet = new Get(nextKey); - theGet.setMaxVersions(versions); - - Result resultAll = this.htable.get(theGet); - - if( resultAll != null && (! resultAll.isEmpty())) { - List<KeyValue> keyValeList = resultAll.list(); - - keyValueMap = new HashMap<Long, List<KeyValue>>(); - - LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap)); - - for (KeyValue keyValue : keyValeList) { - long version = keyValue.getTimestamp(); - - if (keyValueMap.containsKey(new Long(version))) { - List<KeyValue> keyValueTempList = keyValueMap.get(new Long( - version)); - if (keyValueTempList == null) { - keyValueTempList = new ArrayList<KeyValue>(); - } - keyValueTempList.add(keyValue); - } else { - List<KeyValue> keyValueTempList = new ArrayList<KeyValue>(); - keyValueMap.put(new Long(version), keyValueTempList); - keyValueTempList.add(keyValue); - } - } - - resultVector = new Vector<List<KeyValue>>(); - resultVector.addAll(keyValueMap.values()); - - List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1); - - Result result = new Result(resultKeyValue); - - LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) ); - - String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());// - - System.out.println("+ New Key => " + newKey); - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - if( useSalt) { - key.set( HBaseSalter.delSaltPrefix(result.getRow())); - } else { - key.set(result.getRow()); - } - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; - } else { - LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0 - - String newKey; - - while( (newKey = keyList.pollFirst()) != null ) { - LOG.debug("+ WHILE NEXT Key => " + newKey); - - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - if( nextKey == null ) { - LOG.error("+ BOMB! BOMB! BOMB!"); - continue; - } - - if( ! 
this.htable.exists( new Get(nextKey) ) ) { - LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) )); - continue; - } else { break; } - } - - nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes - .toBytes(newKey); - - LOG.debug("+ Final New Key => " + Bytes.toString(nextKey)); - - return next(key, value); - } - - } else { - return false; - } - } - } - } - default: - throw new IOException("Unknown source mode : " + sourceMode); - } - } + RecordReader<ImmutableBytesWritable, Result> { + + static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); + + private byte[] startRow; + private byte[] endRow; + private byte[] lastSuccessfulRow; + private TreeSet<String> keyList; + private SourceMode sourceMode; + private Filter trrRowFilter; + private ResultScanner scanner; + private HTable htable; + private byte[][] trrInputColumns; + private long timestamp; + private int rowcount; + private boolean logScannerActivity = false; + private int logPerRowCount = 100; + private boolean endRowInclusive = true; + private int versions = 1; + private boolean useSalt = false; + + private HBaseMultiInputSplit multiSplit = null; + private List<HBaseTableSplit> allSplits = null; + + private HBaseRecordReader() { + } + + public HBaseRecordReader(HBaseMultiInputSplit mSplit) throws IOException { + multiSplit = mSplit; + + LOG.info("Creatin Multi Split for region location : " + + multiSplit.getRegionLocation()); + + allSplits = multiSplit.getSplits(); + } + + public boolean setNextSplit() throws IOException { + if (allSplits.size() > 0) { + setSplitValue(allSplits.remove(0)); + return true; + } else { + return false; + } + } + + private void setSplitValue(HBaseTableSplit tSplit) throws IOException { + switch (tSplit.getSourceMode()) { + case SCAN_ALL: + case SCAN_RANGE: { + LOG.debug(String.format( + "For split [%s] we have start key (%s) and stop key (%s)", + tSplit, tSplit.getStartRow(), tSplit.getEndRow())); + + setStartRow(tSplit.getStartRow()); + setEndRow(tSplit.getEndRow()); + setEndRowInclusive(tSplit.getEndRowInclusive()); + } + + break; + + case GET_LIST: { + LOG.debug(String.format("For split [%s] we have key list (%s)", + tSplit, tSplit.getKeyList())); + + setKeyList(tSplit.getKeyList()); + setVersions(tSplit.getVersions()); + } + + break; + + case EMPTY: + LOG.info("EMPTY split. Doing nothing."); + break; + + default: + throw new IOException("Unknown source mode : " + + tSplit.getSourceMode()); + } + + setSourceMode(tSplit.getSourceMode()); + + init(); + } + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow + * @throws IOException + */ + private void restartRangeScan(byte[] firstRow) throws IOException { + Scan currentScan; + if ((endRow != null) && (endRow.length > 0)) { + if (trrRowFilter != null) { + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + scan.setCacheBlocks(false); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } else { + LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow) + + ", endRow: " + Bytes.toString(endRow)); + Scan scan = new Scan(firstRow, (endRowInclusive ? 
Bytes.add(endRow, + new byte[] { 0 }) : endRow)); + TableInputFormat.addColumns(scan, trrInputColumns); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + } else { + LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + + ", no endRow"); + + Scan scan = new Scan(firstRow); + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + if (logScannerActivity) { + LOG.debug("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; + } + } + + public TreeSet<String> getKeyList() { + return keyList; + } + + private void setKeyList(TreeSet<String> keyList) { + this.keyList = keyList; + } + + private void setVersions(int versions) { + this.versions = versions; + } + + public void setUseSalt(boolean useSalt) { + this.useSalt = useSalt; + } + + public SourceMode getSourceMode() { + return sourceMode; + } + + private void setSourceMode(SourceMode sourceMode) { + this.sourceMode = sourceMode; + } + + public byte[] getEndRow() { + return endRow; + } + + private void setEndRowInclusive(boolean isInclusive) { + endRowInclusive = isInclusive; + } + + public boolean getEndRowInclusive() { + return endRowInclusive; + } + + private byte[] nextKey = null; + private Vector<List<KeyValue>> resultVector = null; + Map<Long, List<KeyValue>> keyValueMap = null; + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + private void init() throws IOException { + switch (sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + restartRangeScan(startRow); + break; + + case GET_LIST: + nextKey = Bytes.toBytes(keyList.pollFirst()); + break; + + case EMPTY: + LOG.info("EMPTY mode. Do nothing"); + break; + + default: + throw new IOException(" Unknown source mode : " + sourceMode); + } + } + + byte[] getStartRow() { + return this.startRow; + } + + /** + * @param htable + * the {@link HTable} to scan. + */ + public void setHTable(HTable htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean( + ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); + this.htable = htable; + } + + /** + * @param inputColumns + * the columns to be placed in {@link Result}. + */ + public void setInputColumns(final byte[][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow + * the first row in the split + */ + private void setStartRow(final byte[] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow + * the last row in the split + */ + private void setEndRow(final byte[] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter + * the {@link Filter} to be used. + */ + public void setRowFilter(Filter rowFilter) { + this.trrRowFilter = rowFilter; + } + + @Override + public void close() { + if (this.scanner != null) + this.scanner.close(); + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + /** + * @return RowResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + @Override + public Result createValue() { + return new Result(); + } + + @Override + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... 
+		return 0;
+	}
+
+	@Override
+	public float getProgress() {
+		// Depends on the total number of tuples and getPos
+		return 0;
+	}
+
+	/**
+	 * @param key
+	 *            HStoreKey as input key.
+	 * @param value
+	 *            MapWritable as input value
+	 * @return true if there was more data
+	 * @throws IOException
+	 */
+	@Override
+	public boolean next(ImmutableBytesWritable key, Result value)
+			throws IOException {
+
+		switch (sourceMode) {
+		case SCAN_ALL:
+		case SCAN_RANGE: {
+
+			Result result;
+			try {
+				try {
+					result = this.scanner.next();
+					if (logScannerActivity) {
+						rowcount++;
+						if (rowcount >= logPerRowCount) {
+							long now = System.currentTimeMillis();
+							LOG.debug("Mapper took " + (now - timestamp)
+									+ "ms to process " + rowcount + " rows");
+							timestamp = now;
+							rowcount = 0;
+						}
+					}
+				} catch (IOException e) {
+					// try to handle all IOExceptions by restarting
+					// the scanner, if the second call fails, it will be rethrown
+					LOG.debug("recovered from "
+							+ StringUtils.stringifyException(e));
+					if (lastSuccessfulRow == null) {
+						LOG.warn("We are restarting the first next() invocation,"
+								+ " if your mapper has restarted a few other times like this"
+								+ " then you should consider killing this job and investigate"
+								+ " why it's taking so long.");
+					}
+					if (lastSuccessfulRow == null) {
+						restartRangeScan(startRow);
+					} else {
+						restartRangeScan(lastSuccessfulRow);
+						this.scanner.next(); // skip presumed already mapped row
+					}
+					result = this.scanner.next();
+				}
+
+				if (result != null && result.size() > 0) {
+					if (useSalt) {
+						key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+					} else {
+						key.set(result.getRow());
+					}
+
+					lastSuccessfulRow = key.get();
+					Writables.copyWritable(result, value);
+					return true;
+				}
+				return setNextSplit();
+			} catch (IOException ioe) {
+				if (logScannerActivity) {
+					long now = System.currentTimeMillis();
+					LOG.debug("Mapper took " + (now - timestamp)
+							+ "ms to process " + rowcount + " rows");
+					LOG.debug(ioe);
+					String lastRow = lastSuccessfulRow == null ? "null" : Bytes
+							.toStringBinary(lastSuccessfulRow);
+					LOG.debug("lastSuccessfulRow=" + lastRow);
+				}
+				throw ioe;
+			}
+		}
+
+		case GET_LIST: {
+			LOG.debug(String.format("INTO next with GET LIST and Key (%s)",
+					Bytes.toString(nextKey)));
+
+			if (versions == 1) {
+				if (nextKey != null) {
+					LOG.debug(String.format("Processing Key (%s)",
+							Bytes.toString(nextKey)));
+
+					Get theGet = new Get(nextKey);
+					theGet.setMaxVersions(versions);
+
+					Result result = this.htable.get(theGet);
+
+					if (result != null && (!result.isEmpty())) {
+						LOG.debug(String.format(
+								"Key (%s), Version (%s), Got Result (%s)",
+								Bytes.toString(nextKey), versions, result));
+
+						if (keyList != null && !keyList.isEmpty()) {
+							String newKey = keyList.pollFirst();
+							LOG.debug("New Key => " + newKey);
+							nextKey = (newKey == null || newKey.length() == 0) ? null
+									: Bytes.toBytes(newKey);
+						} else {
+							nextKey = null;
+						}
+
+						LOG.debug(String.format("=> Picked a new Key (%s)",
+								Bytes.toString(nextKey)));
+
+						// Write the result
+						if (useSalt) {
+							key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+						} else {
+							key.set(result.getRow());
+						}
+						lastSuccessfulRow = key.get();
+						Writables.copyWritable(result, value);
+
+						return true;
+					} else {
+						LOG.debug(" Key (" + Bytes.toString(nextKey)
+								+ ") returned an EMPTY result. Get (" + theGet.getId()
+								+ ")"); // alg0
+
+						String newKey;
+						while ((newKey = keyList.pollFirst()) != null) {
+							LOG.debug("WHILE NEXT Key => " + newKey);
+
+							nextKey = (newKey == null || newKey.length() == 0) ? null
+									: Bytes.toBytes(newKey);
+
+							if (nextKey == null) {
+								LOG.error("BOMB! BOMB! BOMB!");
+								continue;
+							}
+
+							if (!this.htable.exists(new Get(nextKey))) {
+								LOG.debug(String.format(
+										"Key (%s) Does not exist in Table (%s)",
+										Bytes.toString(nextKey),
+										Bytes.toString(this.htable.getTableName())));
+								continue;
+							} else {
+								break;
+							}
+						}
+
+						nextKey = (newKey == null || newKey.length() == 0) ? null
+								: Bytes.toBytes(newKey);
+
+						LOG.debug("Final New Key => " + Bytes.toString(nextKey));
+
+						return next(key, value);
+					}
+				} else {
+					// Nothing left in the key list; move on to the next split.
+					return setNextSplit();
+				}
+			} else {
+				if (resultVector != null && resultVector.size() != 0) {
+					LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>",
+							versions, resultVector));
+
+					List<KeyValue> resultKeyValue = resultVector
+							.remove(resultVector.size() - 1);
+					Result result = new Result(resultKeyValue);
+
+					LOG.debug(String.format("+ Version (%s), Got Result <%s>",
+							versions, result));
+
+					if (useSalt) {
+						key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+					} else {
+						key.set(result.getRow());
+					}
+					lastSuccessfulRow = key.get();
+					Writables.copyWritable(result, value);
+
+					return true;
+				} else {
+					if (nextKey != null) {
+						LOG.debug(String.format("+ Processing Key (%s)",
+								Bytes.toString(nextKey)));
+
+						Get theGet = new Get(nextKey);
+						theGet.setMaxVersions(versions);
+
+						Result resultAll = this.htable.get(theGet);
+
+						if (resultAll != null && (!resultAll.isEmpty())) {
+							List<KeyValue> keyValueList = resultAll.list();
+
+							keyValueMap = new HashMap<Long, List<KeyValue>>();
+
+							LOG.debug(String.format(
+									"+ Key (%s) Versions (%s) Value map <%s>",
+									Bytes.toString(nextKey), versions, keyValueMap));
+
+							for (KeyValue keyValue : keyValueList) {
+								long version = keyValue.getTimestamp();
+
+								if (keyValueMap.containsKey(new Long(version))) {
+									List<KeyValue> keyValueTempList = keyValueMap
+											.get(new Long(version));
+									if (keyValueTempList == null) {
+										keyValueTempList = new ArrayList<KeyValue>();
+									}
+									keyValueTempList.add(keyValue);
+								} else {
+									List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
+									keyValueMap.put(new Long(version),
+											keyValueTempList);
+									keyValueTempList.add(keyValue);
+								}
+							}
+
+							resultVector = new Vector<List<KeyValue>>();
+							resultVector.addAll(keyValueMap.values());
+
+							List<KeyValue> resultKeyValue = resultVector
+									.remove(resultVector.size() - 1);
+
+							Result result = new Result(resultKeyValue);
+
+							LOG.debug(String.format(
+									"+ Version (%s), Got Result (%s)", versions,
+									result));
+
+							String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
+
+							LOG.debug("+ New Key => " + newKey);
+							nextKey = (newKey == null || newKey.length() == 0) ? null
+									: Bytes.toBytes(newKey);
+
+							if (useSalt) {
+								key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+							} else {
+								key.set(result.getRow());
+							}
+							lastSuccessfulRow = key.get();
+							Writables.copyWritable(result, value);
+							return true;
+						} else {
+							LOG.debug(String.format(
+									"+ Key (%s) returned an EMPTY result. Get (%s)",
+									Bytes.toString(nextKey), theGet.getId())); // alg0
+
+							String newKey;
+
+							while ((newKey = keyList.pollFirst()) != null) {
+								LOG.debug("+ WHILE NEXT Key => " + newKey);
+
+								nextKey = (newKey == null || newKey.length() == 0) ? null
+										: Bytes.toBytes(newKey);
+
+								if (nextKey == null) {
+									LOG.error("+ BOMB! BOMB!
BOMB!"); + continue; + } + + if (!this.htable.exists(new Get(nextKey))) { + LOG.debug(String.format( + "+ Key (%s) Does not exist in Table (%s)", + Bytes.toString(nextKey), + Bytes.toString(this.htable.getTableName()))); + continue; + } else { + break; + } + } + + nextKey = (newKey == null || newKey.length() == 0) ? null + : Bytes.toBytes(newKey); + + LOG.debug("+ Final New Key => " + + Bytes.toString(nextKey)); + + return next(key, value); + } + + } else { + return setNextSplit(); + } + } + } + } + + case EMPTY: { + LOG.info("GOT an empty Split"); + return setNextSplit(); + } + + default: + throw new IOException("Unknown source mode : " + sourceMode); + } + } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index 3c64e52..aa446c1 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -219,10 +219,7 @@ public class HBaseScheme String fieldName = (String) fields.get(k); byte[] fieldNameBytes = Bytes.toBytes(fieldName); byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes); - if (cellValue == null) { - cellValue = new byte[0]; - } - result.add(new ImmutableBytesWritable(cellValue)); + result.add(cellValue != null ? new ImmutableBytesWritable(cellValue) : null); } } @@ -259,7 +256,8 @@ public class HBaseScheme Tuple tuple = values.getTuple(); ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j); - put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get()); + if (valueBytes != null) + put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get()); } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java index a5c3bdd..87b8f58 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -12,196 +12,208 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.InputSplit; -import com.sun.tools.javac.resources.version; - import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { - - private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); - - private byte [] m_tableName = null; - private byte [] m_startRow = null; - private byte [] m_endRow = null; - private String m_regionLocation = null; - private TreeSet<String> m_keyList = null; - private SourceMode m_sourceMode = SourceMode.EMPTY; - private boolean m_endRowInclusive = true; - private int m_versions = 1; - private boolean m_useSalt = false; - - /** default constructor */ - public HBaseTableSplit() { - this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); - } - - /** - * Constructor - * @param tableName - * @param startRow - * @param endRow - * @param location - */ - public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, - final String location, final SourceMode sourceMode, final boolean useSalt) { - this.m_tableName = tableName; - this.m_startRow = startRow; - this.m_endRow = endRow; - this.m_regionLocation = location; - this.m_sourceMode = sourceMode; - this.m_useSalt = useSalt; - } - - public HBaseTableSplit( final byte [] tableName, 
final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) { - this.m_tableName = tableName; - this.m_keyList = keyList; - this.m_versions = versions; - this.m_sourceMode = sourceMode; - this.m_regionLocation = location; - this.m_useSalt = useSalt; - } - - /** @return table name */ - public byte [] getTableName() { - return this.m_tableName; - } - - /** @return starting row key */ - public byte [] getStartRow() { - return this.m_startRow; - } - - /** @return end row key */ - public byte [] getEndRow() { - return this.m_endRow; - } - - public boolean getEndRowInclusive() { - return m_endRowInclusive; - } - - public void setEndRowInclusive(boolean isInclusive) { - m_endRowInclusive = isInclusive; - } - - /** @return list of keys to get */ - public TreeSet<String> getKeyList() { - return m_keyList; - } - - public int getVersions() { - return m_versions; - } - - /** @return get the source mode */ - public SourceMode getSourceMode() { - return m_sourceMode; - } - - public boolean getUseSalt() { - return m_useSalt; - } - - /** @return the region's hostname */ - public String getRegionLocation() { - LOG.debug("REGION GETTER : " + m_regionLocation); - - return this.m_regionLocation; - } - - public String[] getLocations() { - LOG.debug("REGION ARRAY : " + m_regionLocation); - - return new String[] {this.m_regionLocation}; - } - - @Override - public long getLength() { - // Not clear how to obtain this... seems to be used only for sorting splits - return 0; - } - - @Override - public void readFields(DataInput in) throws IOException { - LOG.debug("READ ME : " + in.toString()); - - this.m_tableName = Bytes.readByteArray(in); - this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); - this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); - this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); - - switch(this.m_sourceMode) { - case SCAN_RANGE: - this.m_startRow = Bytes.readByteArray(in); - this.m_endRow = Bytes.readByteArray(in); - this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); - break; - - case GET_LIST: - this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); - this.m_keyList = new TreeSet<String>(); - - int m = Bytes.toInt(Bytes.readByteArray(in)); - - for( int i = 0; i < m; i++) { - this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); - } - break; - } - - LOG.debug("READ and CREATED : " + this); - } - - @Override - public void write(DataOutput out) throws IOException { - LOG.debug("WRITE : " + this); - - Bytes.writeByteArray(out, this.m_tableName); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); - - switch( this.m_sourceMode ) { - case SCAN_RANGE: - Bytes.writeByteArray(out, this.m_startRow); - Bytes.writeByteArray(out, this.m_endRow); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); - break; - - case GET_LIST: - Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); - - for( String k: this.m_keyList ) { - Bytes.writeByteArray(out, Bytes.toBytes(k)); - } - break; - } - - LOG.debug("WROTE : " + out.toString()); - } - - @Override - public String toString() { - return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", - Bytes.toString(m_tableName), 
m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), - (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt); - } - - @Override - public int compareTo(HBaseTableSplit o) { - switch(m_sourceMode) { - case SCAN_ALL: - case SCAN_RANGE: - return Bytes.compareTo(getStartRow(), o.getStartRow()); - - case GET_LIST: - return m_keyList.equals( o.getKeyList() ) ? 0 : -1; - - default: - return -1; - } - - } +public class HBaseTableSplit implements InputSplit, + Comparable<HBaseTableSplit>, Serializable { + + private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); + + private byte[] m_tableName = null; + private byte[] m_startRow = null; + private byte[] m_endRow = null; + private String m_regionLocation = null; + private TreeSet<String> m_keyList = null; + private SourceMode m_sourceMode = SourceMode.EMPTY; + private boolean m_endRowInclusive = true; + private int m_versions = 1; + private boolean m_useSalt = false; + + /** default constructor */ + public HBaseTableSplit() { + this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false); + } + + /** + * Constructor + * + * @param tableName + * @param startRow + * @param endRow + * @param location + */ + public HBaseTableSplit(final byte[] tableName, final byte[] startRow, + final byte[] endRow, final String location, + final SourceMode sourceMode, final boolean useSalt) { + this.m_tableName = tableName; + this.m_startRow = startRow; + this.m_endRow = endRow; + this.m_regionLocation = location; + this.m_sourceMode = sourceMode; + this.m_useSalt = useSalt; + } + + public HBaseTableSplit(final byte[] tableName, + final TreeSet<String> keyList, int versions, final String location, + final SourceMode sourceMode, final boolean useSalt) { + this.m_tableName = tableName; + this.m_keyList = keyList; + this.m_versions = versions; + this.m_sourceMode = sourceMode; + this.m_regionLocation = location; + this.m_useSalt = useSalt; + } + + /** @return table name */ + public byte[] getTableName() { + return this.m_tableName; + } + + /** @return starting row key */ + public byte[] getStartRow() { + return this.m_startRow; + } + + /** @return end row key */ + public byte[] getEndRow() { + return this.m_endRow; + } + + public boolean getEndRowInclusive() { + return m_endRowInclusive; + } + + public void setEndRowInclusive(boolean isInclusive) { + m_endRowInclusive = isInclusive; + } + + /** @return list of keys to get */ + public TreeSet<String> getKeyList() { + return m_keyList; + } + + public int getVersions() { + return m_versions; + } + + /** @return get the source mode */ + public SourceMode getSourceMode() { + return m_sourceMode; + } + + public boolean getUseSalt() { + return m_useSalt; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + LOG.debug("REGION GETTER : " + m_regionLocation); + + return this.m_regionLocation; + } + + public String[] getLocations() { + LOG.debug("REGION ARRAY : " + m_regionLocation); + + return new String[] { this.m_regionLocation }; + } + + @Override + public long getLength() { + // Not clear how to obtain this... 
seems to be used only for sorting + // splits + return 0; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.debug("READ ME : " + in.toString()); + + this.m_tableName = Bytes.readByteArray(in); + this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes + .readByteArray(in))); + this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in)); + + switch (this.m_sourceMode) { + case SCAN_RANGE: + this.m_startRow = Bytes.readByteArray(in); + this.m_endRow = Bytes.readByteArray(in); + this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); + break; + + case GET_LIST: + this.m_versions = Bytes.toInt(Bytes.readByteArray(in)); + this.m_keyList = new TreeSet<String>(); + + int m = Bytes.toInt(Bytes.readByteArray(in)); + + for (int i = 0; i < m; i++) { + this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); + } + break; + } + + LOG.debug("READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.debug("WRITE : " + this); + + Bytes.writeByteArray(out, this.m_tableName); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt)); + + switch (this.m_sourceMode) { + case SCAN_RANGE: + Bytes.writeByteArray(out, this.m_startRow); + Bytes.writeByteArray(out, this.m_endRow); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); + break; + + case GET_LIST: + Bytes.writeByteArray(out, Bytes.toBytes(m_versions)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + + for (String k : this.m_keyList) { + Bytes.writeByteArray(out, Bytes.toBytes(k)); + } + break; + } + + LOG.debug("WROTE : " + out.toString()); + } + + @Override + public String toString() { + return String + .format( + "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, + Bytes.toString(m_startRow), Bytes.toString(m_endRow), + (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, + m_useSalt); + } + + @Override + public int compareTo(HBaseTableSplit o) { + switch (m_sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + return Bytes.compareTo(getStartRow(), o.getStartRow()); + + case GET_LIST: + return m_keyList.equals(o.getKeyList()) ? 0 : -1; + + case EMPTY: + return 0; + + default: + return -1; + } + + } }
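The split's write()/readFields() pair above defines its wire format: table name, region location, source mode and salt flag first, then the mode-specific fields (start/end row plus the inclusive flag for SCAN_RANGE; versions and the key list for GET_LIST). A minimal round-trip sketch in Scala — the table name and "region-host" location here are only placeholders:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.hadoop.hbase.util.Bytes
    import parallelai.spyglass.hbase.HBaseTableSplit
    import parallelai.spyglass.hbase.HBaseConstants.SourceMode

    object SplitRoundTrip extends App {
      // Build a SCAN_RANGE split the way the input format would.
      val split = new HBaseTableSplit(Bytes.toBytes("_TEST.SALT.01"),
        Bytes.toBytes("start"), Bytes.toBytes("stop"), "region-host",
        SourceMode.SCAN_RANGE, false)

      // write() emits the common header, then start/end row and the inclusive flag.
      val buffer = new ByteArrayOutputStream()
      split.write(new DataOutputStream(buffer))

      // readFields() consumes the same layout into a default-constructed split.
      val copy = new HBaseTableSplit()
      copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
      println(copy) // toString() reports table, region, mode, keys, versions, salt
    }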
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala index b6d5742..31ed3ea 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala @@ -10,15 +10,14 @@ import org.apache.hadoop.hbase.util.Bytes import cascading.tuple.TupleEntry class HBasePipeWrapper (pipe: Pipe) { - def toBytesWritable(f: Fields): Pipe = { + def toBytesWritable(f: Fields): Pipe = { asList(f) - .foldLeft(pipe){ (p, f) => { - p.map(f.toString -> f.toString){ from: String => { - new ImmutableBytesWritable(Bytes.toBytes( - if (from == null) "" else from)) - }} - }} - } + .foldLeft(pipe){ (p, f) => { + p.map(f.toString -> f.toString){ from: String => + Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null) + }} + } + } // def toBytesWritable : Pipe = { // asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => { @@ -30,13 +29,12 @@ class HBasePipeWrapper (pipe: Pipe) { def fromBytesWritable(f: Fields): Pipe = { asList(f) - .foldLeft(pipe) { (p, fld) => - p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { - Bytes.toString(from.get) - } - } - } - } + .foldLeft(pipe) { (p, fld) => { + p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => + Option(from).map(x => Bytes.toString(x.get)).getOrElse(null) + } + }} + } // def fromBytesWritable : Pipe = { // asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) => diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index eccd653..2aa5342 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -7,93 +7,101 @@ import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.util.Bytes import org.apache.log4j.Level import org.apache.log4j.Logger - import com.twitter.scalding._ import com.twitter.scalding.Args - import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import org.apache.hadoop.hbase.client.Put +import parallelai.spyglass.hbase.HBaseSalter class HBaseExample(args: Args) extends JobBase(args) { - val isDebug: Boolean = args("debug").toBoolean + val isDebug: Boolean = args("debug").toBoolean - if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) + if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) - val output = args("output") + val output = args("output") - println(output) + val jobConf = getJobConf() - val jobConf = getJobConf() + val quorumNames = args("quorum") - val quorumNames = args("quorum") + println("Output : " + output) + println("Quorum : " + quorumNames) - case class HBaseTableStore( + case class HBaseTableStore( conf: Configuration, quorum: String, tableName: String) { - val tableBytes = Bytes.toBytes(tableName) - val connection = HConnectionManager.getConnection(conf) - val maxThreads = conf.getInt("hbase.htable.threads.max", 1) - - conf.set("hbase.zookeeper.quorum", quorumNames) - - val htable = new HTable(HBaseConfiguration.create(conf), tableName) - - } - - val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet") - - val hbs2 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - 
List("column_family"), - List('column_name), - sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) - .read - .write(Tsv(output.format("get_list"))) - - val hbs3 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") - .read - .write(Tsv(output.format("scan_all"))) - - val hbs4 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") - .read - .write(Tsv(output.format("scan_range_to_end"))) - - val hbs5 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") - .read - .write(Tsv(output.format("scan_range_from_start"))) - - val hbs6 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") - .read - .write(Tsv(output.format("scan_range_between"))) - -}
\ No newline at end of file
+  val tableBytes = Bytes.toBytes(tableName)
+  val connection = HConnectionManager.getConnection(conf)
+  val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+  conf.set("hbase.zookeeper.quorum", quorumNames)
+
+  val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+  def makeN(n: Int) {
+    (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => {
+      val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x)))
+      put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x))
+      htable.put(put) // actually write the row; without this the test table stays empty
+    })
+  }
+
+ }
+
+ HBaseTableStore(jobConf, quorumNames, "_TEST.SALT.01").makeN(100000)
+
+ val hbs2 = new HBaseSource(
+   "_TEST.SALT.01",
+   quorumNames,
+   'key,
+   List("data"),
+   List('data),
+   sourceMode = SourceMode.GET_LIST, keyList = List("13914", "10687", "14897").map(x => "%015d".format(x.toLong)), useSalt = true)
+   .read
+   .write(Tsv(output.format("get_list")))
+
+ val hbs3 = new HBaseSource(
+   "_TEST.SALT.01",
+   quorumNames,
+   'key,
+   List("data"),
+   List('data),
+   sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
+   .read
+   .write(Tsv(output.format("scan_all")))
+
+ val hbs4 = new HBaseSource(
+   "_TEST.SALT.01",
+   quorumNames,
+   'key,
+   List("data"),
+   List('data),
+   sourceMode = SourceMode.SCAN_RANGE, stopKey = "%015d".format("13914".toLong), useSalt = true)
+   .read
+   .write(Tsv(output.format("scan_range_to_end")))
+
+ val hbs5 = new HBaseSource(
+   "_TEST.SALT.01",
+   quorumNames,
+   'key,
+   List("data"),
+   List('data),
+   sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), useSalt = true)
+   .read
+   .write(Tsv(output.format("scan_range_from_start")))
+
+ val hbs6 = new HBaseSource(
+   "_TEST.SALT.01",
+   quorumNames,
+   'key,
+   List("data"),
+   List('data),
+   sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), stopKey = "%015d".format("16897".toLong), useSalt = true)
+   .read
+   .write(Tsv(output.format("scan_range_between")))
+
+}
\ No newline at end of file
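HBaseExample now writes its test rows with HBaseSalter.addSaltPrefix and reads them back with useSalt = true, which is what makes the record reader call HBaseSalter.delSaltPrefix before emitting each key. A minimal sketch of that round trip, assuming only the two salter methods used in this patch:

    import org.apache.hadoop.hbase.util.Bytes
    import parallelai.spyglass.hbase.HBaseSalter

    object SaltRoundTrip extends App {
      val plain  = Bytes.toBytes("%015d".format(13914L))
      // addSaltPrefix prepends the salt; delSaltPrefix strips it again,
      // so the key a job sees is the unsalted one even though HBase
      // stores (and region-partitions) the salted form.
      val salted = HBaseSalter.addSaltPrefix(plain)
      assert(Bytes.equals(HBaseSalter.delSaltPrefix(salted), plain))
    }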
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
index 890d2be..920f17d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -3,36 +3,39 @@ package parallelai.spyglass.hbase.example
 import com.twitter.scalding.Tool
 import org.joda.time.format.DateTimeFormat
 import java.util.Formatter.DateTime
+import parallelai.spyglass.base.JobRunner
 
 object HBaseExampleRunner extends App {
- val appPath = System.getenv("BIGDATA_APPCONF_PATH")
- assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"})
- println( "Application Path is [%s]".format(appPath) )
-
- val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match {
- case "hdfs" => "--hdfs"
- case _ => "--local"
- }}
-
- println(modeString)
-
- val jobLibPath = modeString match {
- case "--hdfs" => {
- val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
- assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"})
- println( "Job Library Path Path is [%s]".format(jobLibPath) )
- jobLibPath
- }
- case _ => ""
- }
-
- val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
- assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})
- println( "Quorum is [%s]".format(quorum) )
-
- val output = "HBaseTest.%s.tsv"
-
- Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath,
- "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum ))
-
+  val appPath = System.getenv("BIGDATA_APPCONF_PATH")
+  assert(appPath != null, { "Environment Variable BIGDATA_APPCONF_PATH is undefined or Null" })
+  println("Application Path is [%s]".format(appPath))
+
+  val modeString = if (args.length == 0) { "--hdfs" } else {
+    args(0) match {
+      case "hdfs" => "--hdfs"
+      case _ => "--hdfs"
+    }
+  }
+
+  println(modeString)
+
+  val jobLibPath = modeString match {
+    case "--hdfs" => {
+      val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
+      assert(jobLibPath != null, { "Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null" })
+      println("Job Library Path is [%s]".format(jobLibPath))
+      jobLibPath
+    }
+    case _ => ""
+  }
+
+  val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
+  assert(quorum != null, { "Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null" })
+  println("Quorum is [%s]".format(quorum))
+
+  val output = "HBaseTest.%s"
+
+  JobRunner.main(Array(classOf[HBaseExample].getName, "--hdfs", "--app.conf.path", appPath,
+    "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum))
+
 }
\ No newline at end of file
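A detail of the record reader's GET_LIST mode that is easy to miss in the diff above: keyList is a TreeSet<String> drained with pollFirst(), so keys are always processed in sorted order, independent of the order passed to HBaseSource. A small standalone model of that loop:

    import java.util.TreeSet

    object KeyListOrder extends App {
      val keys = new TreeSet[String]()
      List("5003914", "5000687", "5004897").foreach(keys.add)
      // pollFirst() removes and returns the smallest key, or null when the
      // set is empty — the same loop next() runs in GET_LIST mode.
      Iterator.continually(keys.pollFirst()).takeWhile(_ != null).foreach(println)
      // prints 5000687, then 5003914, then 5004897
    }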