aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass
diff options
context:
space:
mode:
authorcra14 <chandan.rajah2@bskyb.com>2013-04-26 12:47:12 +0100
committercra14 <chandan.rajah2@bskyb.com>2013-04-26 12:47:12 +0100
commitcbf6c2903bfd0a5fe528c54382ea791c45637ded (patch)
tree2ca67f31c4d0c1779c163cb48234e821616ec6e1 /src/main/java/parallelai/spyglass
parentd6d712287b2bcd74f0c5bbc3ecbb106741443d7c (diff)
downloadSpyGlass-cbf6c2903bfd0a5fe528c54382ea791c45637ded.tar.gz
SpyGlass-cbf6c2903bfd0a5fe528c54382ea791c45637ded.zip
First public release of Spy Glass code base
Diffstat (limited to 'src/main/java/parallelai/spyglass')
-rw-r--r--src/main/java/parallelai/spyglass/base/JobLibLoader.java70
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseConstants.java19
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java531
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java325
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java336
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java190
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java360
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java117
8 files changed, 1948 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/base/JobLibLoader.java b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
new file mode 100644
index 0000000..af5bdf4
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
@@ -0,0 +1,70 @@
+package parallelai.spyglass.base;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.LogManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+public class JobLibLoader {
+
+ private static Logger logger = LogManager.getLogger(JobLibLoader.class);
+
+ public static void loadJars(String libPathStr, Configuration config) {
+
+
+ try {
+ Path libPath = new Path(libPathStr);
+
+ FileSystem fs = FileSystem.get(config);
+
+ RemoteIterator<LocatedFileStatus> itr = fs.listFiles(libPath, true);
+
+ while (itr.hasNext()) {
+ LocatedFileStatus f = itr.next();
+
+ if (!f.isDirectory() && f.getPath().getName().endsWith("jar")) {
+ logger.info("Loading Jar : " + f.getPath().getName());
+ DistributedCache.addFileToClassPath(f.getPath(), config);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e.toString());
+ }
+ }
+
+ public static void addFiletoCache(String libPathStr, Configuration config) {
+
+ try {
+ Path filePath = new Path(libPathStr);
+ DistributedCache.addCacheFile(filePath.toUri(), config);
+ // DistributedCache.createSymlink(config);
+
+ // config.set("mapred.cache.files", libPathStr);
+ // config.set("mapred.create.symlink", "yes");
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static Path[] getFileFromCache(String libPathStr,
+ Configuration config) {
+ Path[] localFiles = null;
+ try {
+ logger.info("Local Cache => " + DistributedCache.getLocalCacheFiles(config));
+ logger.info("Hadoop Cache => "+ DistributedCache.getCacheFiles(config));
+ if (DistributedCache.getLocalCacheFiles(config) != null) {
+ localFiles = DistributedCache.getLocalCacheFiles(config);
+ }
+ logger.info("LocalFiles => " + localFiles);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return localFiles;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
new file mode 100644
index 0000000..b546107
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -0,0 +1,19 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class HBaseConstants {
+
+ public enum SourceMode {
+ EMPTY,
+ SCAN_ALL,
+ SCAN_RANGE,
+ GET_LIST;
+ }
+
+ public static final String START_KEY = "hbase.%s.startkey";
+ public static final String STOP_KEY = "hbase.%s.stopkey";
+ public static final String SOURCE_MODE = "hbase.%s.source.mode";
+ public static final String KEY_LIST = "hbase.%s.key.list";
+
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
new file mode 100644
index 0000000..f1f4fb7
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java
@@ -0,0 +1,531 @@
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import javax.naming.NamingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+
+public class HBaseInputFormat
+ implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
+
+ private final Log LOG = LogFactory.getLog(HBaseInputFormat.class);
+
+ private final String id = UUID.randomUUID().toString();
+
+ private byte [][] inputColumns;
+ private HTable table;
+ private HBaseRecordReader tableRecordReader;
+ private Filter rowFilter;
+ private String tableName = "";
+
+ private HashMap<InetAddress, String> reverseDNSCacheMap =
+ new HashMap<InetAddress, String>();
+
+ private String nameServer = null;
+
+// private Scan scan = null;
+
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ if (this.table == null) {
+ throw new IOException("No table was provided");
+ }
+
+ if (this.inputColumns == null || this.inputColumns.length == 0) {
+ throw new IOException("Expecting at least one column");
+ }
+
+ Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+
+ if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+ HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1);
+ HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+ .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY);
+ splits.add(split);
+
+ return splits.toArray(new HBaseTableSplit[splits.size()]);
+ }
+
+ if( keys.getSecond() == null || keys.getSecond().length == 0) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ if( keys.getFirst().length != keys.getSecond().length ) {
+ throw new IOException("Regions for start and end key do not match");
+ }
+
+ byte[] minKey = keys.getFirst()[keys.getFirst().length - 1];
+ byte[] maxKey = keys.getSecond()[0];
+
+ LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+ byte [][] regStartKeys = keys.getFirst();
+ byte [][] regStopKeys = keys.getSecond();
+ String [] regions = new String[regStartKeys.length];
+
+ for( int i = 0; i < regStartKeys.length; i++ ) {
+ minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0 ) && (Bytes.compareTo(regStartKeys[i], minKey) < 0 ) ? regStartKeys[i] : minKey;
+ maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0 ) ? regStopKeys[i] : maxKey;
+
+ HServerAddress regionServerAddress =
+ table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
+ InetAddress regionAddress =
+ regionServerAddress.getInetSocketAddress().getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress +
+ " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+// HServerAddress regionServerAddress = table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
+// InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
+//
+// String regionLocation;
+//
+// try {
+// regionLocation = reverseDNS(regionAddress);
+// } catch (NamingException e) {
+// LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e);
+// regionLocation = regionServerAddress.getHostname();
+// }
+
+// String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getHostname();
+
+ LOG.debug( "***** " + regionLocation );
+
+ if( regionLocation == null || regionLocation.length() == 0 )
+ throw new IOException( "The region info for regiosn " + i + " is null or empty");
+
+ regions[i] = regionLocation;
+
+ LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) ));
+ }
+
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+ LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey)));
+
+ LOG.info("SOURCE MODE is : " + sourceMode);
+
+ switch( sourceMode ) {
+ case SCAN_ALL:
+ startRow = HConstants.EMPTY_START_ROW;
+ stopRow = HConstants.EMPTY_END_ROW;
+
+ LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
+ break;
+
+ case SCAN_RANGE:
+ startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ;
+ stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ;
+
+ LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow)));
+ break;
+ }
+
+ switch( sourceMode ) {
+ case EMPTY:
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ {
+// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : startRow;
+// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : stopRow;
+
+ List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+ List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow);
+
+ int maxRegions = validRegions.size();
+ int currentRegion = 1;
+
+ for( HRegionLocation cRegion : validRegions ) {
+ byte [] rStart = cRegion.getRegionInfo().getStartKey();
+ byte [] rStop = cRegion.getRegionInfo().getEndKey();
+
+ HServerAddress regionServerAddress = cRegion.getServerAddress();
+ InetAddress regionAddress =
+ regionServerAddress.getInetSocketAddress().getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.error("Cannot resolve the host name for " + regionAddress +
+ " because of " + e);
+ regionLocation = regionServerAddress.getHostname();
+ }
+
+ byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow);
+ byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);
+
+ LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",
+ (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )),
+ (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )),
+ rStop.length
+ ));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(),
+ sStart,
+ sStop,
+ regionLocation,
+ SourceMode.SCAN_RANGE
+ );
+
+ split.setEndRowInclusive( currentRegion == maxRegions );
+
+ currentRegion ++;
+
+ LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",
+ Bytes.toString(startRow), Bytes.toString(stopRow),
+ Bytes.toString(rStart), Bytes.toString(rStop),
+ Bytes.toString(sStart),
+ Bytes.toString(sStop),
+ cRegion.getHostnamePort(), split) );
+
+ splits.add(split);
+ }
+
+//
+// for (int i = 0; i < keys.getFirst().length; i++) {
+//
+// if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+// LOG.info("NOT including regions : " + regions[i]);
+// continue;
+// }
+//
+// // determine if the given start an stop key fall into the region
+// if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+// Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+// (stopRow.length == 0 ||
+// Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+//
+// byte[] splitStart = startRow.length == 0 ||
+// Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+// keys.getFirst()[i] : startRow;
+// byte[] splitStop = (stopRow.length == 0 ||
+// Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+// keys.getSecond()[i].length > 0 ?
+// keys.getSecond()[i] : stopRow;
+// HBaseTableSplit split = new HBaseTableSplit(table.getTableName(),
+// splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE);
+// splits.add(split);
+//
+// LOG.info("getSplits: split -> " + i + " -> " + split);
+// }
+// }
+
+ LOG.info("RETURNED SPLITS: split -> " + splits);
+
+ return splits.toArray(new HBaseTableSplit[splits.size()]);
+ }
+
+ case GET_LIST:
+ {
+ if( keyList == null || keyList.size() == 0 ) {
+ throw new IOException("Source Mode is GET_LIST but key list is EMPTY");
+ }
+
+ List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>();
+
+ for (int i = 0; i < keys.getFirst().length; i++) {
+
+ if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+ continue;
+ }
+
+ LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] )));
+
+ Set<String> regionsSubSet = null;
+
+ if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) {
+ LOG.info("REGION start is empty");
+ LOG.info("REGION stop is empty");
+ regionsSubSet = keyList;
+ } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) {
+ LOG.info("REGION start is empty");
+ regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true);
+ } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) {
+ LOG.info("REGION stop is empty");
+ regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true);
+ } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) {
+ regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true);
+ } else {
+ throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",
+ regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i])));
+ }
+
+ if( regionsSubSet == null || regionsSubSet.size() == 0) {
+ LOG.info( "EMPTY: Key is for region " + regions[i] + " is null");
+
+ continue;
+ }
+
+ TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet);
+
+ LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList ));
+
+ HBaseTableSplit split = new HBaseTableSplit(
+ table.getTableName(), regionKeyList,
+ regions[i],
+ SourceMode.GET_LIST);
+ splits.add(split);
+ }
+
+ return splits.toArray(new HBaseTableSplit[splits.size()]);
+ }
+
+ default:
+ throw new IOException("Unknown source Mode : " + sourceMode );
+ }
+ }
+
+ private String reverseDNS(InetAddress ipAddress) throws NamingException {
+ String hostName = this.reverseDNSCacheMap.get(ipAddress);
+ if (hostName == null) {
+ hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
+ this.reverseDNSCacheMap.put(ipAddress, hostName);
+ }
+ return hostName;
+ }
+
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+ if( ! (split instanceof HBaseTableSplit ) )
+ throw new IOException("Table Split is not type HBaseTableSplit");
+
+ HBaseTableSplit tSplit = (HBaseTableSplit) split;
+
+ HBaseRecordReader trr = new HBaseRecordReader();
+
+ switch( tSplit.getSourceMode() ) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ {
+ LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() ));
+
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setEndRowInclusive(tSplit.getEndRowInclusive());
+ }
+
+ break;
+
+ case GET_LIST:
+ {
+ LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() ));
+
+ trr.setKeyList(tSplit.getKeyList());
+ }
+
+ break;
+
+ default:
+ throw new IOException( "Unknown source mode : " + tSplit.getSourceMode() );
+ }
+
+ trr.setSourceMode(tSplit.getSourceMode());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+
+ trr.init();
+
+ return trr;
+ }
+
+
+
+ /* Configuration Section */
+
+ /**
+ * space delimited list of columns
+ */
+ public static final String COLUMN_LIST = "hbase.tablecolumns";
+
+ /**
+ * Use this jobconf param to specify the input table
+ */
+ private static final String INPUT_TABLE = "hbase.inputtable";
+
+ private String startKey = null;
+ private String stopKey = null;
+
+ private SourceMode sourceMode = SourceMode.EMPTY;
+ private TreeSet<String> keyList = null;
+
+ public void configure(JobConf job) {
+ String tableName = getTableName(job);
+ String colArg = job.get(COLUMN_LIST);
+ String[] colNames = colArg.split(" ");
+ byte [][] m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ }
+ setInputColumns(m_cols);
+
+ try {
+ setHTable(new HTable(HBaseConfiguration.create(job), tableName));
+ } catch (Exception e) {
+ LOG.error( "************* Table could not be created" );
+ LOG.error(StringUtils.stringifyException(e));
+ }
+
+ LOG.debug("Entered : " + this.getClass() + " : configure()" );
+
+ sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ;
+
+ LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",
+ String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode ));
+
+ switch( sourceMode ) {
+ case SCAN_RANGE:
+ LOG.info("HIT SCAN_RANGE");
+
+ startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job) ) );
+ stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job) ) );
+
+ LOG.info(String.format("Setting start key (%s) and stop key (%s)", startKey, stopKey) );
+ break;
+
+ case GET_LIST:
+ LOG.info("HIT GET_LIST");
+
+ Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job)));
+ keyList = new TreeSet<String> (keys);
+
+ LOG.info( "GOT KEY LIST : " + keys );
+ LOG.info(String.format("SETTING key list (%s)", keyList) );
+
+ break;
+
+ case EMPTY:
+ LOG.info("HIT EMPTY");
+
+ sourceMode = sourceMode.SCAN_ALL;
+ break;
+
+ default:
+ LOG.info("HIT DEFAULT");
+
+ break;
+ }
+ }
+
+ public void validateInput(JobConf job) throws IOException {
+ // expecting exactly one path
+ String tableName = getTableName(job);
+
+ if (tableName == null) {
+ throw new IOException("expecting one table name");
+ }
+ LOG.debug("".format("Found Table name [%s]", tableName));
+
+
+ // connected to table?
+ if (getHTable() == null) {
+ throw new IOException("could not connect to table '" +
+ tableName + "'");
+ }
+ LOG.debug("".format("Found Table [%s]", getHTable().getTableName()));
+
+ // expecting at least one column
+ String colArg = job.get(COLUMN_LIST);
+ if (colArg == null || colArg.length() == 0) {
+ throw new IOException("expecting at least one column");
+ }
+ LOG.debug("".format("Found Columns [%s]", colArg));
+
+ LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey));
+
+ if( sourceMode == SourceMode.EMPTY ) {
+ throw new IOException("SourceMode should not be EMPTY");
+ }
+
+ if( sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0) ) {
+ throw new IOException( "Source mode is GET_LIST bu key list is empty");
+ }
+ }
+
+
+ /* Getters & Setters */
+ private HTable getHTable() { return this.table; }
+ private void setHTable(HTable ht) { this.table = ht; }
+ private void setInputColumns( byte [][] ic ) { this.inputColumns = ic; }
+
+
+ private void setJobProp( JobConf job, String key, String value) {
+ if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key)));
+ job.set(key, value);
+ }
+
+ private String getJobProp( JobConf job, String key ) { return job.get(key); }
+
+ public static void setTableName(JobConf job, String tableName) {
+ // Make sure that table has not been set before
+ String oldTableName = getTableName(job);
+ if(oldTableName != null) {
+ throw new RuntimeException("table name already set to: '"
+ + oldTableName + "'");
+ }
+
+ job.set(INPUT_TABLE, tableName);
+ }
+
+ public static String getTableName(JobConf job) {
+ return job.get(INPUT_TABLE);
+ }
+
+ protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
+ return true;
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
new file mode 100644
index 0000000..97077c4
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
@@ -0,0 +1,325 @@
+package parallelai.spyglass.hbase;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+
+public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
+
+ static final Log LOG = LogFactory.getLog(HBaseRecordReader.class);
+
+ private byte [] startRow;
+ private byte [] endRow;
+ private byte [] lastSuccessfulRow;
+ private TreeSet<String> keyList;
+ private SourceMode sourceMode;
+ private Filter trrRowFilter;
+ private ResultScanner scanner;
+ private HTable htable;
+ private byte [][] trrInputColumns;
+ private long timestamp;
+ private int rowcount;
+ private boolean logScannerActivity = false;
+ private int logPerRowCount = 100;
+ private boolean endRowInclusive = true;
+
+ /**
+ * Restart from survivable exceptions by creating a new scanner.
+ *
+ * @param firstRow
+ * @throws IOException
+ */
+ public void restartRangeScan(byte[] firstRow) throws IOException {
+ Scan currentScan;
+ if ((endRow != null) && (endRow.length > 0)) {
+ if (trrRowFilter != null) {
+ Scan scan = new Scan(firstRow, (endRowInclusive ?
+ Bytes.add(endRow, new byte[] {0}) : endRow ) );
+
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ scan.setCacheBlocks(false);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " +
+ Bytes.toString(firstRow) + ", endRow: " +
+ Bytes.toString(endRow));
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow ));
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ } else {
+ LOG.debug("TIFB.restart, firstRow: " +
+ Bytes.toStringBinary(firstRow) + ", no endRow");
+
+ Scan scan = new Scan(firstRow);
+ TableInputFormat.addColumns(scan, trrInputColumns);
+ scan.setFilter(trrRowFilter);
+ this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ if (logScannerActivity) {
+ LOG.info("Current scan=" + currentScan.toString());
+ timestamp = System.currentTimeMillis();
+ rowcount = 0;
+ }
+ }
+
+ public TreeSet<String> getKeyList() {
+ return keyList;
+ }
+
+ public void setKeyList(TreeSet<String> keyList) {
+ this.keyList = keyList;
+ }
+
+ public SourceMode getSourceMode() {
+ return sourceMode;
+ }
+
+ public void setSourceMode(SourceMode sourceMode) {
+ this.sourceMode = sourceMode;
+ }
+
+ public byte[] getEndRow() {
+ return endRow;
+ }
+
+ public void setEndRowInclusive(boolean isInclusive) {
+ endRowInclusive = isInclusive;
+ }
+
+ public boolean getEndRowInclusive() {
+ return endRowInclusive;
+ }
+
+ private byte [] nextKey = null;
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ switch( sourceMode ) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ restartRangeScan(startRow);
+ break;
+
+ case GET_LIST:
+ nextKey = Bytes.toBytes(keyList.pollFirst());
+ break;
+
+ default:
+ throw new IOException(" Unknown source mode : " + sourceMode );
+ }
+ }
+
+ byte[] getStartRow() {
+ return this.startRow;
+ }
+ /**
+ * @param htable the {@link HTable} to scan.
+ */
+ public void setHTable(HTable htable) {
+ Configuration conf = htable.getConfiguration();
+ logScannerActivity = conf.getBoolean(
+ ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+ logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
+ this.htable = htable;
+ }
+
+ /**
+ * @param inputColumns the columns to be placed in {@link Result}.
+ */
+ public void setInputColumns(final byte [][] inputColumns) {
+ this.trrInputColumns = inputColumns;
+ }
+
+ /**
+ * @param startRow the first row in the split
+ */
+ public void setStartRow(final byte [] startRow) {
+ this.startRow = startRow;
+ }
+
+ /**
+ *
+ * @param endRow the last row in the split
+ */
+ public void setEndRow(final byte [] endRow) {
+ this.endRow = endRow;
+ }
+
+ /**
+ * @param rowFilter the {@link Filter} to be used.
+ */
+ public void setRowFilter(Filter rowFilter) {
+ this.trrRowFilter = rowFilter;
+ }
+
+ @Override
+ public void close() {
+ if (this.scanner != null) this.scanner.close();
+ }
+
+ /**
+ * @return ImmutableBytesWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ /**
+ * @return RowResult
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ @Override
+ public Result createValue() {
+ return new Result();
+ }
+
+ @Override
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
+ }
+
+ @Override
+ public float getProgress() {
+ // Depends on the total number of tuples and getPos
+ return 0;
+ }
+
+ /**
+ * @param key HStoreKey as input key.
+ * @param value MapWritable as input value
+ * @return true if there was more data
+ * @throws IOException
+ */
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value)
+ throws IOException {
+
+ switch(sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ {
+
+ Result result;
+ try {
+ try {
+ result = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount ++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
+ }
+ }
+ } catch (IOException e) {
+ // try to handle all IOExceptions by restarting
+ // the scanner, if the second call fails, it will be rethrown
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation," +
+ " if your mapper has restarted a few other times like this" +
+ " then you should consider killing this job and investigate" +
+ " why it's taking so long.");
+ }
+ if (lastSuccessfulRow == null) {
+ restartRangeScan(startRow);
+ } else {
+ restartRangeScan(lastSuccessfulRow);
+ this.scanner.next(); // skip presumed already mapped row
+ }
+ result = this.scanner.next();
+ }
+
+ if (result != null && result.size() > 0) {
+ key.set(result.getRow());
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ }
+ return false;
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ LOG.info(ioe);
+ String lastRow = lastSuccessfulRow == null ?
+ "null" : Bytes.toStringBinary(lastSuccessfulRow);
+ LOG.info("lastSuccessfulRow=" + lastRow);
+ }
+ throw ioe;
+ }
+ }
+
+ case GET_LIST:
+ {
+ Result result;
+ if( nextKey != null ) {
+ result = this.htable.get(new Get(nextKey));
+
+ if (result != null && result.size() > 0) {
+ System.out.println("KeyList => " + keyList);
+ System.out.println("Result => " + result);
+ if (keyList != null || !keyList.isEmpty()) {
+
+ String newKey = keyList.pollFirst();
+ System.out.println("New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey);
+ } else {
+ nextKey = null;
+ }
+ key.set(result.getRow());
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ }
+ return false;
+ } else {
+ return false;
+ }
+ }
+
+ default:
+ throw new IOException("Unknown source mode : " + sourceMode );
+ }
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
new file mode 100644
index 0000000..e5acc30
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.hbase;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import cascading.util.Util;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+//import org.mortbay.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+/**
+ * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to
+ * allow for the reading and writing of data to and from a HBase cluster.
+ *
+ * @see HBaseTap
+ */
+public class HBaseScheme
+// extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+ extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseScheme.class);
+
+ /** Field keyFields */
+ private Fields keyField;
+
+ /** Long timestamp */
+ private long timeStamp;
+
+ /** String familyNames */
+ private String[] familyNames;
+ /** Field valueFields */
+ private Fields[] valueFields;
+
+ /** String columns */
+ private transient String[] columns;
+ /** Field fields */
+ private transient byte[][] fields;
+
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields of type Fields
+ * @param familyName of type String
+ * @param valueFields of type Fields
+ */
+ public HBaseScheme(Fields keyFields, String familyName, Fields valueFields) {
+ this(keyFields, new String[]{familyName}, Fields.fields(valueFields));
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields of type Fields
+ * @param familyNames of type String[]
+ * @param valueFields of type Fields[]
+ */
+ public HBaseScheme(Fields keyFields, String[] familyNames, Fields[] valueFields) {
+ this.keyField = keyFields;
+ this.familyNames = familyNames;
+ this.valueFields = valueFields;
+ this.timeStamp = System.currentTimeMillis();
+
+ setSourceSink(this.keyField, this.valueFields);
+
+ validate();
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields of type Fields
+ * @param timeStamp of type Long
+ * @param familyNames of type String[]
+ * @param valueFields of type Fields[]
+ */
+ public HBaseScheme(Fields keyFields, long timeStamp, String[] familyNames, Fields[] valueFields) {
+ this.keyField = keyFields;
+ this.timeStamp = timeStamp;
+ this.familyNames = familyNames;
+ this.valueFields = valueFields;
+
+ setSourceSink(this.keyField, this.valueFields);
+
+ validate();
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names
+ *
+ * @param keyField of type String
+ * @param valueFields of type Fields
+ */
+ public HBaseScheme(Fields keyField, Fields valueFields) {
+ this(keyField, Fields.fields(valueFields));
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names
+ *
+ * @param keyField of type Field
+ * @param valueFields of type Field[]
+ */
+ public HBaseScheme(Fields keyField, Fields[] valueFields) {
+ this.keyField = keyField;
+ this.valueFields = valueFields;
+ this.timeStamp = System.currentTimeMillis();
+
+ validate();
+
+ setSourceSink(this.keyField, this.valueFields);
+ }
+
+ private void validate() {
+ if (keyField.size() != 1) {
+ throw new IllegalArgumentException("may only have one key field, found: " + keyField.print());
+ }
+ }
+
+ private void setSourceSink(Fields keyFields, Fields[] columnFields) {
+ Fields allFields = keyFields;
+
+ if (columnFields.length != 0) {
+ allFields = Fields.join(keyFields, Fields.join(columnFields)); // prepend
+ }
+
+ setSourceFields(allFields);
+ setSinkFields(allFields);
+ }
+
+ /**
+ * Method getFamilyNames returns the set of familyNames of this HBaseScheme object.
+ *
+ * @return the familyNames (type String[]) of this HBaseScheme object.
+ */
+ public String[] getFamilyNames() {
+ HashSet<String> familyNameSet = new HashSet<String>();
+
+ if (familyNames == null) {
+ for (String columnName : columns(null, this.valueFields)) {
+ int pos = columnName.indexOf(":");
+ familyNameSet.add(hbaseColumn(pos > 0 ? columnName.substring(0, pos) : columnName));
+ }
+ } else {
+ for (String familyName : familyNames) {
+ familyNameSet.add(familyName);
+ }
+ }
+ return familyNameSet.toArray(new String[0]);
+ }
+
+ @Override
+ public void sourcePrepare(FlowProcess<JobConf> flowProcess,
+ SourceCall<Object[], RecordReader> sourceCall) {
+ Object[] pair =
+ new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()};
+
+ sourceCall.setContext(pair);
+ }
+
+ @Override
+ public void sourceCleanup(FlowProcess<JobConf> flowProcess,
+ SourceCall<Object[], RecordReader> sourceCall) {
+ sourceCall.setContext(null);
+ }
+
+ @Override
+ public boolean source(FlowProcess<JobConf> flowProcess,
+ SourceCall<Object[], RecordReader> sourceCall) throws IOException {
+ Tuple result = new Tuple();
+
+ Object key = sourceCall.getContext()[0];
+ Object value = sourceCall.getContext()[1];
+ boolean hasNext = sourceCall.getInput().next(key, value);
+ if (!hasNext) { return false; }
+
+ // Skip nulls
+ if (key == null || value == null) { return true; }
+
+ ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
+ Result row = (Result) value;
+ result.add(keyWritable);
+
+ for (int i = 0; i < this.familyNames.length; i++) {
+ String familyName = this.familyNames[i];
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+ Fields fields = this.valueFields[i];
+ for (int k = 0; k < fields.size(); k++) {
+ String fieldName = (String) fields.get(k);
+ byte[] fieldNameBytes = Bytes.toBytes(fieldName);
+ byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes);
+ if (cellValue == null) {
+ cellValue = new byte[0];
+ }
+ result.add(new ImmutableBytesWritable(cellValue));
+ }
+ }
+
+ sourceCall.getIncomingEntry().setTuple(result);
+
+ return true;
+ }
+
+ @Override
+ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
+ throws IOException {
+ TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
+ OutputCollector outputCollector = sinkCall.getOutput();
+ Tuple key = tupleEntry.selectTuple(keyField);
+ ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0);
+ Put put = new Put(keyBytes.get(), this.timeStamp);
+
+ for (int i = 0; i < valueFields.length; i++) {
+ Fields fieldSelector = valueFields[i];
+ TupleEntry values = tupleEntry.selectEntry(fieldSelector);
+
+ for (int j = 0; j < values.getFields().size(); j++) {
+ Fields fields = values.getFields();
+ Tuple tuple = values.getTuple();
+
+ ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j);
+ put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
+ }
+ }
+
+ outputCollector.collect(null, put);
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<JobConf> process,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+ conf.setOutputFormat(TableOutputFormat.class);
+
+ conf.setOutputKeyClass(ImmutableBytesWritable.class);
+ conf.setOutputValueClass(Put.class);
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<JobConf> process,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+ conf.setInputFormat(HBaseInputFormat.class);
+
+ String columns = getColumns();
+ LOG.debug("sourcing from columns: {}", columns);
+ conf.set(HBaseInputFormat.COLUMN_LIST, columns);
+ }
+
+ private String getColumns() {
+ return Util.join(columns(this.familyNames, this.valueFields), " ");
+ }
+
+ private String[] columns(String[] familyNames, Fields[] fieldsArray) {
+ if (columns != null) { return columns; }
+
+ int size = 0;
+
+ for (Fields fields : fieldsArray) { size += fields.size(); }
+
+ columns = new String[size];
+
+ int count = 0;
+
+ for (int i = 0; i < fieldsArray.length; i++) {
+ Fields fields = fieldsArray[i];
+
+ for (int j = 0; j < fields.size(); j++) {
+ if (familyNames == null) { columns[count++] = hbaseColumn((String) fields.get(j)); } else {
+ columns[count++] = hbaseColumn(familyNames[i]) + (String) fields.get(j);
+ }
+ }
+ }
+
+ return columns;
+ }
+
+ private String hbaseColumn(String column) {
+ if (column.indexOf(":") < 0) { return column + ":"; }
+
+ return column;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) { return true; }
+ if (object == null || getClass() != object.getClass()) { return false; }
+ if (!super.equals(object)) { return false; }
+
+ HBaseScheme that = (HBaseScheme) object;
+
+ if (!Arrays.equals(familyNames, that.familyNames)) { return false; }
+ if (keyField != null ? !keyField.equals(that.keyField) : that.keyField != null) {
+ return false;
+ }
+ if (!Arrays.equals(valueFields, that.valueFields)) { return false; }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (keyField != null ? keyField.hashCode() : 0);
+ result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
+ result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0);
+ return result;
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
new file mode 100644
index 0000000..1d48e1d
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java
@@ -0,0 +1,190 @@
+package parallelai.spyglass.hbase;
+
+import java.awt.event.KeyListener;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+
+public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable {
+
+ private final Log LOG = LogFactory.getLog(HBaseTableSplit.class);
+
+ private byte [] m_tableName = null;
+ private byte [] m_startRow = null;
+ private byte [] m_endRow = null;
+ private String m_regionLocation = null;
+ private TreeSet<String> m_keyList = null;
+ private SourceMode m_sourceMode = SourceMode.EMPTY;
+ private boolean m_endRowInclusive = true;
+
+ /** default constructor */
+ public HBaseTableSplit() {
+ this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY);
+ }
+
+ /**
+ * Constructor
+ * @param tableName
+ * @param startRow
+ * @param endRow
+ * @param location
+ */
+ public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow,
+ final String location, final SourceMode sourceMode) {
+ this.m_tableName = tableName;
+ this.m_startRow = startRow;
+ this.m_endRow = endRow;
+ this.m_regionLocation = location;
+ this.m_sourceMode = sourceMode;
+ }
+
+ public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) {
+ this.m_tableName = tableName;
+ this.m_keyList = keyList;
+ this.m_sourceMode = sourceMode;
+ this.m_regionLocation = location;
+ }
+
+ /** @return table name */
+ public byte [] getTableName() {
+ return this.m_tableName;
+ }
+
+ /** @return starting row key */
+ public byte [] getStartRow() {
+ return this.m_startRow;
+ }
+
+ /** @return end row key */
+ public byte [] getEndRow() {
+ return this.m_endRow;
+ }
+
+ public boolean getEndRowInclusive() {
+ return m_endRowInclusive;
+ }
+
+ public void setEndRowInclusive(boolean isInclusive) {
+ m_endRowInclusive = isInclusive;
+ }
+
+ /** @return list of keys to get */
+ public TreeSet<String> getKeyList() {
+ return m_keyList;
+ }
+
+ /** @return get the source mode */
+ public SourceMode getSourceMode() {
+ return m_sourceMode;
+ }
+
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ LOG.info("REGION GETTER : " + m_regionLocation);
+
+ return this.m_regionLocation;
+ }
+
+ public String[] getLocations() {
+ LOG.info("REGION ARRAY : " + m_regionLocation);
+
+ return new String[] {this.m_regionLocation};
+ }
+
+ @Override
+ public long getLength() {
+ // Not clear how to obtain this... seems to be used only for sorting splits
+ return 0;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ LOG.info("READ ME : " + in.toString());
+
+ this.m_tableName = Bytes.readByteArray(in);
+ this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+ this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in)));
+
+ switch(this.m_sourceMode) {
+ case SCAN_RANGE:
+ this.m_startRow = Bytes.readByteArray(in);
+ this.m_endRow = Bytes.readByteArray(in);
+ this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in));
+ break;
+
+ case GET_LIST:
+ this.m_keyList = new TreeSet<String>();
+
+ int m = Bytes.toInt(Bytes.readByteArray(in));
+
+ for( int i = 0; i < m; i++) {
+ this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in)));
+ }
+ break;
+ }
+
+ LOG.info("READ and CREATED : " + this);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ LOG.info("WRITE : " + this);
+
+ Bytes.writeByteArray(out, this.m_tableName);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name()));
+
+ switch( this.m_sourceMode ) {
+ case SCAN_RANGE:
+ Bytes.writeByteArray(out, this.m_startRow);
+ Bytes.writeByteArray(out, this.m_endRow);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive));
+ break;
+
+ case GET_LIST:
+ Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size()));
+
+ for( String k: this.m_keyList ) {
+ Bytes.writeByteArray(out, Bytes.toBytes(k));
+ }
+ break;
+ }
+
+ LOG.info("WROTE : " + out.toString());
+ }
+
+ @Override
+ public String toString() {
+ return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)",
+ Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList);
+ }
+
+ @Override
+ public int compareTo(HBaseTableSplit o) {
+ switch(m_sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ return Bytes.compareTo(getStartRow(), o.getStartRow());
+
+ case GET_LIST:
+ return m_keyList.equals( o.getKeyList() ) ? 0 : -1;
+
+ default:
+ return -1;
+ }
+
+ }
+} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
new file mode 100644
index 0000000..9a0ed0e
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -0,0 +1,360 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.hbase;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.SinkMode;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
+import cascading.tuple.TupleEntryCollector;
+import cascading.tuple.TupleEntryIterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+/**
+ * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with
+ * the {@HBaseFullScheme} to allow for the reading and writing
+ * of data to and from a HBase cluster.
+ */
+public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseTap.class);
+
+ private final String id = UUID.randomUUID().toString();
+
+ /** Field SCHEME */
+ public static final String SCHEME = "hbase";
+
+ /** Field hBaseAdmin */
+ private transient HBaseAdmin hBaseAdmin;
+
+ /** Field hostName */
+ private String quorumNames;
+ /** Field tableName */
+ private String tableName;
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ */
+ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) {
+ super(HBaseFullScheme, SinkMode.UPDATE);
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ */
+ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme) {
+ super(HBaseFullScheme, SinkMode.UPDATE);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Method getTableName returns the tableName of this HBaseTap object.
+ *
+ * @return the tableName (type String) of this HBaseTap object.
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ public Path getPath() {
+ return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
+ }
+
+ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+ if (hBaseAdmin == null) {
+ Configuration hbaseConf = HBaseConfiguration.create(conf);
+ hBaseAdmin = new HBaseAdmin(hbaseConf);
+ }
+
+ return hBaseAdmin;
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
+ if(quorumNames != null) {
+ conf.set("hbase.zookeeper.quorum", quorumNames);
+ }
+
+ LOG.debug("sinking to table: {}", tableName);
+
+ if (isReplace() && conf.get("mapred.task.partition") == null) {
+ try {
+ deleteResource(conf);
+
+ } catch (IOException e) {
+ throw new RuntimeException("could not delete resource: " + e);
+ }
+ }
+
+ else if (isUpdate()) {
+ try {
+ createResource(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(tableName + " does not exist !", e);
+ }
+
+ }
+
+ conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+ super.sinkConfInit(process, conf);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException {
+ return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
+ }
+
+ @Override
+ public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
+ HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this );
+ hBaseCollector.prepare();
+ return hBaseCollector;
+ }
+
+ @Override
+ public boolean createResource(JobConf jobConf) throws IOException {
+ HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+
+ if (hBaseAdmin.tableExists(tableName)) {
+ return true;
+ }
+
+ LOG.info("creating hbase table: {}", tableName);
+
+ HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+
+ String[] familyNames = ((HBaseScheme) getScheme()).getFamilyNames();
+
+ for (String familyName : familyNames) {
+ tableDescriptor.addFamily(new HColumnDescriptor(familyName));
+ }
+
+ hBaseAdmin.createTable(tableDescriptor);
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteResource(JobConf jobConf) throws IOException {
+ // TODO: for now we don't do anything just to be safe
+ return true;
+ }
+
+ @Override
+ public boolean resourceExists(JobConf jobConf) throws IOException {
+ return getHBaseAdmin(jobConf).tableExists(tableName);
+ }
+
+ @Override
+ public long getModifiedTime(JobConf jobConf) throws IOException {
+ return System.currentTimeMillis(); // currently unable to find last mod time
+ // on a table
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
+ // a hack for MultiInputFormat to see that there is a child format
+ FileInputFormat.setInputPaths( conf, getPath() );
+
+ if(quorumNames != null) {
+ conf.set("hbase.zookeeper.quorum", quorumNames);
+ }
+
+ LOG.debug("sourcing from table: {}", tableName);
+
+ // TODO: Make this a bit smarter to store table name per flow.
+// process.getID();
+//
+// super.getFullIdentifier(conf);
+
+ HBaseInputFormat.setTableName(conf, tableName);
+
+ for( SourceConfig sc : sourceConfigList) {
+ sc.configure(conf);
+ }
+
+ super.sourceConfInit(process, conf);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ if (!super.equals(object)) {
+ return false;
+ }
+
+ HBaseTap hBaseTap = (HBaseTap) object;
+
+ if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+ return result;
+ }
+
+ private static class SourceConfig implements Serializable {
+ public String tableName = null;
+ public SourceMode sourceMode = SourceMode.SCAN_ALL;
+ public String startKey = null;
+ public String stopKey = null;
+ public String [] keyList = null;
+
+ public void configure(Configuration jobConf) {
+ switch( sourceMode ) {
+ case SCAN_RANGE:
+ jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
+
+ if( startKey != null && startKey.length() > 0 )
+ jobConf.set( String.format(HBaseConstants.START_KEY, tableName), startKey);
+
+ if( stopKey != null && stopKey.length() > 0 )
+ jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey);
+
+ LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey));
+ LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey));
+ break;
+
+ case GET_LIST:
+ jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
+ jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList);
+
+ LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList));
+ break;
+
+ default:
+ jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString());
+
+ LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()));
+ break;
+ }
+ }
+ }
+
+ private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>();
+
+ public void setHBaseRangeParms(String startKey, String stopKey ) {
+ SourceConfig sc = new SourceConfig();
+
+ sc.sourceMode = SourceMode.SCAN_RANGE;
+ sc.tableName = tableName;
+ sc.startKey = startKey;
+ sc.stopKey = stopKey;
+
+ sourceConfigList.add(sc);
+ }
+
+ public void setHBaseListParms(String [] keyList ) {
+ SourceConfig sc = new SourceConfig();
+
+ sc.sourceMode = SourceMode.GET_LIST;
+ sc.tableName = tableName;
+ sc.keyList = keyList;
+
+ sourceConfigList.add(sc);
+ }
+
+ public void setHBaseScanAllParms() {
+ SourceConfig sc = new SourceConfig();
+
+ sc.sourceMode = SourceMode.SCAN_ALL;
+ sc.tableName = tableName;
+
+ sourceConfigList.add(sc);
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java
new file mode 100644
index 0000000..098f957
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.hbase;
+
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowProcess;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tuple.TupleEntrySchemeCollector;
+import org.apache.hadoop.mapred.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Class HBaseTapCollector is a kind of
+ * {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the
+ * resource managed by a particular {@link HBaseTap} instance.
+ */
+public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector {
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseTapCollector.class);
+ /** Field conf */
+ private final JobConf conf;
+ /** Field writer */
+ private RecordWriter writer;
+ /** Field flowProcess */
+ private final FlowProcess<JobConf> hadoopFlowProcess;
+ /** Field tap */
+ private final Tap<JobConf, RecordReader, OutputCollector> tap;
+ /** Field reporter */
+ private final Reporter reporter = Reporter.NULL;
+
+ /**
+ * Constructor TapCollector creates a new TapCollector instance.
+ *
+ * @param flowProcess
+ * @param tap
+ * of type Tap
+ * @throws IOException
+ * when fails to initialize
+ */
+ public HBaseTapCollector(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap) throws IOException {
+ super(flowProcess, tap.getScheme());
+ this.hadoopFlowProcess = flowProcess;
+ this.tap = tap;
+ this.conf = new JobConf(flowProcess.getConfigCopy());
+ this.setOutput(this);
+ }
+
+ @Override
+ public void prepare() {
+ try {
+ initialize();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ super.prepare();
+ }
+
+ private void initialize() throws IOException {
+ tap.sinkConfInit(hadoopFlowProcess, conf);
+ OutputFormat outputFormat = conf.getOutputFormat();
+ LOG.info("Output format class is: " + outputFormat.getClass().toString());
+ writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL);
+ sinkCall.setOutput(this);
+ }
+
+ @Override
+ public void close() {
+ try {
+ LOG.info("closing tap collector for: {}", tap);
+ writer.close(reporter);
+ } catch (IOException exception) {
+ LOG.warn("exception closing: {}", exception);
+ throw new TapException("exception closing HBaseTapCollector", exception);
+ } finally {
+ super.close();
+ }
+ }
+
+ /**
+ * Method collect writes the given values to the {@link Tap} this instance
+ * encapsulates.
+ *
+ * @param writableComparable
+ * of type WritableComparable
+ * @param writable
+ * of type Writable
+ * @throws IOException
+ * when
+ */
+ public void collect(Object writableComparable, Object writable) throws IOException {
+ if (hadoopFlowProcess instanceof HadoopFlowProcess)
+ ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress();
+
+ writer.write(writableComparable, writable);
+ }
+}