diff options
Diffstat (limited to 'src/main/java')
8 files changed, 1948 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/base/JobLibLoader.java b/src/main/java/parallelai/spyglass/base/JobLibLoader.java new file mode 100644 index 0000000..af5bdf4 --- /dev/null +++ b/src/main/java/parallelai/spyglass/base/JobLibLoader.java @@ -0,0 +1,70 @@ +package parallelai.spyglass.base; + +import org.apache.log4j.Logger; +import org.apache.log4j.LogManager; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +public class JobLibLoader { + + private static Logger logger = LogManager.getLogger(JobLibLoader.class); + + public static void loadJars(String libPathStr, Configuration config) { + + + try { + Path libPath = new Path(libPathStr); + + FileSystem fs = FileSystem.get(config); + + RemoteIterator<LocatedFileStatus> itr = fs.listFiles(libPath, true); + + while (itr.hasNext()) { + LocatedFileStatus f = itr.next(); + + if (!f.isDirectory() && f.getPath().getName().endsWith("jar")) { + logger.info("Loading Jar : " + f.getPath().getName()); + DistributedCache.addFileToClassPath(f.getPath(), config); + } + } + } catch (Exception e) { + e.printStackTrace(); + logger.error(e.toString()); + } + } + + public static void addFiletoCache(String libPathStr, Configuration config) { + + try { + Path filePath = new Path(libPathStr); + DistributedCache.addCacheFile(filePath.toUri(), config); + // DistributedCache.createSymlink(config); + + // config.set("mapred.cache.files", libPathStr); + // config.set("mapred.create.symlink", "yes"); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static Path[] getFileFromCache(String libPathStr, + Configuration config) { + Path[] localFiles = null; + try { + logger.info("Local Cache => " + DistributedCache.getLocalCacheFiles(config)); + logger.info("Hadoop Cache => "+ DistributedCache.getCacheFiles(config)); + if (DistributedCache.getLocalCacheFiles(config) != null) { + localFiles = DistributedCache.getLocalCacheFiles(config); + } + logger.info("LocalFiles => " + localFiles); + } catch (Exception e) { + e.printStackTrace(); + } + return localFiles; + } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java new file mode 100644 index 0000000..b546107 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -0,0 +1,19 @@ +package parallelai.spyglass.hbase; + +import org.apache.hadoop.conf.Configuration; + +public class HBaseConstants { + + public enum SourceMode { + EMPTY, + SCAN_ALL, + SCAN_RANGE, + GET_LIST; + } + + public static final String START_KEY = "hbase.%s.startkey"; + public static final String STOP_KEY = "hbase.%s.stopkey"; + public static final String SOURCE_MODE = "hbase.%s.source.mode"; + public static final String KEY_LIST = "hbase.%s.key.list"; + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java new file mode 100644 index 0000000..f1f4fb7 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -0,0 +1,531 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseInputFormat + implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { + + private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); + + private final String id = UUID.randomUUID().toString(); + + private byte [][] inputColumns; + private HTable table; + private HBaseRecordReader tableRecordReader; + private Filter rowFilter; + private String tableName = ""; + + private HashMap<InetAddress, String> reverseDNSCacheMap = + new HashMap<InetAddress, String>(); + + private String nameServer = null; + +// private Scan scan = null; + + + @SuppressWarnings("deprecation") + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + if (this.table == null) { + throw new IOException("No table was provided"); + } + + if (this.inputColumns == null || this.inputColumns.length == 0) { + throw new IOException("Expecting at least one column"); + } + + Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + + List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); + HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc + .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); + splits.add(split); + + return splits.toArray(new HBaseTableSplit[splits.size()]); + } + + if( keys.getSecond() == null || keys.getSecond().length == 0) { + throw new IOException("Expecting at least one region."); + } + + if( keys.getFirst().length != keys.getSecond().length ) { + throw new IOException("Regions for start and end key do not match"); + } + + byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; + byte[] maxKey = keys.getSecond()[0]; + + LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + + byte [][] regStartKeys = keys.getFirst(); + byte [][] regStopKeys = keys.getSecond(); + String [] regions = new String[regStartKeys.length]; + + for( int i = 0; i < regStartKeys.length; i++ ) { + minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0 ) && (Bytes.compareTo(regStartKeys[i], minKey) < 0 ) ? regStartKeys[i] : minKey; + maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0 ) ? regStopKeys[i] : maxKey; + + HServerAddress regionServerAddress = + table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); + InetAddress regionAddress = + regionServerAddress.getInetSocketAddress().getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + regionAddress + + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + +// HServerAddress regionServerAddress = table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); +// InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); +// +// String regionLocation; +// +// try { +// regionLocation = reverseDNS(regionAddress); +// } catch (NamingException e) { +// LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e); +// regionLocation = regionServerAddress.getHostname(); +// } + +// String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getHostname(); + + LOG.debug( "***** " + regionLocation ); + + if( regionLocation == null || regionLocation.length() == 0 ) + throw new IOException( "The region info for regiosn " + i + " is null or empty"); + + regions[i] = regionLocation; + + LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); + } + + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + + LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + + LOG.info("SOURCE MODE is : " + sourceMode); + + switch( sourceMode ) { + case SCAN_ALL: + startRow = HConstants.EMPTY_START_ROW; + stopRow = HConstants.EMPTY_END_ROW; + + LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + + case SCAN_RANGE: + startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ; + stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; + + LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); + break; + } + + switch( sourceMode ) { + case EMPTY: + case SCAN_ALL: + case SCAN_RANGE: + { +// startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : startRow; +// stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : stopRow; + + List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); + + int maxRegions = validRegions.size(); + int currentRegion = 1; + + for( HRegionLocation cRegion : validRegions ) { + byte [] rStart = cRegion.getRegionInfo().getStartKey(); + byte [] rStop = cRegion.getRegionInfo().getEndKey(); + + HServerAddress regionServerAddress = cRegion.getServerAddress(); + InetAddress regionAddress = + regionServerAddress.getInetSocketAddress().getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.error("Cannot resolve the host name for " + regionAddress + + " because of " + e); + regionLocation = regionServerAddress.getHostname(); + } + + byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); + byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow); + + LOG.info("".format("BOOL start (%s) stop (%s) length (%d)", + (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), + (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), + rStop.length + )); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), + sStart, + sStop, + regionLocation, + SourceMode.SCAN_RANGE + ); + + split.setEndRowInclusive( currentRegion == maxRegions ); + + currentRegion ++; + + LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]", + Bytes.toString(startRow), Bytes.toString(stopRow), + Bytes.toString(rStart), Bytes.toString(rStop), + Bytes.toString(sStart), + Bytes.toString(sStop), + cRegion.getHostnamePort(), split) ); + + splits.add(split); + } + +// +// for (int i = 0; i < keys.getFirst().length; i++) { +// +// if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { +// LOG.info("NOT including regions : " + regions[i]); +// continue; +// } +// +// // determine if the given start an stop key fall into the region +// if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || +// Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && +// (stopRow.length == 0 || +// Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { +// +// byte[] splitStart = startRow.length == 0 || +// Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? +// keys.getFirst()[i] : startRow; +// byte[] splitStop = (stopRow.length == 0 || +// Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && +// keys.getSecond()[i].length > 0 ? +// keys.getSecond()[i] : stopRow; +// HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +// splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); +// splits.add(split); +// +// LOG.info("getSplits: split -> " + i + " -> " + split); +// } +// } + + LOG.info("RETURNED SPLITS: split -> " + splits); + + return splits.toArray(new HBaseTableSplit[splits.size()]); + } + + case GET_LIST: + { + if( keyList == null || keyList.size() == 0 ) { + throw new IOException("Source Mode is GET_LIST but key list is EMPTY"); + } + + List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); + + for (int i = 0; i < keys.getFirst().length; i++) { + + if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + + LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); + + Set<String> regionsSubSet = null; + + if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { + LOG.info("REGION start is empty"); + LOG.info("REGION stop is empty"); + regionsSubSet = keyList; + } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { + LOG.info("REGION start is empty"); + regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true); + } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { + LOG.info("REGION stop is empty"); + regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true); + } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) { + regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); + } else { + throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)", + regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); + } + + if( regionsSubSet == null || regionsSubSet.size() == 0) { + LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); + + continue; + } + + TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); + + LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); + + HBaseTableSplit split = new HBaseTableSplit( + table.getTableName(), regionKeyList, + regions[i], + SourceMode.GET_LIST); + splits.add(split); + } + + return splits.toArray(new HBaseTableSplit[splits.size()]); + } + + default: + throw new IOException("Unknown source Mode : " + sourceMode ); + } + } + + private String reverseDNS(InetAddress ipAddress) throws NamingException { + String hostName = this.reverseDNSCacheMap.get(ipAddress); + if (hostName == null) { + hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); + this.reverseDNSCacheMap.put(ipAddress, hostName); + } + return hostName; + } + + + @Override + public RecordReader<ImmutableBytesWritable, Result> getRecordReader( + InputSplit split, JobConf job, Reporter reporter) throws IOException { + + if( ! (split instanceof HBaseTableSplit ) ) + throw new IOException("Table Split is not type HBaseTableSplit"); + + HBaseTableSplit tSplit = (HBaseTableSplit) split; + + HBaseRecordReader trr = new HBaseRecordReader(); + + switch( tSplit.getSourceMode() ) { + case SCAN_ALL: + case SCAN_RANGE: + { + LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); + + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setEndRowInclusive(tSplit.getEndRowInclusive()); + } + + break; + + case GET_LIST: + { + LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); + + trr.setKeyList(tSplit.getKeyList()); + } + + break; + + default: + throw new IOException( "Unknown source mode : " + tSplit.getSourceMode() ); + } + + trr.setSourceMode(tSplit.getSourceMode()); + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + + trr.init(); + + return trr; + } + + + + /* Configuration Section */ + + /** + * space delimited list of columns + */ + public static final String COLUMN_LIST = "hbase.tablecolumns"; + + /** + * Use this jobconf param to specify the input table + */ + private static final String INPUT_TABLE = "hbase.inputtable"; + + private String startKey = null; + private String stopKey = null; + + private SourceMode sourceMode = SourceMode.EMPTY; + private TreeSet<String> keyList = null; + + public void configure(JobConf job) { + String tableName = getTableName(job); + String colArg = job.get(COLUMN_LIST); + String[] colNames = colArg.split(" "); + byte [][] m_cols = new byte[colNames.length][]; + for (int i = 0; i < m_cols.length; i++) { + m_cols[i] = Bytes.toBytes(colNames[i]); + } + setInputColumns(m_cols); + + try { + setHTable(new HTable(HBaseConfiguration.create(job), tableName)); + } catch (Exception e) { + LOG.error( "************* Table could not be created" ); + LOG.error(StringUtils.stringifyException(e)); + } + + LOG.debug("Entered : " + this.getClass() + " : configure()" ); + + sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; + + LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally", + String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode )); + + switch( sourceMode ) { + case SCAN_RANGE: + LOG.info("HIT SCAN_RANGE"); + + startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job) ) ); + stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job) ) ); + + LOG.info(String.format("Setting start key (%s) and stop key (%s)", startKey, stopKey) ); + break; + + case GET_LIST: + LOG.info("HIT GET_LIST"); + + Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job))); + keyList = new TreeSet<String> (keys); + + LOG.info( "GOT KEY LIST : " + keys ); + LOG.info(String.format("SETTING key list (%s)", keyList) ); + + break; + + case EMPTY: + LOG.info("HIT EMPTY"); + + sourceMode = sourceMode.SCAN_ALL; + break; + + default: + LOG.info("HIT DEFAULT"); + + break; + } + } + + public void validateInput(JobConf job) throws IOException { + // expecting exactly one path + String tableName = getTableName(job); + + if (tableName == null) { + throw new IOException("expecting one table name"); + } + LOG.debug("".format("Found Table name [%s]", tableName)); + + + // connected to table? + if (getHTable() == null) { + throw new IOException("could not connect to table '" + + tableName + "'"); + } + LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); + + // expecting at least one column + String colArg = job.get(COLUMN_LIST); + if (colArg == null || colArg.length() == 0) { + throw new IOException("expecting at least one column"); + } + LOG.debug("".format("Found Columns [%s]", colArg)); + + LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); + + if( sourceMode == SourceMode.EMPTY ) { + throw new IOException("SourceMode should not be EMPTY"); + } + + if( sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0) ) { + throw new IOException( "Source mode is GET_LIST bu key list is empty"); + } + } + + + /* Getters & Setters */ + private HTable getHTable() { return this.table; } + private void setHTable(HTable ht) { this.table = ht; } + private void setInputColumns( byte [][] ic ) { this.inputColumns = ic; } + + + private void setJobProp( JobConf job, String key, String value) { + if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); + job.set(key, value); + } + + private String getJobProp( JobConf job, String key ) { return job.get(key); } + + public static void setTableName(JobConf job, String tableName) { + // Make sure that table has not been set before + String oldTableName = getTableName(job); + if(oldTableName != null) { + throw new RuntimeException("table name already set to: '" + + oldTableName + "'"); + } + + job.set(INPUT_TABLE, tableName); + } + + public static String getTableName(JobConf job) { + return job.get(INPUT_TABLE); + } + + protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { + return true; + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java new file mode 100644 index 0000000..97077c4 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -0,0 +1,325 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> { + + static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); + + private byte [] startRow; + private byte [] endRow; + private byte [] lastSuccessfulRow; + private TreeSet<String> keyList; + private SourceMode sourceMode; + private Filter trrRowFilter; + private ResultScanner scanner; + private HTable htable; + private byte [][] trrInputColumns; + private long timestamp; + private int rowcount; + private boolean logScannerActivity = false; + private int logPerRowCount = 100; + private boolean endRowInclusive = true; + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow + * @throws IOException + */ + public void restartRangeScan(byte[] firstRow) throws IOException { + Scan currentScan; + if ((endRow != null) && (endRow.length > 0)) { + if (trrRowFilter != null) { + Scan scan = new Scan(firstRow, (endRowInclusive ? + Bytes.add(endRow, new byte[] {0}) : endRow ) ); + + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + scan.setCacheBlocks(false); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } else { + LOG.debug("TIFB.restart, firstRow: " + + Bytes.toString(firstRow) + ", endRow: " + + Bytes.toString(endRow)); + Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow )); + TableInputFormat.addColumns(scan, trrInputColumns); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + } else { + LOG.debug("TIFB.restart, firstRow: " + + Bytes.toStringBinary(firstRow) + ", no endRow"); + + Scan scan = new Scan(firstRow); + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + if (logScannerActivity) { + LOG.info("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; + } + } + + public TreeSet<String> getKeyList() { + return keyList; + } + + public void setKeyList(TreeSet<String> keyList) { + this.keyList = keyList; + } + + public SourceMode getSourceMode() { + return sourceMode; + } + + public void setSourceMode(SourceMode sourceMode) { + this.sourceMode = sourceMode; + } + + public byte[] getEndRow() { + return endRow; + } + + public void setEndRowInclusive(boolean isInclusive) { + endRowInclusive = isInclusive; + } + + public boolean getEndRowInclusive() { + return endRowInclusive; + } + + private byte [] nextKey = null; + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + switch( sourceMode ) { + case SCAN_ALL: + case SCAN_RANGE: + restartRangeScan(startRow); + break; + + case GET_LIST: + nextKey = Bytes.toBytes(keyList.pollFirst()); + break; + + default: + throw new IOException(" Unknown source mode : " + sourceMode ); + } + } + + byte[] getStartRow() { + return this.startRow; + } + /** + * @param htable the {@link HTable} to scan. + */ + public void setHTable(HTable htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean( + ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); + this.htable = htable; + } + + /** + * @param inputColumns the columns to be placed in {@link Result}. + */ + public void setInputColumns(final byte [][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow the first row in the split + */ + public void setStartRow(final byte [] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow the last row in the split + */ + public void setEndRow(final byte [] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter the {@link Filter} to be used. + */ + public void setRowFilter(Filter rowFilter) { + this.trrRowFilter = rowFilter; + } + + @Override + public void close() { + if (this.scanner != null) this.scanner.close(); + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + /** + * @return RowResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + @Override + public Result createValue() { + return new Result(); + } + + @Override + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + @Override + public float getProgress() { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * @param key HStoreKey as input key. + * @param value MapWritable as input value + * @return true if there was more data + * @throws IOException + */ + @Override + public boolean next(ImmutableBytesWritable key, Result value) + throws IOException { + + switch(sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + { + + Result result; + try { + try { + result = this.scanner.next(); + if (logScannerActivity) { + rowcount ++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + timestamp = now; + rowcount = 0; + } + } + } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restartRangeScan(startRow); + } else { + restartRangeScan(lastSuccessfulRow); + this.scanner.next(); // skip presumed already mapped row + } + result = this.scanner.next(); + } + + if (result != null && result.size() > 0) { + key.set(result.getRow()); + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + LOG.info(ioe); + String lastRow = lastSuccessfulRow == null ? + "null" : Bytes.toStringBinary(lastSuccessfulRow); + LOG.info("lastSuccessfulRow=" + lastRow); + } + throw ioe; + } + } + + case GET_LIST: + { + Result result; + if( nextKey != null ) { + result = this.htable.get(new Get(nextKey)); + + if (result != null && result.size() > 0) { + System.out.println("KeyList => " + keyList); + System.out.println("Result => " + result); + if (keyList != null || !keyList.isEmpty()) { + + String newKey = keyList.pollFirst(); + System.out.println("New Key => " + newKey); + nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey); + } else { + nextKey = null; + } + key.set(result.getRow()); + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } else { + return false; + } + } + + default: + throw new IOException("Unknown source mode : " + sourceMode ); + } + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java new file mode 100644 index 0000000..e5acc30 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -0,0 +1,336 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +//import org.mortbay.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +/** + * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to + * allow for the reading and writing of data to and from a HBase cluster. + * + * @see HBaseTap + */ +public class HBaseScheme +// extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { + extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseScheme.class); + + /** Field keyFields */ + private Fields keyField; + + /** Long timestamp */ + private long timeStamp; + + /** String familyNames */ + private String[] familyNames; + /** Field valueFields */ + private Fields[] valueFields; + + /** String columns */ + private transient String[] columns; + /** Field fields */ + private transient byte[][] fields; + + + /** + * Constructor HBaseScheme creates a new HBaseScheme instance. + * + * @param keyFields of type Fields + * @param familyName of type String + * @param valueFields of type Fields + */ + public HBaseScheme(Fields keyFields, String familyName, Fields valueFields) { + this(keyFields, new String[]{familyName}, Fields.fields(valueFields)); + } + + /** + * Constructor HBaseScheme creates a new HBaseScheme instance. + * + * @param keyFields of type Fields + * @param familyNames of type String[] + * @param valueFields of type Fields[] + */ + public HBaseScheme(Fields keyFields, String[] familyNames, Fields[] valueFields) { + this.keyField = keyFields; + this.familyNames = familyNames; + this.valueFields = valueFields; + this.timeStamp = System.currentTimeMillis(); + + setSourceSink(this.keyField, this.valueFields); + + validate(); + } + + /** + * Constructor HBaseScheme creates a new HBaseScheme instance. + * + * @param keyFields of type Fields + * @param timeStamp of type Long + * @param familyNames of type String[] + * @param valueFields of type Fields[] + */ + public HBaseScheme(Fields keyFields, long timeStamp, String[] familyNames, Fields[] valueFields) { + this.keyField = keyFields; + this.timeStamp = timeStamp; + this.familyNames = familyNames; + this.valueFields = valueFields; + + setSourceSink(this.keyField, this.valueFields); + + validate(); + } + + /** + * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names + * + * @param keyField of type String + * @param valueFields of type Fields + */ + public HBaseScheme(Fields keyField, Fields valueFields) { + this(keyField, Fields.fields(valueFields)); + } + + /** + * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names + * + * @param keyField of type Field + * @param valueFields of type Field[] + */ + public HBaseScheme(Fields keyField, Fields[] valueFields) { + this.keyField = keyField; + this.valueFields = valueFields; + this.timeStamp = System.currentTimeMillis(); + + validate(); + + setSourceSink(this.keyField, this.valueFields); + } + + private void validate() { + if (keyField.size() != 1) { + throw new IllegalArgumentException("may only have one key field, found: " + keyField.print()); + } + } + + private void setSourceSink(Fields keyFields, Fields[] columnFields) { + Fields allFields = keyFields; + + if (columnFields.length != 0) { + allFields = Fields.join(keyFields, Fields.join(columnFields)); // prepend + } + + setSourceFields(allFields); + setSinkFields(allFields); + } + + /** + * Method getFamilyNames returns the set of familyNames of this HBaseScheme object. + * + * @return the familyNames (type String[]) of this HBaseScheme object. + */ + public String[] getFamilyNames() { + HashSet<String> familyNameSet = new HashSet<String>(); + + if (familyNames == null) { + for (String columnName : columns(null, this.valueFields)) { + int pos = columnName.indexOf(":"); + familyNameSet.add(hbaseColumn(pos > 0 ? columnName.substring(0, pos) : columnName)); + } + } else { + for (String familyName : familyNames) { + familyNameSet.add(familyName); + } + } + return familyNameSet.toArray(new String[0]); + } + + @Override + public void sourcePrepare(FlowProcess<JobConf> flowProcess, + SourceCall<Object[], RecordReader> sourceCall) { + Object[] pair = + new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; + + sourceCall.setContext(pair); + } + + @Override + public void sourceCleanup(FlowProcess<JobConf> flowProcess, + SourceCall<Object[], RecordReader> sourceCall) { + sourceCall.setContext(null); + } + + @Override + public boolean source(FlowProcess<JobConf> flowProcess, + SourceCall<Object[], RecordReader> sourceCall) throws IOException { + Tuple result = new Tuple(); + + Object key = sourceCall.getContext()[0]; + Object value = sourceCall.getContext()[1]; + boolean hasNext = sourceCall.getInput().next(key, value); + if (!hasNext) { return false; } + + // Skip nulls + if (key == null || value == null) { return true; } + + ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; + Result row = (Result) value; + result.add(keyWritable); + + for (int i = 0; i < this.familyNames.length; i++) { + String familyName = this.familyNames[i]; + byte[] familyNameBytes = Bytes.toBytes(familyName); + Fields fields = this.valueFields[i]; + for (int k = 0; k < fields.size(); k++) { + String fieldName = (String) fields.get(k); + byte[] fieldNameBytes = Bytes.toBytes(fieldName); + byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes); + if (cellValue == null) { + cellValue = new byte[0]; + } + result.add(new ImmutableBytesWritable(cellValue)); + } + } + + sourceCall.getIncomingEntry().setTuple(result); + + return true; + } + + @Override + public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) + throws IOException { + TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); + OutputCollector outputCollector = sinkCall.getOutput(); + Tuple key = tupleEntry.selectTuple(keyField); + ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0); + Put put = new Put(keyBytes.get(), this.timeStamp); + + for (int i = 0; i < valueFields.length; i++) { + Fields fieldSelector = valueFields[i]; + TupleEntry values = tupleEntry.selectEntry(fieldSelector); + + for (int j = 0; j < values.getFields().size(); j++) { + Fields fields = values.getFields(); + Tuple tuple = values.getTuple(); + + ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j); + put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get()); + } + } + + outputCollector.collect(null, put); + } + + @Override + public void sinkConfInit(FlowProcess<JobConf> process, + Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { + conf.setOutputFormat(TableOutputFormat.class); + + conf.setOutputKeyClass(ImmutableBytesWritable.class); + conf.setOutputValueClass(Put.class); + } + + @Override + public void sourceConfInit(FlowProcess<JobConf> process, + Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { + conf.setInputFormat(HBaseInputFormat.class); + + String columns = getColumns(); + LOG.debug("sourcing from columns: {}", columns); + conf.set(HBaseInputFormat.COLUMN_LIST, columns); + } + + private String getColumns() { + return Util.join(columns(this.familyNames, this.valueFields), " "); + } + + private String[] columns(String[] familyNames, Fields[] fieldsArray) { + if (columns != null) { return columns; } + + int size = 0; + + for (Fields fields : fieldsArray) { size += fields.size(); } + + columns = new String[size]; + + int count = 0; + + for (int i = 0; i < fieldsArray.length; i++) { + Fields fields = fieldsArray[i]; + + for (int j = 0; j < fields.size(); j++) { + if (familyNames == null) { columns[count++] = hbaseColumn((String) fields.get(j)); } else { + columns[count++] = hbaseColumn(familyNames[i]) + (String) fields.get(j); + } + } + } + + return columns; + } + + private String hbaseColumn(String column) { + if (column.indexOf(":") < 0) { return column + ":"; } + + return column; + } + + @Override + public boolean equals(Object object) { + if (this == object) { return true; } + if (object == null || getClass() != object.getClass()) { return false; } + if (!super.equals(object)) { return false; } + + HBaseScheme that = (HBaseScheme) object; + + if (!Arrays.equals(familyNames, that.familyNames)) { return false; } + if (keyField != null ? !keyField.equals(that.keyField) : that.keyField != null) { + return false; + } + if (!Arrays.equals(valueFields, that.valueFields)) { return false; } + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (keyField != null ? keyField.hashCode() : 0); + result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); + result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0); + return result; + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java new file mode 100644 index 0000000..1d48e1d --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -0,0 +1,190 @@ +package parallelai.spyglass.hbase; + +import java.awt.event.KeyListener; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { + + private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); + + private byte [] m_tableName = null; + private byte [] m_startRow = null; + private byte [] m_endRow = null; + private String m_regionLocation = null; + private TreeSet<String> m_keyList = null; + private SourceMode m_sourceMode = SourceMode.EMPTY; + private boolean m_endRowInclusive = true; + + /** default constructor */ + public HBaseTableSplit() { + this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY); + } + + /** + * Constructor + * @param tableName + * @param startRow + * @param endRow + * @param location + */ + public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, + final String location, final SourceMode sourceMode) { + this.m_tableName = tableName; + this.m_startRow = startRow; + this.m_endRow = endRow; + this.m_regionLocation = location; + this.m_sourceMode = sourceMode; + } + + public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) { + this.m_tableName = tableName; + this.m_keyList = keyList; + this.m_sourceMode = sourceMode; + this.m_regionLocation = location; + } + + /** @return table name */ + public byte [] getTableName() { + return this.m_tableName; + } + + /** @return starting row key */ + public byte [] getStartRow() { + return this.m_startRow; + } + + /** @return end row key */ + public byte [] getEndRow() { + return this.m_endRow; + } + + public boolean getEndRowInclusive() { + return m_endRowInclusive; + } + + public void setEndRowInclusive(boolean isInclusive) { + m_endRowInclusive = isInclusive; + } + + /** @return list of keys to get */ + public TreeSet<String> getKeyList() { + return m_keyList; + } + + /** @return get the source mode */ + public SourceMode getSourceMode() { + return m_sourceMode; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + LOG.info("REGION GETTER : " + m_regionLocation); + + return this.m_regionLocation; + } + + public String[] getLocations() { + LOG.info("REGION ARRAY : " + m_regionLocation); + + return new String[] {this.m_regionLocation}; + } + + @Override + public long getLength() { + // Not clear how to obtain this... seems to be used only for sorting splits + return 0; + } + + @Override + public void readFields(DataInput in) throws IOException { + LOG.info("READ ME : " + in.toString()); + + this.m_tableName = Bytes.readByteArray(in); + this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); + + switch(this.m_sourceMode) { + case SCAN_RANGE: + this.m_startRow = Bytes.readByteArray(in); + this.m_endRow = Bytes.readByteArray(in); + this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); + break; + + case GET_LIST: + this.m_keyList = new TreeSet<String>(); + + int m = Bytes.toInt(Bytes.readByteArray(in)); + + for( int i = 0; i < m; i++) { + this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); + } + break; + } + + LOG.info("READ and CREATED : " + this); + } + + @Override + public void write(DataOutput out) throws IOException { + LOG.info("WRITE : " + this); + + Bytes.writeByteArray(out, this.m_tableName); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); + + switch( this.m_sourceMode ) { + case SCAN_RANGE: + Bytes.writeByteArray(out, this.m_startRow); + Bytes.writeByteArray(out, this.m_endRow); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); + break; + + case GET_LIST: + Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); + + for( String k: this.m_keyList ) { + Bytes.writeByteArray(out, Bytes.toBytes(k)); + } + break; + } + + LOG.info("WROTE : " + out.toString()); + } + + @Override + public String toString() { + return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)", + Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList); + } + + @Override + public int compareTo(HBaseTableSplit o) { + switch(m_sourceMode) { + case SCAN_ALL: + case SCAN_RANGE: + return Bytes.compareTo(getStartRow(), o.getStartRow()); + + case GET_LIST: + return m_keyList.equals( o.getKeyList() ) ? 0 : -1; + + default: + return -1; + } + + } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java new file mode 100644 index 0000000..9a0ed0e --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -0,0 +1,360 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Map.Entry; +import java.util.UUID; + +/** + * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with + * the {@HBaseFullScheme} to allow for the reading and writing + * of data to and from a HBase cluster. + */ +public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseTap.class); + + private final String id = UUID.randomUUID().toString(); + + /** Field SCHEME */ + public static final String SCHEME = "hbase"; + + /** Field hBaseAdmin */ + private transient HBaseAdmin hBaseAdmin; + + /** Field hostName */ + private String quorumNames; + /** Field tableName */ + private String tableName; + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Method getTableName returns the tableName of this HBaseTap object. + * + * @return the tableName (type String) of this HBaseTap object. + */ + public String getTableName() { + return tableName; + } + + public Path getPath() { + return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); + } + + protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { + if (hBaseAdmin == null) { + Configuration hbaseConf = HBaseConfiguration.create(conf); + hBaseAdmin = new HBaseAdmin(hbaseConf); + } + + return hBaseAdmin; + } + + @Override + public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { + if(quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sinking to table: {}", tableName); + + if (isReplace() && conf.get("mapred.task.partition") == null) { + try { + deleteResource(conf); + + } catch (IOException e) { + throw new RuntimeException("could not delete resource: " + e); + } + } + + else if (isUpdate()) { + try { + createResource(conf); + } catch (IOException e) { + throw new RuntimeException(tableName + " does not exist !", e); + } + + } + + conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); + super.sinkConfInit(process, conf); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException { + return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); + } + + @Override + public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException { + HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this ); + hBaseCollector.prepare(); + return hBaseCollector; + } + + @Override + public boolean createResource(JobConf jobConf) throws IOException { + HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + + if (hBaseAdmin.tableExists(tableName)) { + return true; + } + + LOG.info("creating hbase table: {}", tableName); + + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + + String[] familyNames = ((HBaseScheme) getScheme()).getFamilyNames(); + + for (String familyName : familyNames) { + tableDescriptor.addFamily(new HColumnDescriptor(familyName)); + } + + hBaseAdmin.createTable(tableDescriptor); + + return true; + } + + @Override + public boolean deleteResource(JobConf jobConf) throws IOException { + // TODO: for now we don't do anything just to be safe + return true; + } + + @Override + public boolean resourceExists(JobConf jobConf) throws IOException { + return getHBaseAdmin(jobConf).tableExists(tableName); + } + + @Override + public long getModifiedTime(JobConf jobConf) throws IOException { + return System.currentTimeMillis(); // currently unable to find last mod time + // on a table + } + + @Override + public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { + // a hack for MultiInputFormat to see that there is a child format + FileInputFormat.setInputPaths( conf, getPath() ); + + if(quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sourcing from table: {}", tableName); + + // TODO: Make this a bit smarter to store table name per flow. +// process.getID(); +// +// super.getFullIdentifier(conf); + + HBaseInputFormat.setTableName(conf, tableName); + + for( SourceConfig sc : sourceConfigList) { + sc.configure(conf); + } + + super.sourceConfInit(process, conf); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } + + HBaseTap hBaseTap = (HBaseTap) object; + + if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (tableName != null ? tableName.hashCode() : 0); + return result; + } + + private static class SourceConfig implements Serializable { + public String tableName = null; + public SourceMode sourceMode = SourceMode.SCAN_ALL; + public String startKey = null; + public String stopKey = null; + public String [] keyList = null; + + public void configure(Configuration jobConf) { + switch( sourceMode ) { + case SCAN_RANGE: + jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); + + if( startKey != null && startKey.length() > 0 ) + jobConf.set( String.format(HBaseConstants.START_KEY, tableName), startKey); + + if( stopKey != null && stopKey.length() > 0 ) + jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey); + + LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); + LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); + break; + + case GET_LIST: + jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); + jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList); + + LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList)); + break; + + default: + jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); + + LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + break; + } + } + } + + private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>(); + + public void setHBaseRangeParms(String startKey, String stopKey ) { + SourceConfig sc = new SourceConfig(); + + sc.sourceMode = SourceMode.SCAN_RANGE; + sc.tableName = tableName; + sc.startKey = startKey; + sc.stopKey = stopKey; + + sourceConfigList.add(sc); + } + + public void setHBaseListParms(String [] keyList ) { + SourceConfig sc = new SourceConfig(); + + sc.sourceMode = SourceMode.GET_LIST; + sc.tableName = tableName; + sc.keyList = keyList; + + sourceConfigList.add(sc); + } + + public void setHBaseScanAllParms() { + SourceConfig sc = new SourceConfig(); + + sc.sourceMode = SourceMode.SCAN_ALL; + sc.tableName = tableName; + + sourceConfigList.add(sc); + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java new file mode 100644 index 0000000..098f957 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowProcess; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.TupleEntrySchemeCollector; +import org.apache.hadoop.mapred.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class HBaseTapCollector is a kind of + * {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the + * resource managed by a particular {@link HBaseTap} instance. + */ +public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector { + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseTapCollector.class); + /** Field conf */ + private final JobConf conf; + /** Field writer */ + private RecordWriter writer; + /** Field flowProcess */ + private final FlowProcess<JobConf> hadoopFlowProcess; + /** Field tap */ + private final Tap<JobConf, RecordReader, OutputCollector> tap; + /** Field reporter */ + private final Reporter reporter = Reporter.NULL; + + /** + * Constructor TapCollector creates a new TapCollector instance. + * + * @param flowProcess + * @param tap + * of type Tap + * @throws IOException + * when fails to initialize + */ + public HBaseTapCollector(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap) throws IOException { + super(flowProcess, tap.getScheme()); + this.hadoopFlowProcess = flowProcess; + this.tap = tap; + this.conf = new JobConf(flowProcess.getConfigCopy()); + this.setOutput(this); + } + + @Override + public void prepare() { + try { + initialize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + super.prepare(); + } + + private void initialize() throws IOException { + tap.sinkConfInit(hadoopFlowProcess, conf); + OutputFormat outputFormat = conf.getOutputFormat(); + LOG.info("Output format class is: " + outputFormat.getClass().toString()); + writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL); + sinkCall.setOutput(this); + } + + @Override + public void close() { + try { + LOG.info("closing tap collector for: {}", tap); + writer.close(reporter); + } catch (IOException exception) { + LOG.warn("exception closing: {}", exception); + throw new TapException("exception closing HBaseTapCollector", exception); + } finally { + super.close(); + } + } + + /** + * Method collect writes the given values to the {@link Tap} this instance + * encapsulates. + * + * @param writableComparable + * of type WritableComparable + * @param writable + * of type Writable + * @throws IOException + * when + */ + public void collect(Object writableComparable, Object writable) throws IOException { + if (hadoopFlowProcess instanceof HadoopFlowProcess) + ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress(); + + writer.write(writableComparable, writable); + } +} |