aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChandan Rajah <crajah@parallelai.com>2013-08-07 11:29:57 +0100
committerChandan Rajah <crajah@parallelai.com>2013-08-07 11:29:57 +0100
commit2af55565c99297ddbce5ec9bb29a8e650a2a968f (patch)
treee17b9fccf8ecc72e4a84810869992440dc9c16db
parent6a2492a8a0a9a977664e7bfe3e0eb048de1a74b4 (diff)
parentde38789a032b586b0e278a81dedb5c8fb6d43e02 (diff)
downloadSpyGlass-SCALA_2.9.3_2.4.0.tar.gz
SpyGlass-SCALA_2.9.3_2.4.0.zip
Merged changes from koertkuipersSCALA_2.9.3_2.4.0
-rw-r--r--LICENSE177
-rw-r--r--NOTICE18
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java1123
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java111
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java1048
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java8
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java392
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala28
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala152
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala63
11 files changed, 1819 insertions, 1303 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..66a27ec
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,177 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..6c40492
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,18 @@
+SpyGlass is a Scalding Source and Cascading Tap framework
+Copyright 2013 Parallel AI LTD
+
+Third Party Dependencies:
+
+Scalding 2.x
+Apache Public License 2.0
+https://github.com/twitter/scalding
+
+Cascading 2.0
+Apache Public License 2.0
+http://www.cascading.org
+
+Hadoop 0.20.2
+Apache Public License 2.0
+http://hadoop.apache.org
+
+
diff --git a/pom.xml b/pom.xml
index 4b22b61..8d40658 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,7 +62,7 @@
<name>Cascading and Scalding wrapper for HBase with advanced features</name>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>${scala.version}_2.3.0</version>
+ <version>${scala.version}_2.4.0</version>
<packaging>jar</packaging>
<distributionManagement>
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
index aabdc5e..8e121bc 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
@@ -6,7 +6,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
-import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -17,10 +16,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
@@ -40,519 +37,609 @@ import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-public class HBaseInputFormat
- implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
-
- private final Log LOG = LogFactory.getLog(HBaseInputFormat.class);
-
- private final String id = UUID.randomUUID().toString();
-
- private byte [][] inputColumns;
- private HTable table;
-// private HBaseRecordReader tableRecordReader;
- private Filter rowFilter;
-// private String tableName = "";
-
- private HashMap<InetAddress, String> reverseDNSCacheMap =
- new HashMap<InetAddress, String>();
-
- private String nameServer = null;
-
-// private Scan scan = null;
-
-
- @SuppressWarnings("deprecation")
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- if (this.table == null) {
- throw new IOException("No table was provided");
- }
-
- if (this.inputColumns == null || this.inputColumns.length == 0) {
- throw new IOException("Expecting at least one column");
- }
-
- Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
-
- if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
- HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
-
- if (null == regLoc) {
- throw new IOException("Expecting at least one region.");
- }
-
- List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);
- HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
- HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
- .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY, false);
-
- splits.add(split);
-
- return splits.toArray(new HBaseTableSplit[splits.size()]);
- }
-
- if( keys.getSecond() == null || keys.getSecond().length == 0) {
- throw new IOException("Expecting at least one region.");
- }
-
- if( keys.getFirst().length != keys.getSecond().length ) {
- throw new IOException("Regions for start and end key do not match");
- }
-
- byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
- byte[] maxKey = keys.getSecond()[0];
-
- LOG.debug( String.format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
-
- byte [][] regStartKeys = keys.getFirst();
- byte [][] regStopKeys = keys.getSecond();
- String [] regions = new String[regStartKeys.length];
-
- for( int i = 0; i < regStartKeys.length; i++ ) {
- minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0 ) && (Bytes.compareTo(regStartKeys[i], minKey) < 0 ) ? regStartKeys[i] : minKey;
- maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0 ) ? regStopKeys[i] : maxKey;
-
- HServerAddress regionServerAddress =
- table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
- InetAddress regionAddress =
- regionServerAddress.getInetSocketAddress().getAddress();
- String regionLocation;
- try {
- regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
- LOG.error("Cannot resolve the host name for " + regionAddress +
- " because of " + e);
- regionLocation = regionServerAddress.getHostname();
- }
-
-// HServerAddress regionServerAddress = table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
-// InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
-//
-// String regionLocation;
-//
-// try {
-// regionLocation = reverseDNS(regionAddress);
-// } catch (NamingException e) {
-// LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e);
-// regionLocation = regionServerAddress.getHostname();
-// }
-
-// String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getHostname();
-
- LOG.debug( "***** " + regionLocation );
-
- if( regionLocation == null || regionLocation.length() == 0 )
- throw new IOException( "The region info for regiosn " + i + " is null or empty");
-
- regions[i] = regionLocation;
-
- LOG.debug(String.format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));
- }
-
- byte[] startRow = HConstants.EMPTY_START_ROW;
- byte[] stopRow = HConstants.EMPTY_END_ROW;
-
- LOG.debug( String.format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
-
- LOG.debug("SOURCE MODE is : " + sourceMode);
-
- switch( sourceMode ) {
- case SCAN_ALL:
- startRow = HConstants.EMPTY_START_ROW;
- stopRow = HConstants.EMPTY_END_ROW;
-
- LOG.info( String.format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
- break;
-
- case SCAN_RANGE:
- startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ;
- stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ;
-
- LOG.info( String.format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
- break;
- }
-
- switch( sourceMode ) {
- case EMPTY:
- case SCAN_ALL:
- case SCAN_RANGE:
- {
-// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : startRow;
-// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : stopRow;
-
- List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
-
- if( ! useSalt ) {
-
- List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow);
-
- int maxRegions = validRegions.size();
- int currentRegion = 1;
-
- for( HRegionLocation cRegion : validRegions ) {
- byte [] rStart = cRegion.getRegionInfo().getStartKey();
- byte [] rStop = cRegion.getRegionInfo().getEndKey();
-
- HServerAddress regionServerAddress = cRegion.getServerAddress();
- InetAddress regionAddress =
- regionServerAddress.getInetSocketAddress().getAddress();
- String regionLocation;
- try {
- regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
- LOG.error("Cannot resolve the host name for " + regionAddress +
- " because of " + e);
- regionLocation = regionServerAddress.getHostname();
- }
-
- byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow);
- byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);
-
- LOG.debug(String.format("BOOL start (%s) stop (%s) length (%d)",
- (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )),
- (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )),
- rStop.length
- ));
-
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(),
- sStart,
- sStop,
- regionLocation,
- SourceMode.SCAN_RANGE, useSalt
- );
-
- split.setEndRowInclusive( currentRegion == maxRegions );
-
- currentRegion ++;
-
- LOG.debug(String.format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
- Bytes.toString(startRow), Bytes.toString(stopRow),
- Bytes.toString(rStart), Bytes.toString(rStop),
- Bytes.toString(sStart),
- Bytes.toString(sStop),
- cRegion.getHostnamePort(), split) );
-
- splits.add(split);
- }
- } else {
- LOG.debug("Using SALT : " + useSalt );
-
- // Will return the start and the stop key with all possible prefixes.
- for( int i = 0; i < regions.length; i++ ) {
- Pair<byte[], byte[]>[] intervals = HBaseSalter.getDistributedIntervals(startRow, stopRow, regStartKeys[i], regStopKeys[i], prefixList );
-
- for( Pair<byte[], byte[]> pair : intervals ) {
- LOG.info("".format("Using SALT, Region (%s) Start (%s) Stop (%s)", regions[i], Bytes.toString(pair.getFirst()), Bytes.toString(pair.getSecond())));
-
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(),
- pair.getFirst(),
- pair.getSecond(),
- regions[i],
- SourceMode.SCAN_RANGE, useSalt
- );
-
- split.setEndRowInclusive(true);
- splits.add(split);
- }
- }
- }
-
- LOG.info("RETURNED NO OF SPLITS: split -> " + splits.size());
- for( HBaseTableSplit s: splits) {
- LOG.info("RETURNED SPLITS: split -> " + s);
- }
-
- return splits.toArray(new HBaseTableSplit[splits.size()]);
- }
-
- case GET_LIST:
- {
-// if( keyList == null || keyList.size() == 0 ) {
- if( keyList == null ) {
- throw new IOException("Source Mode is GET_LIST but key list is EMPTY");
- }
-
- if( useSalt ) {
- TreeSet<String> tempKeyList = new TreeSet<String>();
-
- for(String key: keyList) {
- tempKeyList.add(HBaseSalter.addSaltPrefix(key));
- }
-
- keyList = tempKeyList;
- }
-
- LOG.debug("".format("Splitting Key List (%s)", keyList));
-
- List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
-
- for (int i = 0; i < keys.getFirst().length; i++) {
-
- if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
- continue;
- }
-
- LOG.debug(String.format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));
-
- Set<String> regionsSubSet = null;
-
- if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) {
- LOG.debug("REGION start is empty");
- LOG.debug("REGION stop is empty");
- regionsSubSet = keyList;
- } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) {
- LOG.debug("REGION start is empty");
- regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true);
- } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) {
- LOG.debug("REGION stop is empty");
- regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true);
- } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) {
- regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true);
- } else {
- throw new IOException(String.format("For REGION (%s) Start Key (%s) > Stop Key(%s)",
- regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
- }
-
- if( regionsSubSet == null || regionsSubSet.size() == 0) {
- LOG.debug( "EMPTY: Key is for region " + regions[i] + " is null");
-
- continue;
- }
-
- TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet);
-
- LOG.debug(String.format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));
-
- HBaseTableSplit split = new HBaseTableSplit(
- table.getTableName(), regionKeyList, versions,
- regions[i],
- SourceMode.GET_LIST, useSalt);
- splits.add(split);
- }
-
- LOG.debug("RETURNED SPLITS: split -> " + splits);
-
- return splits.toArray(new HBaseTableSplit[splits.size()]);
- }
-
- default:
- throw new IOException("Unknown source Mode : " + sourceMode );
- }
- }
-
- private String reverseDNS(InetAddress ipAddress) throws NamingException {
- String hostName = this.reverseDNSCacheMap.get(ipAddress);
- if (hostName == null) {
- hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
- this.reverseDNSCacheMap.put(ipAddress, hostName);
- }
- return hostName;
- }
-
-
- @Override
- public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter) throws IOException {
-
- if( ! (split instanceof HBaseTableSplit ) )
- throw new IOException("Table Split is not type HBaseTableSplit");
-
- HBaseTableSplit tSplit = (HBaseTableSplit) split;
-
- HBaseRecordReader trr = new HBaseRecordReader();
-
- switch( tSplit.getSourceMode() ) {
- case SCAN_ALL:
- case SCAN_RANGE:
- {
- LOG.debug(String.format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));
-
- trr.setStartRow(tSplit.getStartRow());
- trr.setEndRow(tSplit.getEndRow());
- trr.setEndRowInclusive(tSplit.getEndRowInclusive());
- trr.setUseSalt(useSalt);
- }
-
- break;
-
- case GET_LIST:
- {
- LOG.debug(String.format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));
-
- trr.setKeyList(tSplit.getKeyList());
- trr.setVersions(tSplit.getVersions());
- trr.setUseSalt(useSalt);
- }
-
- break;
-
- default:
- throw new IOException( "Unknown source mode : " + tSplit.getSourceMode() );
- }
-
- trr.setSourceMode(tSplit.getSourceMode());
- trr.setHTable(this.table);
- trr.setInputColumns(this.inputColumns);
- trr.setRowFilter(this.rowFilter);
-
- trr.init();
-
- return trr;
- }
-
-
-
- /* Configuration Section */
-
- /**
- * space delimited list of columns
- */
- public static final String COLUMN_LIST = "hbase.tablecolumns";
-
- /**
- * Use this jobconf param to specify the input table
- */
- private static final String INPUT_TABLE = "hbase.inputtable";
-
- private String startKey = null;
- private String stopKey = null;
-
- private SourceMode sourceMode = SourceMode.EMPTY;
- private TreeSet<String> keyList = null;
- private int versions = 1;
- private boolean useSalt = false;
- private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
-
- public void configure(JobConf job) {
- String tableName = getTableName(job);
- String colArg = job.get(COLUMN_LIST);
- String[] colNames = colArg.split(" ");
- byte [][] m_cols = new byte[colNames.length][];
- for (int i = 0; i < m_cols.length; i++) {
- m_cols[i] = Bytes.toBytes(colNames[i]);
- }
- setInputColumns(m_cols);
-
- try {
- setHTable(new HTable(HBaseConfiguration.create(job), tableName));
- } catch (Exception e) {
- LOG.error( "************* Table could not be created" );
- LOG.error(StringUtils.stringifyException(e));
- }
-
- LOG.debug("Entered : " + this.getClass() + " : configure()" );
-
- useSalt = job.getBoolean( String.format(HBaseConstants.USE_SALT, getTableName(job) ), false);
- prefixList = job.get( String.format(HBaseConstants.SALT_PREFIX, getTableName(job) ), HBaseSalter.DEFAULT_PREFIX_LIST);
-
- sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ;
-
- LOG.info( String.format("GOT SOURCE MODE (%s) as (%s) and finally",
- String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode ));
-
- switch( sourceMode ) {
- case SCAN_RANGE:
- LOG.info("HIT SCAN_RANGE");
-
- startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job) ) );
- stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job) ) );
-
- LOG.info(String.format("Setting start key (%s) and stop key (%s)", startKey, stopKey) );
- break;
-
- case GET_LIST:
- LOG.info("HIT GET_LIST");
-
- Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job)));
- keyList = new TreeSet<String> (keys);
-
- versions = job.getInt(String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
-
- LOG.debug( "GOT KEY LIST : " + keys );
- LOG.debug(String.format("SETTING key list (%s)", keyList) );
-
- break;
-
- case EMPTY:
- LOG.info("HIT EMPTY");
-
- sourceMode = SourceMode.SCAN_ALL;
- break;
-
- default:
- LOG.info("HIT DEFAULT");
-
- break;
- }
- }
-
- public void validateInput(JobConf job) throws IOException {
- // expecting exactly one path
- String tableName = getTableName(job);
-
- if (tableName == null) {
- throw new IOException("expecting one table name");
- }
- LOG.debug(String.format("Found Table name [%s]", tableName));
-
-
- // connected to table?
- if (getHTable() == null) {
- throw new IOException("could not connect to table '" +
- tableName + "'");
- }
- LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));
-
- // expecting at least one column
- String colArg = job.get(COLUMN_LIST);
- if (colArg == null || colArg.length() == 0) {
- throw new IOException("expecting at least one column");
- }
- LOG.debug(String.format("Found Columns [%s]", colArg));
-
- LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey, stopKey));
-
- if( sourceMode == SourceMode.EMPTY ) {
- throw new IOException("SourceMode should not be EMPTY");
- }
-
- if( sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0) ) {
- throw new IOException( "Source mode is GET_LIST bu key list is empty");
- }
- }
-
-
- /* Getters & Setters */
- private HTable getHTable() { return this.table; }
- private void setHTable(HTable ht) { this.table = ht; }
- private void setInputColumns( byte [][] ic ) { this.inputColumns = ic; }
-
-
- private void setJobProp( JobConf job, String key, String value) {
- if( job.get(key) != null ) throw new RuntimeException(String.format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));
- job.set(key, value);
- }
-
- private String getJobProp( JobConf job, String key ) { return job.get(key); }
-
- public static void setTableName(JobConf job, String tableName) {
- // Make sure that table has not been set before
- String oldTableName = getTableName(job);
- if(oldTableName != null) {
- throw new RuntimeException("table name already set to: '"
- + oldTableName + "'");
- }
-
- job.set(INPUT_TABLE, tableName);
- }
-
- public static String getTableName(JobConf job) {
- return job.get(INPUT_TABLE);
- }
-
- protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
- return true;
- }
+public class HBaseInputFormat implements
+ InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
+
+ private final Log LOG = LogFactory.getLog(HBaseInputFormat.class);
+
+ private final String id = UUID.randomUUID().toString();
+
+ private byte[][] inputColumns;
+ private HTable table;
+ // private HBaseRecordReader tableRecordReader;
+ private Filter rowFilter;
+ // private String tableName = "";
+
+ private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();
+
+ private String nameServer = null;
+
+ // private Scan scan = null;
+
+ private HBaseMultiInputSplit[] convertToMultiSplitArray(
+ List<HBaseTableSplit> splits) throws IOException {
+
+ if (splits == null)
+ throw new IOException("The list of splits is null => " + splits);
+
+ HashMap<String, HBaseMultiInputSplit> regionSplits = new HashMap<String, HBaseMultiInputSplit>();
+
+ for (HBaseTableSplit hbt : splits) {
+ HBaseMultiInputSplit mis = null;
+ if (regionSplits.containsKey(hbt.getRegionLocation())) {
+ mis = regionSplits.get(hbt.getRegionLocation());
+ } else {
+ regionSplits.put(hbt.getRegionLocation(), new HBaseMultiInputSplit(
+ hbt.getRegionLocation()));
+ mis = regionSplits.get(hbt.getRegionLocation());
+ }
+
+ mis.addSplit(hbt);
+ regionSplits.put(hbt.getRegionLocation(), mis);
+ }
+
+ Collection<HBaseMultiInputSplit> outVals = regionSplits.values();
+
+ LOG.debug("".format("Returning array of splits : %s", outVals));
+
+ if (outVals == null)
+ throw new IOException("The list of multi input splits were null");
+
+ return outVals.toArray(new HBaseMultiInputSplit[outVals.size()]);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ if (this.table == null) {
+ throw new IOException("No table was provided");
+ }
+
+ if (this.inputColumns == null || this.inputColumns.length == 0) {
+ throw new IOException("Expecting at least one column");
+ }
+
+ final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+
+ if (keys == null || keys.getFirst() == null
+ || keys.getFirst().length == 0) {
+ HRegionLocation regLoc = table.getRegionLocation(
+ HConstants.EMPTY_BYTE_ARRAY, false);
+
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+ HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+ .getHostnamePort().split(
+ Addressing.HOSTNAME_PORT_SEPARATOR)[0],
+ SourceMode.EMPTY, false);
+
+ splits.add(split);
+
+ // TODO: Change to HBaseMultiSplit
+ return convertToMultiSplitArray(splits);
+ }
+
+ if (keys.getSecond() == null || keys.getSecond().length == 0) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ if (keys.getFirst().length != keys.getSecond().length) {
+ throw new IOException("Regions for start and end key do not match");
+ }
+
+ byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
+ byte[] maxKey = keys.getSecond()[0];
+
+ LOG.debug(String.format("SETTING min key (%s) and max key (%s)",
+ Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+ byte[][] regStartKeys = keys.getFirst();
+ byte[][] regStopKeys = keys.getSecond();
+ String[] regions = new String[regStartKeys.length];
+
+ for (int i = 0; i < regStartKeys.length; i++) {
+ minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0)
+ && (Bytes.compareTo(regStartKeys[i], minKey) < 0) ? regStartKeys[i]
+ : minKey;
+ maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0)
+ && (Bytes.compareTo(regStopKeys[i], maxKey) > 0) ? regStopKeys[i]
+ : maxKey;
+
+ HServerAddress regionServerAddress = table.getRegionLocation(
+ keys.getFirst()[i]).getServerAddress();
+ InetAddress regionAddress = regionServerAddress.getInetSocketAddress()
+ .getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress
+ + " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ // HServerAddress regionServerAddress =
+ // table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
+ // InetAddress regionAddress =
+ // regionServerAddress.getInetSocketAddress().getAddress();
+ //
+ // String regionLocation;
+ //
+ // try {
+ // regionLocation = reverseDNS(regionAddress);
+ // } catch (NamingException e) {
+ // LOG.error("Cannot resolve the host name for " + regionAddress +
+ // " because of " + e);
+ // regionLocation = regionServerAddress.getHostname();
+ // }
+
+ // String regionLocation =
+ // table.getRegionLocation(keys.getFirst()[i]).getHostname();
+
+ LOG.debug("***** " + regionLocation);
+
+ if (regionLocation == null || regionLocation.length() == 0)
+ throw new IOException("The region info for regiosn " + i
+ + " is null or empty");
+
+ regions[i] = regionLocation;
+
+ LOG.debug(String.format(
+ "Region (%s) has start key (%s) and stop key (%s)", regions[i],
+ Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
+ }
+
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+ LOG.debug(String.format("Found min key (%s) and max key (%s)",
+ Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+ LOG.debug("SOURCE MODE is : " + sourceMode);
+
+ switch (sourceMode) {
+ case SCAN_ALL:
+ startRow = HConstants.EMPTY_START_ROW;
+ stopRow = HConstants.EMPTY_END_ROW;
+
+ LOG.info(String.format(
+ "SCAN ALL: Found start key (%s) and stop key (%s)",
+ Bytes.toString(startRow), Bytes.toString(stopRow)));
+ break;
+
+ case SCAN_RANGE:
+ startRow = (startKey != null && startKey.length() != 0) ? Bytes
+ .toBytes(startKey) : HConstants.EMPTY_START_ROW;
+ stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes
+ .toBytes(stopKey) : HConstants.EMPTY_END_ROW;
+
+ LOG.info(String.format(
+ "SCAN RANGE: Found start key (%s) and stop key (%s)",
+ Bytes.toString(startRow), Bytes.toString(stopRow)));
+ break;
+ }
+
+ switch (sourceMode) {
+ case EMPTY:
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+ // startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey :
+ // startRow;
+ // stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey :
+ // stopRow;
+
+ final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+ if (!useSalt) {
+
+ List<HRegionLocation> validRegions = table.getRegionsInRange(
+ startRow, stopRow);
+
+ int maxRegions = validRegions.size();
+ int currentRegion = 1;
+
+ for (HRegionLocation cRegion : validRegions) {
+ byte[] rStart = cRegion.getRegionInfo().getStartKey();
+ byte[] rStop = cRegion.getRegionInfo().getEndKey();
+
+ HServerAddress regionServerAddress = cRegion
+ .getServerAddress();
+ InetAddress regionAddress = regionServerAddress
+ .getInetSocketAddress().getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for "
+ + regionAddress + " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
+ || (Bytes.compareTo(startRow, rStart) <= 0) ? rStart
+ : startRow);
+ byte[] sStop = (stopRow == HConstants.EMPTY_END_ROW
+ || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop
+ : stopRow);
+
+ LOG.debug(String.format(
+ "BOOL start (%s) stop (%s) length (%d)",
+ (startRow == HConstants.EMPTY_START_ROW || (Bytes
+ .compareTo(startRow, rStart) <= 0)),
+ (stopRow == HConstants.EMPTY_END_ROW || (Bytes
+ .compareTo(stopRow, rStop) >= 0)), rStop.length));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(), sStart, sStop, regionLocation,
+ SourceMode.SCAN_RANGE, useSalt);
+
+ split.setEndRowInclusive(currentRegion == maxRegions);
+
+ currentRegion++;
+
+ LOG.debug(String
+ .format(
+ "START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
+ Bytes.toString(startRow),
+ Bytes.toString(stopRow), Bytes.toString(rStart),
+ Bytes.toString(rStop), Bytes.toString(sStart),
+ Bytes.toString(sStop), cRegion.getHostnamePort(),
+ split));
+
+ splits.add(split);
+ }
+ } else {
+ LOG.debug("Using SALT : " + useSalt);
+
+ // Will return the start and the stop key with all possible
+ // prefixes.
+ for (int i = 0; i < regions.length; i++) {
+ Pair<byte[], byte[]>[] intervals = HBaseSalter
+ .getDistributedIntervals(startRow, stopRow,
+ regStartKeys[i], regStopKeys[i], prefixList);
+
+ for (Pair<byte[], byte[]> pair : intervals) {
+ LOG.debug("".format(
+ "Using SALT, Region (%s) Start (%s) Stop (%s)",
+ regions[i], Bytes.toString(pair.getFirst()),
+ Bytes.toString(pair.getSecond())));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(), pair.getFirst(),
+ pair.getSecond(), regions[i], SourceMode.SCAN_RANGE,
+ useSalt);
+
+ split.setEndRowInclusive(true);
+ splits.add(split);
+ }
+ }
+ }
+
+ LOG.debug("RETURNED NO OF SPLITS: split -> " + splits.size());
+
+ // TODO: Change to HBaseMultiSplit
+ return convertToMultiSplitArray(splits);
+ }
+
+ case GET_LIST: {
+ // if( keyList == null || keyList.size() == 0 ) {
+ if (keyList == null) {
+ throw new IOException(
+ "Source Mode is GET_LIST but key list is EMPTY");
+ }
+
+ if (useSalt) {
+ TreeSet<String> tempKeyList = new TreeSet<String>();
+
+ for (String key : keyList) {
+ tempKeyList.add(HBaseSalter.addSaltPrefix(key));
+ }
+
+ keyList = tempKeyList;
+ }
+
+ LOG.info("".format("Splitting Key List (%s)", keyList));
+
+ final List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+ for (int i = 0; i < keys.getFirst().length; i++) {
+
+ if (!includeRegionInSplit(keys.getFirst()[i],
+ keys.getSecond()[i])) {
+ continue;
+ }
+
+ LOG.debug(String.format(
+ "Getting region (%s) subset (%s) to (%s)", regions[i],
+ Bytes.toString(regStartKeys[i]),
+ Bytes.toString(regStopKeys[i])));
+
+ Set<String> regionsSubSet = null;
+
+ if ((regStartKeys[i] == null || regStartKeys[i].length == 0)
+ && (regStopKeys[i] == null || regStopKeys[i].length == 0)) {
+ LOG.debug("REGION start is empty");
+ LOG.debug("REGION stop is empty");
+ regionsSubSet = keyList;
+ } else if (regStartKeys[i] == null
+ || regStartKeys[i].length == 0) {
+ LOG.debug("REGION start is empty");
+ regionsSubSet = keyList.headSet(
+ Bytes.toString(regStopKeys[i]), true);
+ } else if (regStopKeys[i] == null || regStopKeys[i].length == 0) {
+ LOG.debug("REGION stop is empty");
+ regionsSubSet = keyList.tailSet(
+ Bytes.toString(regStartKeys[i]), true);
+ } else if (Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0) {
+ regionsSubSet = keyList.subSet(
+ Bytes.toString(regStartKeys[i]), true,
+ Bytes.toString(regStopKeys[i]), true);
+ } else {
+ throw new IOException(String.format(
+ "For REGION (%s) Start Key (%s) > Stop Key(%s)",
+ regions[i], Bytes.toString(regStartKeys[i]),
+ Bytes.toString(regStopKeys[i])));
+ }
+
+ if (regionsSubSet == null || regionsSubSet.size() == 0) {
+ LOG.debug("EMPTY: Key is for region " + regions[i]
+ + " is null");
+
+ continue;
+ }
+
+ TreeSet<String> regionKeyList = new TreeSet<String>(
+ regionsSubSet);
+
+ LOG.debug(String.format("Regions [%s] has key list <%s>",
+ regions[i], regionKeyList));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(), regionKeyList, versions, regions[i],
+ SourceMode.GET_LIST, useSalt);
+ splits.add(split);
+ }
+
+ // if (splits.isEmpty()) {
+ // LOG.info("GOT EMPTY SPLITS");
+
+ // throw new IOException(
+ // "".format("Key List NOT found in any region"));
+
+ // HRegionLocation regLoc = table.getRegionLocation(
+ // HConstants.EMPTY_BYTE_ARRAY, false);
+ //
+ // if (null == regLoc) {
+ // throw new IOException("Expecting at least one region.");
+ // }
+ //
+ // HBaseTableSplit split = new HBaseTableSplit(
+ // table.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
+ // HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostnamePort()
+ // .split(Addressing.HOSTNAME_PORT_SEPARATOR)[0],
+ // SourceMode.EMPTY, false);
+ //
+ // splits.add(split);
+ // }
+
+ LOG.info("RETURNED SPLITS: split -> " + splits);
+
+ // TODO: Change to HBaseMultiSplit
+ return convertToMultiSplitArray(splits);
+ }
+
+ default:
+ throw new IOException("Unknown source Mode : " + sourceMode);
+ }
+ }
+
+ private String reverseDNS(InetAddress ipAddress) throws NamingException {
+ String hostName = this.reverseDNSCacheMap.get(ipAddress);
+ if (hostName == null) {
+ hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(
+ ipAddress, this.nameServer));
+ this.reverseDNSCacheMap.put(ipAddress, hostName);
+ }
+ return hostName;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+ if (!(split instanceof HBaseMultiInputSplit))
+ throw new IOException("Table Split is not type HBaseMultiInputSplit");
+
+ HBaseMultiInputSplit tSplit = (HBaseMultiInputSplit) split;
+
+ HBaseRecordReader trr = new HBaseRecordReader(tSplit);
+
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.setUseSalt(useSalt);
+
+ trr.setNextSplit();
+
+ return trr;
+ }
+
+ /* Configuration Section */
+
+ /**
+ * space delimited list of columns
+ */
+ public static final String COLUMN_LIST = "hbase.tablecolumns";
+
+ /**
+ * Use this jobconf param to specify the input table
+ */
+ private static final String INPUT_TABLE = "hbase.inputtable";
+
+ private String startKey = null;
+ private String stopKey = null;
+
+ private SourceMode sourceMode = SourceMode.EMPTY;
+ private TreeSet<String> keyList = null;
+ private int versions = 1;
+ private boolean useSalt = false;
+ private String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
+
+ public void configure(JobConf job) {
+ String tableName = getTableName(job);
+ String colArg = job.get(COLUMN_LIST);
+ String[] colNames = colArg.split(" ");
+ byte[][] m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ }
+ setInputColumns(m_cols);
+
+ try {
+ setHTable(new HTable(HBaseConfiguration.create(job), tableName));
+ } catch (Exception e) {
+ LOG.error("************* Table could not be created");
+ LOG.error(StringUtils.stringifyException(e));
+ }
+
+ LOG.debug("Entered : " + this.getClass() + " : configure()");
+
+ useSalt = job.getBoolean(
+ String.format(HBaseConstants.USE_SALT, getTableName(job)), false);
+ prefixList = job.get(
+ String.format(HBaseConstants.SALT_PREFIX, getTableName(job)),
+ HBaseSalter.DEFAULT_PREFIX_LIST);
+
+ sourceMode = SourceMode.valueOf(job.get(String.format(
+ HBaseConstants.SOURCE_MODE, getTableName(job))));
+
+ LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
+ .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
+ .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
+ sourceMode));
+
+ switch (sourceMode) {
+ case SCAN_RANGE:
+ LOG.info("HIT SCAN_RANGE");
+
+ startKey = getJobProp(job,
+ String.format(HBaseConstants.START_KEY, getTableName(job)));
+ stopKey = getJobProp(job,
+ String.format(HBaseConstants.STOP_KEY, getTableName(job)));
+
+ LOG.info(String.format("Setting start key (%s) and stop key (%s)",
+ startKey, stopKey));
+ break;
+
+ case GET_LIST:
+ LOG.info("HIT GET_LIST");
+
+ Collection<String> keys = job.getStringCollection(String.format(
+ HBaseConstants.KEY_LIST, getTableName(job)));
+ keyList = new TreeSet<String>(keys);
+
+ versions = job.getInt(
+ String.format(HBaseConstants.VERSIONS, getTableName(job)), 1);
+
+ LOG.debug("GOT KEY LIST : " + keys);
+ LOG.debug(String.format("SETTING key list (%s)", keyList));
+
+ break;
+
+ case EMPTY:
+ LOG.info("HIT EMPTY");
+
+ sourceMode = SourceMode.SCAN_ALL;
+ break;
+
+ default:
+ LOG.info("HIT DEFAULT");
+
+ break;
+ }
+ }
+
+ public void validateInput(JobConf job) throws IOException {
+ // expecting exactly one path
+ String tableName = getTableName(job);
+
+ if (tableName == null) {
+ throw new IOException("expecting one table name");
+ }
+ LOG.debug(String.format("Found Table name [%s]", tableName));
+
+ // connected to table?
+ if (getHTable() == null) {
+ throw new IOException("could not connect to table '" + tableName + "'");
+ }
+ LOG.debug(String.format("Found Table [%s]", getHTable().getTableName()));
+
+ // expecting at least one column
+ String colArg = job.get(COLUMN_LIST);
+ if (colArg == null || colArg.length() == 0) {
+ throw new IOException("expecting at least one column");
+ }
+ LOG.debug(String.format("Found Columns [%s]", colArg));
+
+ LOG.debug(String.format("Found Start & STop Key [%s][%s]", startKey,
+ stopKey));
+
+ if (sourceMode == SourceMode.EMPTY) {
+ throw new IOException("SourceMode should not be EMPTY");
+ }
+
+ if (sourceMode == SourceMode.GET_LIST
+ && (keyList == null || keyList.size() == 0)) {
+ throw new IOException("Source mode is GET_LIST bu key list is empty");
+ }
+ }
+
+ /* Getters & Setters */
+ private HTable getHTable() {
+ return this.table;
+ }
+
+ private void setHTable(HTable ht) {
+ this.table = ht;
+ }
+
+ private void setInputColumns(byte[][] ic) {
+ this.inputColumns = ic;
+ }
+
+ private void setJobProp(JobConf job, String key, String value) {
+ if (job.get(key) != null)
+ throw new RuntimeException(String.format(
+ "Job Conf already has key [%s] with value [%s]", key,
+ job.get(key)));
+ job.set(key, value);
+ }
+
+ private String getJobProp(JobConf job, String key) {
+ return job.get(key);
+ }
+
+ public static void setTableName(JobConf job, String tableName) {
+ // Make sure that table has not been set before
+ String oldTableName = getTableName(job);
+ if (oldTableName != null) {
+ throw new RuntimeException("table name already set to: '"
+ + oldTableName + "'");
+ }
+
+ job.set(INPUT_TABLE, tableName);
+ }
+
+ public static String getTableName(JobConf job) {
+ return job.get(INPUT_TABLE);
+ }
+
+ protected boolean includeRegionInSplit(final byte[] startKey,
+ final byte[] endKey) {
+ return true;
+ }
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java
new file mode 100644
index 0000000..02e7f7b
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseMultiInputSplit.java
@@ -0,0 +1,111 @@
+package parallelai.spyglass.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+public class HBaseMultiInputSplit implements InputSplit,
+ Comparable<HBaseMultiInputSplit>, Serializable {
+
+ private final Log LOG = LogFactory.getLog(HBaseMultiInputSplit.class);
+
+ private List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+ private String regionLocation = null;
+
+ /** default constructor */
+ private HBaseMultiInputSplit() {
+
+ }
+
+ public HBaseMultiInputSplit(String regionLocation) {
+ this.regionLocation = regionLocation;
+ }
+
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ LOG.debug("REGION GETTER : " + regionLocation);
+
+ return this.regionLocation;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ LOG.debug("READ ME : " + in.toString());
+
+ int s = Bytes.toInt(Bytes.readByteArray(in));
+
+ for (int i = 0; i < s; i++) {
+ HBaseTableSplit hbts = (HBaseTableSplit) SerializationUtils
+ .deserialize(Bytes.readByteArray(in));
+ splits.add(hbts);
+ }
+
+ LOG.debug("READ and CREATED : " + this);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ LOG.debug("WRITE : " + this);
+
+ Bytes.writeByteArray(out, Bytes.toBytes(splits.size()));
+
+ for (HBaseTableSplit hbts : splits) {
+ Bytes.writeByteArray(out, SerializationUtils.serialize(hbts));
+ }
+
+ LOG.debug("WROTE : " + out.toString());
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer str = new StringBuffer();
+ str.append("HBaseMultiSplit : ");
+
+ for (HBaseTableSplit hbt : splits) {
+ str.append(" [" + hbt.toString() + "]");
+ }
+
+ return str.toString();
+ }
+
+ @Override
+ public int compareTo(HBaseMultiInputSplit o) {
+ // TODO: Make this comparison better
+ return (splits.size() - o.splits.size());
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return splits.size();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ LOG.debug("REGION ARRAY : " + regionLocation);
+
+ return new String[] { this.regionLocation };
+ }
+
+ public void addSplit(HBaseTableSplit hbt) throws IOException {
+ if (hbt.getRegionLocation().equals(regionLocation))
+ splits.add(hbt);
+ else
+ throw new IOException("HBaseTableSplit Region Location "
+ + hbt.getRegionLocation()
+ + " does NOT match MultiSplit Region Location " + regionLocation);
+ }
+
+ public List<HBaseTableSplit> getSplits() {
+ return splits;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
index d22ed71..5d7dbdd 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;
@@ -31,475 +31,579 @@ import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
public class HBaseRecordReader implements
- RecordReader<ImmutableBytesWritable, Result> {
-
- static final Log LOG = LogFactory.getLog(HBaseRecordReader.class);
-
- private byte[] startRow;
- private byte[] endRow;
- private byte[] lastSuccessfulRow;
- private TreeSet<String> keyList;
- private SourceMode sourceMode;
- private Filter trrRowFilter;
- private ResultScanner scanner;
- private HTable htable;
- private byte[][] trrInputColumns;
- private long timestamp;
- private int rowcount;
- private boolean logScannerActivity = false;
- private int logPerRowCount = 100;
- private boolean endRowInclusive = true;
- private int versions = 1;
- private boolean useSalt = false;
-
- /**
- * Restart from survivable exceptions by creating a new scanner.
- *
- * @param firstRow
- * @throws IOException
- */
- public void restartRangeScan(byte[] firstRow) throws IOException {
- Scan currentScan;
- if ((endRow != null) && (endRow.length > 0)) {
- if (trrRowFilter != null) {
- Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
- new byte[] { 0 }) : endRow));
-
- TableInputFormat.addColumns(scan, trrInputColumns);
- scan.setFilter(trrRowFilter);
- scan.setCacheBlocks(false);
- this.scanner = this.htable.getScanner(scan);
- currentScan = scan;
- } else {
- LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow)
- + ", endRow: " + Bytes.toString(endRow));
- Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
- new byte[] { 0 }) : endRow));
- TableInputFormat.addColumns(scan, trrInputColumns);
- this.scanner = this.htable.getScanner(scan);
- currentScan = scan;
- }
- } else {
- LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
- + ", no endRow");
-
- Scan scan = new Scan(firstRow);
- TableInputFormat.addColumns(scan, trrInputColumns);
- scan.setFilter(trrRowFilter);
- this.scanner = this.htable.getScanner(scan);
- currentScan = scan;
- }
- if (logScannerActivity) {
- LOG.debug("Current scan=" + currentScan.toString());
- timestamp = System.currentTimeMillis();
- rowcount = 0;
- }
- }
-
- public TreeSet<String> getKeyList() {
- return keyList;
- }
-
- public void setKeyList(TreeSet<String> keyList) {
- this.keyList = keyList;
- }
-
- public void setVersions(int versions) {
- this.versions = versions;
- }
-
- public void setUseSalt(boolean useSalt) {
- this.useSalt = useSalt;
- }
-
- public SourceMode getSourceMode() {
- return sourceMode;
- }
-
- public void setSourceMode(SourceMode sourceMode) {
- this.sourceMode = sourceMode;
- }
-
- public byte[] getEndRow() {
- return endRow;
- }
-
- public void setEndRowInclusive(boolean isInclusive) {
- endRowInclusive = isInclusive;
- }
-
- public boolean getEndRowInclusive() {
- return endRowInclusive;
- }
-
- private byte[] nextKey = null;
- private Vector<List<KeyValue>> resultVector = null;
- Map<Long, List<KeyValue>> keyValueMap = null;
-
- /**
- * Build the scanner. Not done in constructor to allow for extension.
- *
- * @throws IOException
- */
- public void init() throws IOException {
- switch (sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE:
- restartRangeScan(startRow);
- break;
-
- case GET_LIST:
- nextKey = Bytes.toBytes(keyList.pollFirst());
- break;
-
- default:
- throw new IOException(" Unknown source mode : " + sourceMode);
- }
- }
-
- byte[] getStartRow() {
- return this.startRow;
- }
-
- /**
- * @param htable
- * the {@link HTable} to scan.
- */
- public void setHTable(HTable htable) {
- Configuration conf = htable.getConfiguration();
- logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY,
- false);
- logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
- this.htable = htable;
- }
-
- /**
- * @param inputColumns
- * the columns to be placed in {@link Result}.
- */
- public void setInputColumns(final byte[][] inputColumns) {
- this.trrInputColumns = inputColumns;
- }
-
- /**
- * @param startRow
- * the first row in the split
- */
- public void setStartRow(final byte[] startRow) {
- this.startRow = startRow;
- }
-
- /**
- *
- * @param endRow
- * the last row in the split
- */
- public void setEndRow(final byte[] endRow) {
- this.endRow = endRow;
- }
-
- /**
- * @param rowFilter
- * the {@link Filter} to be used.
- */
- public void setRowFilter(Filter rowFilter) {
- this.trrRowFilter = rowFilter;
- }
-
- @Override
- public void close() {
- if (this.scanner != null)
- this.scanner.close();
- }
-
- /**
- * @return ImmutableBytesWritable
- *
- * @see org.apache.hadoop.mapred.RecordReader#createKey()
- */
- @Override
- public ImmutableBytesWritable createKey() {
- return new ImmutableBytesWritable();
- }
-
- /**
- * @return RowResult
- *
- * @see org.apache.hadoop.mapred.RecordReader#createValue()
- */
- @Override
- public Result createValue() {
- return new Result();
- }
-
- @Override
- public long getPos() {
- // This should be the ordinal tuple in the range;
- // not clear how to calculate...
- return 0;
- }
-
- @Override
- public float getProgress() {
- // Depends on the total number of tuples and getPos
- return 0;
- }
-
- /**
- * @param key
- * HStoreKey as input key.
- * @param value
- * MapWritable as input value
- * @return true if there was more data
- * @throws IOException
- */
- @Override
- public boolean next(ImmutableBytesWritable key, Result value)
- throws IOException {
-
- switch (sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE: {
-
- Result result;
- try {
- try {
- result = this.scanner.next();
- if (logScannerActivity) {
- rowcount++;
- if (rowcount >= logPerRowCount) {
- long now = System.currentTimeMillis();
- LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
- + rowcount + " rows");
- timestamp = now;
- rowcount = 0;
- }
- }
- } catch (IOException e) {
- // try to handle all IOExceptions by restarting
- // the scanner, if the second call fails, it will be rethrown
- LOG.debug("recovered from " + StringUtils.stringifyException(e));
- if (lastSuccessfulRow == null) {
- LOG.warn("We are restarting the first next() invocation,"
- + " if your mapper has restarted a few other times like this"
- + " then you should consider killing this job and investigate"
- + " why it's taking so long.");
- }
- if (lastSuccessfulRow == null) {
- restartRangeScan(startRow);
- } else {
- restartRangeScan(lastSuccessfulRow);
- this.scanner.next(); // skip presumed already mapped row
- }
- result = this.scanner.next();
- }
-
- if (result != null && result.size() > 0) {
- if( useSalt) {
- key.set( HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
-
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
- return true;
- }
- return false;
- } catch (IOException ioe) {
- if (logScannerActivity) {
- long now = System.currentTimeMillis();
- LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
- + rowcount + " rows");
- LOG.debug(ioe);
- String lastRow = lastSuccessfulRow == null ? "null" : Bytes
- .toStringBinary(lastSuccessfulRow);
- LOG.debug("lastSuccessfulRow=" + lastRow);
- }
- throw ioe;
- }
- }
-
- case GET_LIST: {
- LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey)));
-
- if (versions == 1) {
- if (nextKey != null) {
- LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey)));
-
- Get theGet = new Get(nextKey);
- theGet.setMaxVersions(versions);
-
- Result result = this.htable.get(theGet);
-
- if (result != null && (! result.isEmpty()) ) {
- LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) );
-
- if (keyList != null || !keyList.isEmpty()) {
- String newKey = keyList.pollFirst();
- LOG.debug("New Key => " + newKey);
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
- .toBytes(newKey);
- } else {
- nextKey = null;
- }
-
- LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey)));
-
- // Write the result
- if( useSalt) {
- key.set( HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
-
- return true;
- } else {
- LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0
-
- String newKey;
- while((newKey = keyList.pollFirst()) != null) {
- LOG.debug("WHILE NEXT Key => " + newKey);
-
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
- .toBytes(newKey);
-
- if( nextKey == null ) {
- LOG.error("BOMB! BOMB! BOMB!");
- continue;
- }
-
- if( ! this.htable.exists( new Get(nextKey) ) ) {
- LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
- continue;
- } else { break; }
- }
-
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
- .toBytes(newKey);
-
- LOG.debug("Final New Key => " + Bytes.toString(nextKey));
-
- return next(key, value);
- }
- } else {
- // Nothig left. return false
- return false;
- }
- } else {
- if (resultVector != null && resultVector.size() != 0) {
- LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) );
-
- List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
- Result result = new Result(resultKeyValue);
-
- LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) );
-
- if( useSalt) {
- key.set( HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
-
- return true;
- } else {
- if (nextKey != null) {
- LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey)));
-
- Get theGet = new Get(nextKey);
- theGet.setMaxVersions(versions);
-
- Result resultAll = this.htable.get(theGet);
-
- if( resultAll != null && (! resultAll.isEmpty())) {
- List<KeyValue> keyValeList = resultAll.list();
-
- keyValueMap = new HashMap<Long, List<KeyValue>>();
-
- LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap));
-
- for (KeyValue keyValue : keyValeList) {
- long version = keyValue.getTimestamp();
-
- if (keyValueMap.containsKey(new Long(version))) {
- List<KeyValue> keyValueTempList = keyValueMap.get(new Long(
- version));
- if (keyValueTempList == null) {
- keyValueTempList = new ArrayList<KeyValue>();
- }
- keyValueTempList.add(keyValue);
- } else {
- List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
- keyValueMap.put(new Long(version), keyValueTempList);
- keyValueTempList.add(keyValue);
- }
- }
-
- resultVector = new Vector<List<KeyValue>>();
- resultVector.addAll(keyValueMap.values());
-
- List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
-
- Result result = new Result(resultKeyValue);
-
- LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) );
-
- String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
-
- System.out.println("+ New Key => " + newKey);
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
- .toBytes(newKey);
-
- if( useSalt) {
- key.set( HBaseSalter.delSaltPrefix(result.getRow()));
- } else {
- key.set(result.getRow());
- }
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
- return true;
- } else {
- LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0
-
- String newKey;
-
- while( (newKey = keyList.pollFirst()) != null ) {
- LOG.debug("+ WHILE NEXT Key => " + newKey);
-
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
- .toBytes(newKey);
-
- if( nextKey == null ) {
- LOG.error("+ BOMB! BOMB! BOMB!");
- continue;
- }
-
- if( ! this.htable.exists( new Get(nextKey) ) ) {
- LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
- continue;
- } else { break; }
- }
-
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
- .toBytes(newKey);
-
- LOG.debug("+ Final New Key => " + Bytes.toString(nextKey));
-
- return next(key, value);
- }
-
- } else {
- return false;
- }
- }
- }
- }
- default:
- throw new IOException("Unknown source mode : " + sourceMode);
- }
- }
+ RecordReader<ImmutableBytesWritable, Result> {
+
+ static final Log LOG = LogFactory.getLog(HBaseRecordReader.class);
+
+ private byte[] startRow;
+ private byte[] endRow;
+ private byte[] lastSuccessfulRow;
+ private TreeSet<String> keyList;
+ private SourceMode sourceMode;
+ private Filter trrRowFilter;
+ private ResultScanner scanner;
+ private HTable htable;
+ private byte[][] trrInputColumns;
+ private long timestamp;
+ private int rowcount;
+ private boolean logScannerActivity = false;
+ private int logPerRowCount = 100;
+ private boolean endRowInclusive = true;
+ private int versions = 1;
+ private boolean useSalt = false;
+
+ private HBaseMultiInputSplit multiSplit = null;
+ private List<HBaseTableSplit> allSplits = null;
+
+ private HBaseRecordReader() {
+ }
+
+ public HBaseRecordReader(HBaseMultiInputSplit mSplit) throws IOException {
+ multiSplit = mSplit;
+
+ LOG.info("Creatin Multi Split for region location : "
+ + multiSplit.getRegionLocation());
+
+ allSplits = multiSplit.getSplits();
+ }
+
+ public boolean setNextSplit() throws IOException {
+ if (allSplits.size() > 0) {
+ setSplitValue(allSplits.remove(0));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void setSplitValue(HBaseTableSplit tSplit) throws IOException {
+ switch (tSplit.getSourceMode()) {
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+ LOG.debug(String.format(
+ "For split [%s] we have start key (%s) and stop key (%s)",
+ tSplit, tSplit.getStartRow(), tSplit.getEndRow()));
+
+ setStartRow(tSplit.getStartRow());
+ setEndRow(tSplit.getEndRow());
+ setEndRowInclusive(tSplit.getEndRowInclusive());
+ }
+
+ break;
+
+ case GET_LIST: {
+ LOG.debug(String.format("For split [%s] we have key list (%s)",
+ tSplit, tSplit.getKeyList()));
+
+ setKeyList(tSplit.getKeyList());
+ setVersions(tSplit.getVersions());
+ }
+
+ break;
+
+ case EMPTY:
+ LOG.info("EMPTY split. Doing nothing.");
+ break;
+
+ default:
+ throw new IOException("Unknown source mode : "
+ + tSplit.getSourceMode());
+ }
+
+ setSourceMode(tSplit.getSourceMode());
+
+ init();
+ }
+
+ /**
+ * Restart from survivable exceptions by creating a new scanner.
+ *
+ * @param firstRow
+ * @throws IOException
+ */
+ private void restartRangeScan(byte[] firstRow) throws IOException {
+ Scan currentScan;
+ if ((endRow != null) && (endRow.length > 0)) {
+ if (trrRowFilter != null) {
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
+
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ scan.setCacheBlocks(false);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow)
+ + ", endRow: " + Bytes.toString(endRow));
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
+ + ", no endRow");
+
+ Scan scan = new Scan(firstRow);
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ if (logScannerActivity) {
+ LOG.debug("Current scan=" + currentScan.toString());
+ timestamp = System.currentTimeMillis();
+ rowcount = 0;
+ }
+ }
+
+ public TreeSet<String> getKeyList() {
+ return keyList;
+ }
+
+ private void setKeyList(TreeSet<String> keyList) {
+ this.keyList = keyList;
+ }
+
+ private void setVersions(int versions) {
+ this.versions = versions;
+ }
+
+ public void setUseSalt(boolean useSalt) {
+ this.useSalt = useSalt;
+ }
+
+ public SourceMode getSourceMode() {
+ return sourceMode;
+ }
+
+ private void setSourceMode(SourceMode sourceMode) {
+ this.sourceMode = sourceMode;
+ }
+
+ public byte[] getEndRow() {
+ return endRow;
+ }
+
+ private void setEndRowInclusive(boolean isInclusive) {
+ endRowInclusive = isInclusive;
+ }
+
+ public boolean getEndRowInclusive() {
+ return endRowInclusive;
+ }
+
+ private byte[] nextKey = null;
+ private Vector<List<KeyValue>> resultVector = null;
+ Map<Long, List<KeyValue>> keyValueMap = null;
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ private void init() throws IOException {
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ restartRangeScan(startRow);
+ break;
+
+ case GET_LIST:
+ nextKey = Bytes.toBytes(keyList.pollFirst());
+ break;
+
+ case EMPTY:
+ LOG.info("EMPTY mode. Do nothing");
+ break;
+
+ default:
+ throw new IOException(" Unknown source mode : " + sourceMode);
+ }
+ }
+
+ byte[] getStartRow() {
+ return this.startRow;
+ }
+
+ /**
+ * @param htable
+ * the {@link HTable} to scan.
+ */
+ public void setHTable(HTable htable) {
+ Configuration conf = htable.getConfiguration();
+ logScannerActivity = conf.getBoolean(
+ ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+ logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+ this.htable = htable;
+ }
+
+ /**
+ * @param inputColumns
+ * the columns to be placed in {@link Result}.
+ */
+ public void setInputColumns(final byte[][] inputColumns) {
+ this.trrInputColumns = inputColumns;
+ }
+
+ /**
+ * @param startRow
+ * the first row in the split
+ */
+ private void setStartRow(final byte[] startRow) {
+ this.startRow = startRow;
+ }
+
+ /**
+ *
+ * @param endRow
+ * the last row in the split
+ */
+ private void setEndRow(final byte[] endRow) {
+ this.endRow = endRow;
+ }
+
+ /**
+ * @param rowFilter
+ * the {@link Filter} to be used.
+ */
+ public void setRowFilter(Filter rowFilter) {
+ this.trrRowFilter = rowFilter;
+ }
+
+ @Override
+ public void close() {
+ if (this.scanner != null)
+ this.scanner.close();
+ }
+
+ /**
+ * @return ImmutableBytesWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ /**
+ * @return RowResult
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ @Override
+ public Result createValue() {
+ return new Result();
+ }
+
+ @Override
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
+ }
+
+ @Override
+ public float getProgress() {
+ // Depends on the total number of tuples and getPos
+ return 0;
+ }
+
+ /**
+ * @param key
+ * HStoreKey as input key.
+ * @param value
+ * MapWritable as input value
+ * @return true if there was more data
+ * @throws IOException
+ */
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value)
+ throws IOException {
+
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+
+ Result result;
+ try {
+ try {
+ result = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp)
+ + "ms to process " + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
+ }
+ }
+ } catch (IOException e) {
+ // try to handle all IOExceptions by restarting
+ // the scanner, if the second call fails, it will be rethrown
+ LOG.debug("recovered from "
+ + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation,"
+ + " if your mapper has restarted a few other times like this"
+ + " then you should consider killing this job and investigate"
+ + " why it's taking so long.");
+ }
+ if (lastSuccessfulRow == null) {
+ restartRangeScan(startRow);
+ } else {
+ restartRangeScan(lastSuccessfulRow);
+ this.scanner.next(); // skip presumed already mapped row
+ }
+ result = this.scanner.next();
+ }
+
+ if (result != null && result.size() > 0) {
+ if (useSalt) {
+ key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ }
+ return setNextSplit();
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp)
+ + "ms to process " + rowcount + " rows");
+ LOG.debug(ioe);
+ String lastRow = lastSuccessfulRow == null ? "null" : Bytes
+ .toStringBinary(lastSuccessfulRow);
+ LOG.debug("lastSuccessfulRow=" + lastRow);
+ }
+ throw ioe;
+ }
+ }
+
+ case GET_LIST: {
+ LOG.debug(String.format("INTO next with GET LIST and Key (%s)",
+ Bytes.toString(nextKey)));
+
+ if (versions == 1) {
+ if (nextKey != null) {
+ LOG.debug(String.format("Processing Key (%s)",
+ Bytes.toString(nextKey)));
+
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result result = this.htable.get(theGet);
+
+ if (result != null && (!result.isEmpty())) {
+ LOG.debug(String.format(
+ "Key (%s), Version (%s), Got Result (%s)",
+ Bytes.toString(nextKey), versions, result));
+
+ if (keyList != null || !keyList.isEmpty()) {
+ String newKey = keyList.pollFirst();
+ LOG.debug("New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null
+ : Bytes.toBytes(newKey);
+ } else {
+ nextKey = null;
+ }
+
+ LOG.debug(String.format("=> Picked a new Key (%s)",
+ Bytes.toString(nextKey)));
+
+ // Write the result
+ if (useSalt) {
+ key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+
+ return true;
+ } else {
+ LOG.debug(" Key (" + Bytes.toString(nextKey)
+ + ") return an EMPTY result. Get (" + theGet.getId()
+ + ")"); // alg0
+
+ String newKey;
+ while ((newKey = keyList.pollFirst()) != null) {
+ LOG.debug("WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null
+ : Bytes.toBytes(newKey);
+
+ if (nextKey == null) {
+ LOG.error("BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if (!this.htable.exists(new Get(nextKey))) {
+ LOG.debug(String.format(
+ "Key (%s) Does not exist in Table (%s)",
+ Bytes.toString(nextKey),
+ Bytes.toString(this.htable.getTableName())));
+ continue;
+ } else {
+ break;
+ }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null
+ : Bytes.toBytes(newKey);
+
+ LOG.debug("Final New Key => " + Bytes.toString(nextKey));
+
+ return next(key, value);
+ }
+ } else {
+ // Nothig left. return false
+ return setNextSplit();
+ }
+ } else {
+ if (resultVector != null && resultVector.size() != 0) {
+ LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>",
+ versions, resultVector));
+
+ List<KeyValue> resultKeyValue = resultVector
+ .remove(resultVector.size() - 1);
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format("+ Version (%s), Got Result <%s>",
+ versions, result));
+
+ if (useSalt) {
+ key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+
+ return true;
+ } else {
+ if (nextKey != null) {
+ LOG.debug(String.format("+ Processing Key (%s)",
+ Bytes.toString(nextKey)));
+
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result resultAll = this.htable.get(theGet);
+
+ if (resultAll != null && (!resultAll.isEmpty())) {
+ List<KeyValue> keyValeList = resultAll.list();
+
+ keyValueMap = new HashMap<Long, List<KeyValue>>();
+
+ LOG.debug(String.format(
+ "+ Key (%s) Versions (%s) Val;ute map <%s>",
+ Bytes.toString(nextKey), versions, keyValueMap));
+
+ for (KeyValue keyValue : keyValeList) {
+ long version = keyValue.getTimestamp();
+
+ if (keyValueMap.containsKey(new Long(version))) {
+ List<KeyValue> keyValueTempList = keyValueMap
+ .get(new Long(version));
+ if (keyValueTempList == null) {
+ keyValueTempList = new ArrayList<KeyValue>();
+ }
+ keyValueTempList.add(keyValue);
+ } else {
+ List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
+ keyValueMap.put(new Long(version),
+ keyValueTempList);
+ keyValueTempList.add(keyValue);
+ }
+ }
+
+ resultVector = new Vector<List<KeyValue>>();
+ resultVector.addAll(keyValueMap.values());
+
+ List<KeyValue> resultKeyValue = resultVector
+ .remove(resultVector.size() - 1);
+
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format(
+ "+ Version (%s), Got Result (%s)", versions,
+ result));
+
+ String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
+
+ System.out.println("+ New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null
+ : Bytes.toBytes(newKey);
+
+ if (useSalt) {
+ key.set(HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ } else {
+ LOG.debug(String.format(
+ "+ Key (%s) return an EMPTY result. Get (%s)",
+ Bytes.toString(nextKey), theGet.getId())); // alg0
+
+ String newKey;
+
+ while ((newKey = keyList.pollFirst()) != null) {
+ LOG.debug("+ WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null
+ : Bytes.toBytes(newKey);
+
+ if (nextKey == null) {
+ LOG.error("+ BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if (!this.htable.exists(new Get(nextKey))) {
+ LOG.debug(String.format(
+ "+ Key (%s) Does not exist in Table (%s)",
+ Bytes.toString(nextKey),
+ Bytes.toString(this.htable.getTableName())));
+ continue;
+ } else {
+ break;
+ }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null
+ : Bytes.toBytes(newKey);
+
+ LOG.debug("+ Final New Key => "
+ + Bytes.toString(nextKey));
+
+ return next(key, value);
+ }
+
+ } else {
+ return setNextSplit();
+ }
+ }
+ }
+ }
+
+ case EMPTY: {
+ LOG.info("GOT an empty Split");
+ return setNextSplit();
+ }
+
+ default:
+ throw new IOException("Unknown source mode : " + sourceMode);
+ }
+ }
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index 3c64e52..aa446c1 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -219,10 +219,7 @@ public class HBaseScheme
String fieldName = (String) fields.get(k);
byte[] fieldNameBytes = Bytes.toBytes(fieldName);
byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes);
- if (cellValue == null) {
- cellValue = new byte[0];
- }
- result.add(new ImmutableBytesWritable(cellValue));
+ result.add(cellValue != null ? new ImmutableBytesWritable(cellValue) : null);
}
}
@@ -259,7 +256,8 @@ public class HBaseScheme
Tuple tuple = values.getTuple();
ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j);
- put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
+ if (valueBytes != null)
+ put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
}
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
index a5c3bdd..87b8f58 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
@@ -12,196 +12,208 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputSplit;
-import com.sun.tools.javac.resources.version;
-
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable {
-
- private final Log LOG = LogFactory.getLog(HBaseTableSplit.class);
-
- private byte [] m_tableName = null;
- private byte [] m_startRow = null;
- private byte [] m_endRow = null;
- private String m_regionLocation = null;
- private TreeSet<String> m_keyList = null;
- private SourceMode m_sourceMode = SourceMode.EMPTY;
- private boolean m_endRowInclusive = true;
- private int m_versions = 1;
- private boolean m_useSalt = false;
-
- /** default constructor */
- public HBaseTableSplit() {
- this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);
- }
-
- /**
- * Constructor
- * @param tableName
- * @param startRow
- * @param endRow
- * @param location
- */
- public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow,
- final String location, final SourceMode sourceMode, final boolean useSalt) {
- this.m_tableName = tableName;
- this.m_startRow = startRow;
- this.m_endRow = endRow;
- this.m_regionLocation = location;
- this.m_sourceMode = sourceMode;
- this.m_useSalt = useSalt;
- }
-
- public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, int versions, final String location, final SourceMode sourceMode, final boolean useSalt ) {
- this.m_tableName = tableName;
- this.m_keyList = keyList;
- this.m_versions = versions;
- this.m_sourceMode = sourceMode;
- this.m_regionLocation = location;
- this.m_useSalt = useSalt;
- }
-
- /** @return table name */
- public byte [] getTableName() {
- return this.m_tableName;
- }
-
- /** @return starting row key */
- public byte [] getStartRow() {
- return this.m_startRow;
- }
-
- /** @return end row key */
- public byte [] getEndRow() {
- return this.m_endRow;
- }
-
- public boolean getEndRowInclusive() {
- return m_endRowInclusive;
- }
-
- public void setEndRowInclusive(boolean isInclusive) {
- m_endRowInclusive = isInclusive;
- }
-
- /** @return list of keys to get */
- public TreeSet<String> getKeyList() {
- return m_keyList;
- }
-
- public int getVersions() {
- return m_versions;
- }
-
- /** @return get the source mode */
- public SourceMode getSourceMode() {
- return m_sourceMode;
- }
-
- public boolean getUseSalt() {
- return m_useSalt;
- }
-
- /** @return the region's hostname */
- public String getRegionLocation() {
- LOG.debug("REGION GETTER : " + m_regionLocation);
-
- return this.m_regionLocation;
- }
-
- public String[] getLocations() {
- LOG.debug("REGION ARRAY : " + m_regionLocation);
-
- return new String[] {this.m_regionLocation};
- }
-
- @Override
- public long getLength() {
- // Not clear how to obtain this... seems to be used only for sorting splits
- return 0;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- LOG.debug("READ ME : " + in.toString());
-
- this.m_tableName = Bytes.readByteArray(in);
- this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
- this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in)));
- this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
-
- switch(this.m_sourceMode) {
- case SCAN_RANGE:
- this.m_startRow = Bytes.readByteArray(in);
- this.m_endRow = Bytes.readByteArray(in);
- this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in));
- break;
-
- case GET_LIST:
- this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
- this.m_keyList = new TreeSet<String>();
-
- int m = Bytes.toInt(Bytes.readByteArray(in));
-
- for( int i = 0; i < m; i++) {
- this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in)));
- }
- break;
- }
-
- LOG.debug("READ and CREATED : " + this);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- LOG.debug("WRITE : " + this);
-
- Bytes.writeByteArray(out, this.m_tableName);
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
-
- switch( this.m_sourceMode ) {
- case SCAN_RANGE:
- Bytes.writeByteArray(out, this.m_startRow);
- Bytes.writeByteArray(out, this.m_endRow);
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive));
- break;
-
- case GET_LIST:
- Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
- Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
-
- for( String k: this.m_keyList ) {
- Bytes.writeByteArray(out, Bytes.toBytes(k));
- }
- break;
- }
-
- LOG.debug("WROTE : " + out.toString());
- }
-
- @Override
- public String toString() {
- return String.format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
- Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow),
- (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions, m_useSalt);
- }
-
- @Override
- public int compareTo(HBaseTableSplit o) {
- switch(m_sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE:
- return Bytes.compareTo(getStartRow(), o.getStartRow());
-
- case GET_LIST:
- return m_keyList.equals( o.getKeyList() ) ? 0 : -1;
-
- default:
- return -1;
- }
-
- }
+public class HBaseTableSplit implements InputSplit,
+ Comparable<HBaseTableSplit>, Serializable {
+
+ private final Log LOG = LogFactory.getLog(HBaseTableSplit.class);
+
+ private byte[] m_tableName = null;
+ private byte[] m_startRow = null;
+ private byte[] m_endRow = null;
+ private String m_regionLocation = null;
+ private TreeSet<String> m_keyList = null;
+ private SourceMode m_sourceMode = SourceMode.EMPTY;
+ private boolean m_endRowInclusive = true;
+ private int m_versions = 1;
+ private boolean m_useSalt = false;
+
+ /** default constructor */
+ public HBaseTableSplit() {
+ this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY, false);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param tableName
+ * @param startRow
+ * @param endRow
+ * @param location
+ */
+ public HBaseTableSplit(final byte[] tableName, final byte[] startRow,
+ final byte[] endRow, final String location,
+ final SourceMode sourceMode, final boolean useSalt) {
+ this.m_tableName = tableName;
+ this.m_startRow = startRow;
+ this.m_endRow = endRow;
+ this.m_regionLocation = location;
+ this.m_sourceMode = sourceMode;
+ this.m_useSalt = useSalt;
+ }
+
+ public HBaseTableSplit(final byte[] tableName,
+ final TreeSet<String> keyList, int versions, final String location,
+ final SourceMode sourceMode, final boolean useSalt) {
+ this.m_tableName = tableName;
+ this.m_keyList = keyList;
+ this.m_versions = versions;
+ this.m_sourceMode = sourceMode;
+ this.m_regionLocation = location;
+ this.m_useSalt = useSalt;
+ }
+
+ /** @return table name */
+ public byte[] getTableName() {
+ return this.m_tableName;
+ }
+
+ /** @return starting row key */
+ public byte[] getStartRow() {
+ return this.m_startRow;
+ }
+
+ /** @return end row key */
+ public byte[] getEndRow() {
+ return this.m_endRow;
+ }
+
+ public boolean getEndRowInclusive() {
+ return m_endRowInclusive;
+ }
+
+ public void setEndRowInclusive(boolean isInclusive) {
+ m_endRowInclusive = isInclusive;
+ }
+
+ /** @return list of keys to get */
+ public TreeSet<String> getKeyList() {
+ return m_keyList;
+ }
+
+ public int getVersions() {
+ return m_versions;
+ }
+
+ /** @return get the source mode */
+ public SourceMode getSourceMode() {
+ return m_sourceMode;
+ }
+
+ public boolean getUseSalt() {
+ return m_useSalt;
+ }
+
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ LOG.debug("REGION GETTER : " + m_regionLocation);
+
+ return this.m_regionLocation;
+ }
+
+ public String[] getLocations() {
+ LOG.debug("REGION ARRAY : " + m_regionLocation);
+
+ return new String[] { this.m_regionLocation };
+ }
+
+ @Override
+ public long getLength() {
+ // Not clear how to obtain this... seems to be used only for sorting
+ // splits
+ return 0;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ LOG.debug("READ ME : " + in.toString());
+
+ this.m_tableName = Bytes.readByteArray(in);
+ this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes
+ .readByteArray(in)));
+ this.m_useSalt = Bytes.toBoolean(Bytes.readByteArray(in));
+
+ switch (this.m_sourceMode) {
+ case SCAN_RANGE:
+ this.m_startRow = Bytes.readByteArray(in);
+ this.m_endRow = Bytes.readByteArray(in);
+ this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in));
+ break;
+
+ case GET_LIST:
+ this.m_versions = Bytes.toInt(Bytes.readByteArray(in));
+ this.m_keyList = new TreeSet<String>();
+
+ int m = Bytes.toInt(Bytes.readByteArray(in));
+
+ for (int i = 0; i < m; i++) {
+ this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in)));
+ }
+ break;
+ }
+
+ LOG.debug("READ and CREATED : " + this);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ LOG.debug("WRITE : " + this);
+
+ Bytes.writeByteArray(out, this.m_tableName);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_useSalt));
+
+ switch (this.m_sourceMode) {
+ case SCAN_RANGE:
+ Bytes.writeByteArray(out, this.m_startRow);
+ Bytes.writeByteArray(out, this.m_endRow);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive));
+ break;
+
+ case GET_LIST:
+ Bytes.writeByteArray(out, Bytes.toBytes(m_versions));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
+
+ for (String k : this.m_keyList) {
+ Bytes.writeByteArray(out, Bytes.toBytes(k));
+ }
+ break;
+ }
+
+ LOG.debug("WROTE : " + out.toString());
+ }
+
+ @Override
+ public String toString() {
+ return String
+ .format(
+ "Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List Size (%s) Versions (%s) Use Salt (%s)",
+ Bytes.toString(m_tableName), m_regionLocation, m_sourceMode,
+ Bytes.toString(m_startRow), Bytes.toString(m_endRow),
+ (m_keyList != null) ? m_keyList.size() : "EMPTY", m_versions,
+ m_useSalt);
+ }
+
+ @Override
+ public int compareTo(HBaseTableSplit o) {
+ switch (m_sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ return Bytes.compareTo(getStartRow(), o.getStartRow());
+
+ case GET_LIST:
+ return m_keyList.equals(o.getKeyList()) ? 0 : -1;
+
+ case EMPTY:
+ return 0;
+
+ default:
+ return -1;
+ }
+
+ }
} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index b6d5742..31ed3ea 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -10,15 +10,14 @@ import org.apache.hadoop.hbase.util.Bytes
import cascading.tuple.TupleEntry
class HBasePipeWrapper (pipe: Pipe) {
- def toBytesWritable(f: Fields): Pipe = {
+ def toBytesWritable(f: Fields): Pipe = {
asList(f)
- .foldLeft(pipe){ (p, f) => {
- p.map(f.toString -> f.toString){ from: String => {
- new ImmutableBytesWritable(Bytes.toBytes(
- if (from == null) "" else from))
- }}
- }}
- }
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
+ }
// def toBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
@@ -30,13 +29,12 @@ class HBasePipeWrapper (pipe: Pipe) {
def fromBytesWritable(f: Fields): Pipe = {
asList(f)
- .foldLeft(pipe) { (p, fld) =>
- p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
- Bytes.toString(from.get)
- }
- }
- }
- }
+ .foldLeft(pipe) { (p, fld) => {
+ p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable =>
+ Option(from).map(x => Bytes.toString(x.get)).getOrElse(null)
+ }
+ }}
+ }
// def fromBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
index eccd653..2aa5342 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -7,93 +7,101 @@ import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.log4j.Logger
-
import com.twitter.scalding._
import com.twitter.scalding.Args
-
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import org.apache.hadoop.hbase.client.Put
+import parallelai.spyglass.hbase.HBaseSalter
class HBaseExample(args: Args) extends JobBase(args) {
- val isDebug: Boolean = args("debug").toBoolean
+ val isDebug: Boolean = args("debug").toBoolean
- if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
+ if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
- val output = args("output")
+ val output = args("output")
- println(output)
+ val jobConf = getJobConf()
- val jobConf = getJobConf()
+ val quorumNames = args("quorum")
- val quorumNames = args("quorum")
+ println("Output : " + output)
+ println("Quorum : " + quorumNames)
- case class HBaseTableStore(
+ case class HBaseTableStore(
conf: Configuration,
quorum: String,
tableName: String) {
- val tableBytes = Bytes.toBytes(tableName)
- val connection = HConnectionManager.getConnection(conf)
- val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
-
- conf.set("hbase.zookeeper.quorum", quorumNames)
-
- val htable = new HTable(HBaseConfiguration.create(conf), tableName)
-
- }
-
- val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet")
-
- val hbs2 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
- .read
- .write(Tsv(output.format("get_list")))
-
- val hbs3 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
- .read
- .write(Tsv(output.format("scan_all")))
-
- val hbs4 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_to_end")))
-
- val hbs5 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_from_start")))
-
- val hbs6 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
- .read
- .write(Tsv(output.format("scan_range_between")))
-
-} \ No newline at end of file
+ val tableBytes = Bytes.toBytes(tableName)
+ val connection = HConnectionManager.getConnection(conf)
+ val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+ conf.set("hbase.zookeeper.quorum", quorumNames)
+
+ val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+ def makeN(n: Int) {
+ (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => {
+ val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x)))
+ put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x))
+ })
+ }
+
+ }
+
+ HBaseTableStore(jobConf, quorumNames, "_TEST.SALT.01").makeN(100000)
+
+ val hbs2 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.GET_LIST, keyList = List("13914", "10687", "14897").map(x => "%015d".format(x.toLong)), useSalt = true)
+ .read
+ .write(Tsv(output.format("get_list")))
+
+ val hbs3 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
+ .read
+ .write(Tsv(output.format("scan_all")))
+
+ val hbs4 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_RANGE, stopKey = "%015d".format("13914".toLong), useSalt = true)
+ .read
+ .write(Tsv(output.format("scan_range_to_end")))
+
+ val hbs5 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), useSalt = true)
+ .read
+ .write(Tsv(output.format("scan_range_from_start")))
+
+ val hbs6 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), stopKey = "%015d".format("16897".toLong), useSalt = true)
+ .read
+ .write(Tsv(output.format("scan_range_between")))
+
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
index 890d2be..920f17d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -3,36 +3,39 @@ package parallelai.spyglass.hbase.example
import com.twitter.scalding.Tool
import org.joda.time.format.DateTimeFormat
import java.util.Formatter.DateTime
+import parallelai.spyglass.base.JobRunner
object HBaseExampleRunner extends App {
- val appPath = System.getenv("BIGDATA_APPCONF_PATH")
- assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"})
- println( "Application Path is [%s]".format(appPath) )
-
- val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match {
- case "hdfs" => "--hdfs"
- case _ => "--local"
- }}
-
- println(modeString)
-
- val jobLibPath = modeString match {
- case "--hdfs" => {
- val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
- assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"})
- println( "Job Library Path Path is [%s]".format(jobLibPath) )
- jobLibPath
- }
- case _ => ""
- }
-
- val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
- assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})
- println( "Quorum is [%s]".format(quorum) )
-
- val output = "HBaseTest.%s.tsv"
-
- Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath,
- "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum ))
-
+ val appPath = System.getenv("BIGDATA_APPCONF_PATH")
+ assert(appPath != null, { "Environment Variable BIGDATA_APPCONF_PATH is undefined or Null" })
+ println("Application Path is [%s]".format(appPath))
+
+ val modeString = if (args.length == 0) { "--hdfs" } else {
+ args(0) match {
+ case "hdfs" => "--hdfs"
+ case _ => "--hdfs"
+ }
+ }
+
+ println(modeString)
+
+ val jobLibPath = modeString match {
+ case "--hdfs" => {
+ val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
+ assert(jobLibPath != null, { "Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null" })
+ println("Job Library Path Path is [%s]".format(jobLibPath))
+ jobLibPath
+ }
+ case _ => ""
+ }
+
+ val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
+ assert(quorum != null, { "Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null" })
+ println("Quorum is [%s]".format(quorum))
+
+ val output = "HBaseTest.%s"
+
+ JobRunner.main(Array(classOf[HBaseExample].getName, "--hdfs", "--app.conf.path", appPath,
+ "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum))
+
} \ No newline at end of file