diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase')
7 files changed, 1878 insertions, 0 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java new file mode 100644 index 0000000..b546107 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -0,0 +1,19 @@ +package parallelai.spyglass.hbase; + +import org.apache.hadoop.conf.Configuration; + +public class HBaseConstants { +   +  public enum SourceMode { +    EMPTY, +    SCAN_ALL, +    SCAN_RANGE, +    GET_LIST; +  } + +  public static final String START_KEY = "hbase.%s.startkey"; +  public static final String STOP_KEY = "hbase.%s.stopkey";  +  public static final String SOURCE_MODE = "hbase.%s.source.mode"; +  public static final String KEY_LIST = "hbase.%s.key.list"; + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java new file mode 100644 index 0000000..f1f4fb7 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -0,0 +1,531 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseInputFormat +  implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { +   +  private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); + +  private final String id = UUID.randomUUID().toString(); + +  private byte [][] inputColumns; +  private HTable table; +  private HBaseRecordReader tableRecordReader; +  private Filter rowFilter; +  private String tableName = ""; + +  private HashMap<InetAddress, String> reverseDNSCacheMap = +    new HashMap<InetAddress, String>(); +   +  private String nameServer = null; +   +//  private Scan scan = null; + +   +  @SuppressWarnings("deprecation") +  @Override +  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { +    if (this.table == null) { +      throw new IOException("No table was provided"); +    } +     +    if (this.inputColumns == null || this.inputColumns.length == 0) { +      throw new IOException("Expecting at least one column"); +    } +     +    Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + +    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { +      HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + +      if (null == regLoc) { +        throw new IOException("Expecting at least one region."); +      } + +      List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); +      HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc +              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); +      splits.add(split); +       +      return splits.toArray(new HBaseTableSplit[splits.size()]); +    } +     +    if( keys.getSecond() == null || keys.getSecond().length == 0) { +      throw new IOException("Expecting at least one region."); +    } +     +    if( keys.getFirst().length != keys.getSecond().length ) { +      throw new IOException("Regions for start and end key do not match"); +    } +     +    byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; +    byte[] maxKey = keys.getSecond()[0]; +     +    LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + +    byte [][] regStartKeys = keys.getFirst(); +    byte [][] regStopKeys = keys.getSecond(); +    String [] regions = new String[regStartKeys.length]; +     +    for( int i = 0; i < regStartKeys.length; i++ ) { +      minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0 ) && (Bytes.compareTo(regStartKeys[i], minKey) < 0 ) ? regStartKeys[i] : minKey; +      maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0 ) ? regStopKeys[i] : maxKey; +       +      HServerAddress regionServerAddress =  +          table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); +        InetAddress regionAddress = +          regionServerAddress.getInetSocketAddress().getAddress(); +        String regionLocation; +        try { +          regionLocation = reverseDNS(regionAddress); +        } catch (NamingException e) { +          LOG.error("Cannot resolve the host name for " + regionAddress + +              " because of " + e); +          regionLocation = regionServerAddress.getHostname(); +        } + +//       HServerAddress regionServerAddress = table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); +//      InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); +// +//      String regionLocation; +// +//      try { +//        regionLocation = reverseDNS(regionAddress); +//      } catch (NamingException e) { +//        LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e); +//        regionLocation = regionServerAddress.getHostname(); +//      } + +//      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getHostname(); +       +      LOG.debug( "***** " + regionLocation ); +       +      if( regionLocation == null || regionLocation.length() == 0 ) +        throw new IOException( "The region info for regiosn " + i + " is null or empty"); + +      regions[i] = regionLocation; +       +      LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); +    } +     +    byte[] startRow = HConstants.EMPTY_START_ROW; +    byte[] stopRow = HConstants.EMPTY_END_ROW; +     +    LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); +     +    LOG.info("SOURCE MODE is : " + sourceMode);     + +    switch( sourceMode ) { +    case SCAN_ALL: +      startRow = HConstants.EMPTY_START_ROW; +      stopRow = HConstants.EMPTY_END_ROW; + +      LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      break; +       +    case SCAN_RANGE: +      startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ; +      stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; + +      LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      break; +    } +     +    switch( sourceMode ) { +      case EMPTY: +      case SCAN_ALL: +      case SCAN_RANGE: +      { +//        startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : startRow; +//        stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : stopRow; +         +        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); +         +        List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); +         +        int maxRegions = validRegions.size(); +        int currentRegion = 1; +         +        for( HRegionLocation cRegion : validRegions ) { +          byte [] rStart = cRegion.getRegionInfo().getStartKey(); +          byte [] rStop = cRegion.getRegionInfo().getEndKey(); +           +          HServerAddress regionServerAddress = cRegion.getServerAddress(); +            InetAddress regionAddress = +              regionServerAddress.getInetSocketAddress().getAddress(); +            String regionLocation; +            try { +              regionLocation = reverseDNS(regionAddress); +            } catch (NamingException e) { +              LOG.error("Cannot resolve the host name for " + regionAddress + +                  " because of " + e); +              regionLocation = regionServerAddress.getHostname(); +            } +             +            byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); +            byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? rStop : stopRow);  +             +            LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",  +                (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), +                    (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), +                    rStop.length +                    )); +           +          HBaseTableSplit split = new HBaseTableSplit( +              table.getTableName(), +              sStart, +              sStop, +              regionLocation, +              SourceMode.SCAN_RANGE +           ); +           +          split.setEndRowInclusive( currentRegion == maxRegions ); + +          currentRegion ++; +                   +           LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  +               Bytes.toString(startRow), Bytes.toString(stopRow), +               Bytes.toString(rStart), Bytes.toString(rStop), +               Bytes.toString(sStart),  +               Bytes.toString(sStop),  +               cRegion.getHostnamePort(), split) ); +           +           splits.add(split); +        } +         +// +//        for (int i = 0; i < keys.getFirst().length; i++) { +// +//          if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { +//            LOG.info("NOT including regions : " + regions[i]); +//            continue; +//          } +//           +//          // determine if the given start an stop key fall into the region +//          if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || +//               Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && +//              (stopRow.length == 0 || +//               Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { +//             +//            byte[] splitStart = startRow.length == 0 || +//              Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? +//                keys.getFirst()[i] : startRow; +//            byte[] splitStop = (stopRow.length == 0 || +//              Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && +//              keys.getSecond()[i].length > 0 ? +//                keys.getSecond()[i] : stopRow; +//                HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +//              splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); +//            splits.add(split); +//             +//            LOG.info("getSplits: split -> " + i + " -> " + split); +//          } +//        } +         +        LOG.info("RETURNED SPLITS: split -> " + splits); + +        return splits.toArray(new HBaseTableSplit[splits.size()]); +      }  +         +      case GET_LIST: +      { +        if( keyList == null || keyList.size() == 0 ) { +          throw new IOException("Source Mode is GET_LIST but key list is EMPTY"); +        }  +         +        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); +         +        for (int i = 0; i < keys.getFirst().length; i++) { + +          if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { +            continue; +          } +           +          LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); + +          Set<String> regionsSubSet = null; +           +          if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { +            LOG.info("REGION start is empty"); +            LOG.info("REGION stop is empty"); +            regionsSubSet = keyList; +          } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { +            LOG.info("REGION start is empty"); +            regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true); +          } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { +            LOG.info("REGION stop is empty"); +            regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true); +          } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) { +            regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); +          } else { +            throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",  +                regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); +          } +           +          if( regionsSubSet == null || regionsSubSet.size() == 0) { +            LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); +             +            continue; +          } + +          TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); + +          LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); +             +          HBaseTableSplit split = new HBaseTableSplit( +              table.getTableName(), regionKeyList, +              regions[i],  +              SourceMode.GET_LIST); +          splits.add(split); +        } +           +        return splits.toArray(new HBaseTableSplit[splits.size()]); +      }  + +      default: +        throw new IOException("Unknown source Mode : " + sourceMode ); +    } +  } +   +  private String reverseDNS(InetAddress ipAddress) throws NamingException { +    String hostName = this.reverseDNSCacheMap.get(ipAddress); +    if (hostName == null) { +      hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); +      this.reverseDNSCacheMap.put(ipAddress, hostName); +    } +    return hostName; +  } + + +  @Override +  public RecordReader<ImmutableBytesWritable, Result> getRecordReader( +      InputSplit split, JobConf job, Reporter reporter) throws IOException { +     +    if( ! (split instanceof HBaseTableSplit ) ) +      throw new IOException("Table Split is not type HBaseTableSplit"); + +    HBaseTableSplit tSplit = (HBaseTableSplit) split; +     +    HBaseRecordReader trr = new HBaseRecordReader(); + +    switch( tSplit.getSourceMode() ) { +      case SCAN_ALL: +      case SCAN_RANGE: +      { +        LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); +         +        trr.setStartRow(tSplit.getStartRow()); +        trr.setEndRow(tSplit.getEndRow()); +        trr.setEndRowInclusive(tSplit.getEndRowInclusive()); +      } +       +      break; +       +      case GET_LIST: +      { +        LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); +         +        trr.setKeyList(tSplit.getKeyList()); +      } +       +      break; +       +      default: +        throw new IOException( "Unknown source mode : " + tSplit.getSourceMode() ); +    } +     +    trr.setSourceMode(tSplit.getSourceMode()); +    trr.setHTable(this.table); +    trr.setInputColumns(this.inputColumns); +    trr.setRowFilter(this.rowFilter); + +    trr.init(); + +    return trr; +  } +   +   +   +  /* Configuration Section */ + +  /** +   * space delimited list of columns +   */ +  public static final String COLUMN_LIST = "hbase.tablecolumns"; + +  /** +   * Use this jobconf param to specify the input table +   */ +  private static final String INPUT_TABLE = "hbase.inputtable"; + +  private String startKey = null; +  private String stopKey = null; +   +  private SourceMode sourceMode = SourceMode.EMPTY; +  private TreeSet<String> keyList = null; +   +  public void configure(JobConf job) { +    String tableName = getTableName(job); +    String colArg = job.get(COLUMN_LIST); +    String[] colNames = colArg.split(" "); +    byte [][] m_cols = new byte[colNames.length][]; +    for (int i = 0; i < m_cols.length; i++) { +      m_cols[i] = Bytes.toBytes(colNames[i]); +    } +    setInputColumns(m_cols); +     +    try { +      setHTable(new HTable(HBaseConfiguration.create(job), tableName)); +    } catch (Exception e) { +      LOG.error( "************* Table could not be created" ); +      LOG.error(StringUtils.stringifyException(e)); +    } +     +    LOG.debug("Entered : " + this.getClass() + " : configure()" ); +     +    sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; +     +    LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",  +        String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode )); + +    switch( sourceMode ) { +      case SCAN_RANGE: +        LOG.info("HIT SCAN_RANGE"); +         +        startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job) ) ); +        stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job) ) ); + +        LOG.info(String.format("Setting start key (%s) and stop key (%s)", startKey, stopKey) ); +        break; +         +      case GET_LIST: +        LOG.info("HIT GET_LIST"); +         +        Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job))); +        keyList = new TreeSet<String> (keys); +         +        LOG.info( "GOT KEY LIST : " + keys ); +        LOG.info(String.format("SETTING key list (%s)", keyList) ); + +        break; +         +      case EMPTY: +        LOG.info("HIT EMPTY"); +         +        sourceMode = sourceMode.SCAN_ALL; +        break; +       +      default: +        LOG.info("HIT DEFAULT"); +         +        break; +    } +  } + +  public void validateInput(JobConf job) throws IOException { +    // expecting exactly one path +    String tableName = getTableName(job); +     +    if (tableName == null) { +      throw new IOException("expecting one table name"); +    } +    LOG.debug("".format("Found Table name [%s]", tableName)); +     + +    // connected to table? +    if (getHTable() == null) { +      throw new IOException("could not connect to table '" + +        tableName + "'"); +    } +    LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); + +    // expecting at least one column +    String colArg = job.get(COLUMN_LIST); +    if (colArg == null || colArg.length() == 0) { +      throw new IOException("expecting at least one column"); +    } +    LOG.debug("".format("Found Columns [%s]", colArg)); + +    LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); +     +    if( sourceMode == SourceMode.EMPTY ) { +      throw new IOException("SourceMode should not be EMPTY"); +    } +     +    if( sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0) ) { +      throw new IOException( "Source mode is GET_LIST bu key list is empty"); +    } +  } +   +   +  /* Getters & Setters */ +  private HTable getHTable() { return this.table; } +  private void setHTable(HTable ht) { this.table = ht; } +  private void setInputColumns( byte [][] ic ) { this.inputColumns = ic; } + +   +  private void setJobProp( JobConf job, String key, String value) { +    if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); +    job.set(key,  value); +  } +   +  private String getJobProp( JobConf job, String key ) { return job.get(key); } +   +  public static void setTableName(JobConf job, String tableName) { +    // Make sure that table has not been set before +    String oldTableName = getTableName(job); +    if(oldTableName != null) { +      throw new RuntimeException("table name already set to: '" +        + oldTableName + "'"); +    } +     +    job.set(INPUT_TABLE, tableName); +  } +   +  public static String getTableName(JobConf job) { +    return job.get(INPUT_TABLE); +  } + +  protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { +    return true; +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java new file mode 100644 index 0000000..97077c4 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -0,0 +1,325 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> { + +  static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); + +  private byte [] startRow; +  private byte [] endRow; +  private byte [] lastSuccessfulRow; +  private TreeSet<String> keyList; +  private SourceMode sourceMode; +  private Filter trrRowFilter; +  private ResultScanner scanner; +  private HTable htable; +  private byte [][] trrInputColumns; +  private long timestamp; +  private int rowcount; +  private boolean logScannerActivity = false; +  private int logPerRowCount = 100; +  private boolean endRowInclusive = true; + +  /** +   * Restart from survivable exceptions by creating a new scanner. +   * +   * @param firstRow +   * @throws IOException +   */ +  public void restartRangeScan(byte[] firstRow) throws IOException { +    Scan currentScan; +    if ((endRow != null) && (endRow.length > 0)) { +      if (trrRowFilter != null) { +        Scan scan = new Scan(firstRow, (endRowInclusive ?   +            Bytes.add(endRow, new byte[] {0}) : endRow ) ); +         +        TableInputFormat.addColumns(scan, trrInputColumns); +        scan.setFilter(trrRowFilter); +        scan.setCacheBlocks(false); +        this.scanner = this.htable.getScanner(scan); +        currentScan = scan; +      } else { +        LOG.debug("TIFB.restart, firstRow: " + +            Bytes.toString(firstRow) + ", endRow: " + +            Bytes.toString(endRow)); +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow )); +        TableInputFormat.addColumns(scan, trrInputColumns); +        this.scanner = this.htable.getScanner(scan); +        currentScan = scan; +      } +    } else { +      LOG.debug("TIFB.restart, firstRow: " + +          Bytes.toStringBinary(firstRow) + ", no endRow"); + +      Scan scan = new Scan(firstRow); +      TableInputFormat.addColumns(scan, trrInputColumns); +      scan.setFilter(trrRowFilter); +      this.scanner = this.htable.getScanner(scan); +      currentScan = scan; +    } +    if (logScannerActivity) { +      LOG.info("Current scan=" + currentScan.toString()); +      timestamp = System.currentTimeMillis(); +      rowcount = 0; +    } +  } + +  public TreeSet<String> getKeyList() { +    return keyList; +  } + +  public void setKeyList(TreeSet<String> keyList) { +    this.keyList = keyList; +  } + +  public SourceMode getSourceMode() { +    return sourceMode; +  } + +  public void setSourceMode(SourceMode sourceMode) { +    this.sourceMode = sourceMode; +  } + +  public byte[] getEndRow() { +    return endRow; +  } +   +  public void setEndRowInclusive(boolean isInclusive) { +    endRowInclusive = isInclusive; +  } +   +  public boolean getEndRowInclusive() { +    return endRowInclusive; +  } +   +  private byte [] nextKey = null; + +  /** +   * Build the scanner. Not done in constructor to allow for extension. +   * +   * @throws IOException +   */ +  public void init() throws IOException { +    switch( sourceMode ) { +      case SCAN_ALL: +      case SCAN_RANGE: +        restartRangeScan(startRow); +        break; +         +      case GET_LIST: +        nextKey = Bytes.toBytes(keyList.pollFirst()); +        break; +         +      default: +        throw new IOException(" Unknown source mode : " + sourceMode ); +    } +  } + +  byte[] getStartRow() { +    return this.startRow; +  } +  /** +   * @param htable the {@link HTable} to scan. +   */ +  public void setHTable(HTable htable) { +    Configuration conf = htable.getConfiguration(); +    logScannerActivity = conf.getBoolean( +      ScannerCallable.LOG_SCANNER_ACTIVITY, false); +    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); +    this.htable = htable; +  } + +  /** +   * @param inputColumns the columns to be placed in {@link Result}. +   */ +  public void setInputColumns(final byte [][] inputColumns) { +    this.trrInputColumns = inputColumns; +  } + +  /** +   * @param startRow the first row in the split +   */ +  public void setStartRow(final byte [] startRow) { +    this.startRow = startRow; +  } + +  /** +   * +   * @param endRow the last row in the split +   */ +  public void setEndRow(final byte [] endRow) { +    this.endRow = endRow; +  } + +  /** +   * @param rowFilter the {@link Filter} to be used. +   */ +  public void setRowFilter(Filter rowFilter) { +    this.trrRowFilter = rowFilter; +  } + +  @Override +  public void close() { +	  if (this.scanner != null) this.scanner.close(); +  } + +  /** +   * @return ImmutableBytesWritable +   * +   * @see org.apache.hadoop.mapred.RecordReader#createKey() +   */ +  @Override +  public ImmutableBytesWritable createKey() { +    return new ImmutableBytesWritable(); +  } + +  /** +   * @return RowResult +   * +   * @see org.apache.hadoop.mapred.RecordReader#createValue() +   */ +  @Override +  public Result createValue() { +    return new Result(); +  } + +  @Override +  public long getPos() { +    // This should be the ordinal tuple in the range; +    // not clear how to calculate... +    return 0; +  } + +  @Override +  public float getProgress() { +    // Depends on the total number of tuples and getPos +    return 0; +  } + +  /** +   * @param key HStoreKey as input key. +   * @param value MapWritable as input value +   * @return true if there was more data +   * @throws IOException +   */ +  @Override +  public boolean next(ImmutableBytesWritable key, Result value) +  throws IOException { +     +    switch(sourceMode) { +      case SCAN_ALL: +      case SCAN_RANGE: +      { +         +        Result result; +        try { +          try { +            result = this.scanner.next(); +            if (logScannerActivity) { +              rowcount ++; +              if (rowcount >= logPerRowCount) { +                long now = System.currentTimeMillis(); +                LOG.info("Mapper took " + (now-timestamp) +                  + "ms to process " + rowcount + " rows"); +                timestamp = now; +                rowcount = 0; +              } +            } +          } catch (IOException e) { +            // try to handle all IOExceptions by restarting +            // the scanner, if the second call fails, it will be rethrown +            LOG.debug("recovered from " + StringUtils.stringifyException(e)); +            if (lastSuccessfulRow == null) { +              LOG.warn("We are restarting the first next() invocation," + +                  " if your mapper has restarted a few other times like this" + +                  " then you should consider killing this job and investigate" + +                  " why it's taking so long."); +            } +            if (lastSuccessfulRow == null) { +              restartRangeScan(startRow); +            } else { +              restartRangeScan(lastSuccessfulRow); +              this.scanner.next();    // skip presumed already mapped row +            } +            result = this.scanner.next(); +          } + +          if (result != null && result.size() > 0) { +            key.set(result.getRow()); +            lastSuccessfulRow = key.get(); +            Writables.copyWritable(result, value); +            return true; +          } +          return false; +        } catch (IOException ioe) { +          if (logScannerActivity) { +            long now = System.currentTimeMillis(); +            LOG.info("Mapper took " + (now-timestamp) +              + "ms to process " + rowcount + " rows"); +            LOG.info(ioe); +            String lastRow = lastSuccessfulRow == null ? +              "null" : Bytes.toStringBinary(lastSuccessfulRow); +            LOG.info("lastSuccessfulRow=" + lastRow); +          } +          throw ioe; +        } +      } +       +      case GET_LIST: +      { +        Result result; +        if( nextKey != null ) { +          result = this.htable.get(new Get(nextKey)); +           +          if (result != null && result.size() > 0) { +        	System.out.println("KeyList => " + keyList); +        	System.out.println("Result => " + result); +        	if (keyList != null || !keyList.isEmpty()) { +        		 +        		String newKey = keyList.pollFirst(); +        		System.out.println("New Key => " + newKey); +        		nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey); +        	} else { +        		nextKey = null; +        	} +            key.set(result.getRow()); +            lastSuccessfulRow = key.get(); +            Writables.copyWritable(result, value); +            return true; +          } +          return false; +        } else { +          return false; +        } +      } +       +      default: +        throw new IOException("Unknown source mode : " + sourceMode ); +    }  +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java new file mode 100644 index 0000000..e5acc30 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -0,0 +1,336 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +//import org.mortbay.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +/** + * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to + * allow for the reading and writing of data to and from a HBase cluster. + * + * @see HBaseTap + */ +public class HBaseScheme +//    extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +      extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +  /** Field LOG */ +  private static final Logger LOG = LoggerFactory.getLogger(HBaseScheme.class); + +  /** Field keyFields */ +  private Fields keyField; +   +  /** Long timestamp */ +  private long timeStamp; +  +  /** String familyNames */ +  private String[] familyNames; +  /** Field valueFields */ +  private Fields[] valueFields; + +  /** String columns */ +  private transient String[] columns; +  /** Field fields */ +  private transient byte[][] fields; +  + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance. +   * +   * @param keyFields   of type Fields +   * @param familyName  of type String +   * @param valueFields of type Fields +   */ +  public HBaseScheme(Fields keyFields, String familyName, Fields valueFields) { +    this(keyFields, new String[]{familyName}, Fields.fields(valueFields)); +  } + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance. +   * +   * @param keyFields   of type Fields +   * @param familyNames of type String[] +   * @param valueFields of type Fields[] +   */ +  public HBaseScheme(Fields keyFields, String[] familyNames, Fields[] valueFields) { +    this.keyField = keyFields; +    this.familyNames = familyNames; +    this.valueFields = valueFields; +    this.timeStamp = System.currentTimeMillis(); +     +    setSourceSink(this.keyField, this.valueFields); + +    validate(); +  } + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance. +   * +   * @param keyFields   of type Fields +   * @param timeStamp   of type Long +   * @param familyNames of type String[] +   * @param valueFields of type Fields[] +   */ +  public HBaseScheme(Fields keyFields, long timeStamp, String[] familyNames, Fields[] valueFields) { +    this.keyField = keyFields; +    this.timeStamp = timeStamp; +    this.familyNames = familyNames; +    this.valueFields = valueFields; + +    setSourceSink(this.keyField, this.valueFields); + +    validate(); +  }   +     +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names +   * +   * @param keyField    of type String +   * @param valueFields of type Fields +   */ +  public HBaseScheme(Fields keyField, Fields valueFields) { +    this(keyField, Fields.fields(valueFields)); +  } + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names +   * +   * @param keyField    of type Field +   * @param valueFields of type Field[] +   */ +  public HBaseScheme(Fields keyField, Fields[] valueFields) { +    this.keyField = keyField; +    this.valueFields = valueFields; +    this.timeStamp = System.currentTimeMillis(); +     +    validate(); + +    setSourceSink(this.keyField, this.valueFields); +  } + +  private void validate() { +    if (keyField.size() != 1) { +      throw new IllegalArgumentException("may only have one key field, found: " + keyField.print()); +    } +  } + +  private void setSourceSink(Fields keyFields, Fields[] columnFields) { +    Fields allFields = keyFields; + +    if (columnFields.length != 0) { +      allFields = Fields.join(keyFields, Fields.join(columnFields)); // prepend +    } + +    setSourceFields(allFields); +    setSinkFields(allFields); +  } + +  /** +   * Method getFamilyNames returns the set of familyNames of this HBaseScheme object. +   * +   * @return the familyNames (type String[]) of this HBaseScheme object. +   */ +  public String[] getFamilyNames() { +    HashSet<String> familyNameSet = new HashSet<String>(); + +    if (familyNames == null) { +      for (String columnName : columns(null, this.valueFields)) { +        int pos = columnName.indexOf(":"); +        familyNameSet.add(hbaseColumn(pos > 0 ? columnName.substring(0, pos) : columnName)); +      } +    } else { +      for (String familyName : familyNames) { +        familyNameSet.add(familyName); +      } +    } +    return familyNameSet.toArray(new String[0]); +  } + +  @Override +  public void sourcePrepare(FlowProcess<JobConf> flowProcess, +      SourceCall<Object[], RecordReader> sourceCall) { +    Object[] pair = +        new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; + +    sourceCall.setContext(pair); +  } + +  @Override +  public void sourceCleanup(FlowProcess<JobConf> flowProcess, +      SourceCall<Object[], RecordReader> sourceCall) { +    sourceCall.setContext(null); +  } + +  @Override +  public boolean source(FlowProcess<JobConf> flowProcess, +      SourceCall<Object[], RecordReader> sourceCall) throws IOException { +    Tuple result = new Tuple(); + +    Object key = sourceCall.getContext()[0]; +    Object value = sourceCall.getContext()[1]; +    boolean hasNext = sourceCall.getInput().next(key, value); +    if (!hasNext) { return false; } + +    // Skip nulls +    if (key == null || value == null) { return true; } + +    ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +    Result row = (Result) value; +    result.add(keyWritable); + +    for (int i = 0; i < this.familyNames.length; i++) { +      String familyName = this.familyNames[i]; +      byte[] familyNameBytes = Bytes.toBytes(familyName); +      Fields fields = this.valueFields[i]; +      for (int k = 0; k < fields.size(); k++) { +        String fieldName = (String) fields.get(k); +        byte[] fieldNameBytes = Bytes.toBytes(fieldName); +        byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes); +        if (cellValue == null) { +            cellValue = new byte[0]; +        } +        result.add(new ImmutableBytesWritable(cellValue)); +      } +    } + +    sourceCall.getIncomingEntry().setTuple(result); + +    return true; +  } + +  @Override +  public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) +      throws IOException { +    TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +    OutputCollector outputCollector = sinkCall.getOutput(); +    Tuple key = tupleEntry.selectTuple(keyField); +    ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0); +    Put put = new Put(keyBytes.get(), this.timeStamp); + +    for (int i = 0; i < valueFields.length; i++) { +      Fields fieldSelector = valueFields[i]; +      TupleEntry values = tupleEntry.selectEntry(fieldSelector); + +      for (int j = 0; j < values.getFields().size(); j++) { +        Fields fields = values.getFields(); +        Tuple tuple = values.getTuple(); + +        ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j); +        put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get()); +      } +    } + +    outputCollector.collect(null, put); +  } + +  @Override +  public void sinkConfInit(FlowProcess<JobConf> process, +      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +    conf.setOutputFormat(TableOutputFormat.class); + +    conf.setOutputKeyClass(ImmutableBytesWritable.class); +    conf.setOutputValueClass(Put.class); +  } + +  @Override +  public void sourceConfInit(FlowProcess<JobConf> process, +      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +    conf.setInputFormat(HBaseInputFormat.class); + +    String columns = getColumns(); +    LOG.debug("sourcing from columns: {}", columns); +    conf.set(HBaseInputFormat.COLUMN_LIST, columns); +  } + +  private String getColumns() { +    return Util.join(columns(this.familyNames, this.valueFields), " "); +  } + +  private String[] columns(String[] familyNames, Fields[] fieldsArray) { +    if (columns != null) { return columns; } + +    int size = 0; + +    for (Fields fields : fieldsArray) { size += fields.size(); } + +    columns = new String[size]; + +    int count = 0; + +    for (int i = 0; i < fieldsArray.length; i++) { +      Fields fields = fieldsArray[i]; + +      for (int j = 0; j < fields.size(); j++) { +        if (familyNames == null) { columns[count++] = hbaseColumn((String) fields.get(j)); } else { +          columns[count++] = hbaseColumn(familyNames[i]) + (String) fields.get(j); +        } +      } +    } + +    return columns; +  } + +  private String hbaseColumn(String column) { +    if (column.indexOf(":") < 0) { return column + ":"; } + +    return column; +  } + +  @Override +  public boolean equals(Object object) { +    if (this == object) { return true; } +    if (object == null || getClass() != object.getClass()) { return false; } +    if (!super.equals(object)) { return false; } + +    HBaseScheme that = (HBaseScheme) object; + +    if (!Arrays.equals(familyNames, that.familyNames)) { return false; } +    if (keyField != null ? !keyField.equals(that.keyField) : that.keyField != null) { +      return false; +    } +    if (!Arrays.equals(valueFields, that.valueFields)) { return false; } + +    return true; +  } + +  @Override +  public int hashCode() { +    int result = super.hashCode(); +    result = 31 * result + (keyField != null ? keyField.hashCode() : 0); +    result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +    result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0); +    return result; +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java new file mode 100644 index 0000000..1d48e1d --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -0,0 +1,190 @@ +package parallelai.spyglass.hbase; + +import java.awt.event.KeyListener; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { + +  private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); +   +  private byte [] m_tableName = null; +  private byte [] m_startRow = null; +  private byte [] m_endRow = null; +  private String m_regionLocation = null; +  private TreeSet<String> m_keyList = null; +  private SourceMode m_sourceMode = SourceMode.EMPTY; +  private boolean m_endRowInclusive = true; + +  /** default constructor */ +  public HBaseTableSplit() { +    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, +      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY); +  } + +  /** +   * Constructor +   * @param tableName +   * @param startRow +   * @param endRow +   * @param location +   */ +  public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, +      final String location, final SourceMode sourceMode) { +    this.m_tableName = tableName; +    this.m_startRow = startRow; +    this.m_endRow = endRow; +    this.m_regionLocation = location; +    this.m_sourceMode = sourceMode; +  } +   +  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) { +    this.m_tableName = tableName; +    this.m_keyList = keyList; +    this.m_sourceMode = sourceMode; +    this.m_regionLocation = location; +  } + +  /** @return table name */ +  public byte [] getTableName() { +    return this.m_tableName; +  } + +  /** @return starting row key */ +  public byte [] getStartRow() { +    return this.m_startRow; +  } + +  /** @return end row key */ +  public byte [] getEndRow() { +    return this.m_endRow; +  } +   +  public boolean getEndRowInclusive() { +    return m_endRowInclusive; +  } +   +  public void setEndRowInclusive(boolean isInclusive) { +    m_endRowInclusive = isInclusive; +  } +   +  /** @return list of keys to get */ +  public TreeSet<String> getKeyList() { +    return m_keyList; +  } +   +  /** @return get the source mode */ +  public SourceMode getSourceMode() { +    return m_sourceMode; +  } + +  /** @return the region's hostname */ +  public String getRegionLocation() { +    LOG.info("REGION GETTER : " + m_regionLocation); +     +    return this.m_regionLocation; +  } + +  public String[] getLocations() { +    LOG.info("REGION ARRAY : " + m_regionLocation); + +    return new String[] {this.m_regionLocation}; +  } + +  @Override +  public long getLength() { +    // Not clear how to obtain this... seems to be used only for sorting splits +    return 0; +  } + +  @Override +  public void readFields(DataInput in) throws IOException { +    LOG.info("READ ME : " + in.toString()); + +    this.m_tableName = Bytes.readByteArray(in); +    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); +    this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); +     +    switch(this.m_sourceMode) { +      case SCAN_RANGE: +        this.m_startRow = Bytes.readByteArray(in); +        this.m_endRow = Bytes.readByteArray(in); +        this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); +        break; +         +      case GET_LIST: +        this.m_keyList = new TreeSet<String>(); +         +        int m = Bytes.toInt(Bytes.readByteArray(in)); +         +        for( int i = 0; i < m; i++) { +          this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); +        } +        break; +    } +     +    LOG.info("READ and CREATED : " + this); +  } + +  @Override +  public void write(DataOutput out) throws IOException { +    LOG.info("WRITE : " + this); + +    Bytes.writeByteArray(out, this.m_tableName); +    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); +    Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); +     +    switch( this.m_sourceMode ) { +      case SCAN_RANGE: +        Bytes.writeByteArray(out, this.m_startRow); +        Bytes.writeByteArray(out, this.m_endRow); +        Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); +        break; +         +      case GET_LIST: +        Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); +         +        for( String k: this.m_keyList ) { +          Bytes.writeByteArray(out, Bytes.toBytes(k)); +        } +        break; +    } + +    LOG.info("WROTE : " + out.toString()); +  } + +  @Override +  public String toString() { +    return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)",  +        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList); +  } + +  @Override +  public int compareTo(HBaseTableSplit o) { +    switch(m_sourceMode) { +      case SCAN_ALL: +      case SCAN_RANGE: +        return Bytes.compareTo(getStartRow(), o.getStartRow()); +         +      case GET_LIST: +        return m_keyList.equals( o.getKeyList() ) ? 0 : -1;  +         +      default: +        return -1; +    } +     +  } +}
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java new file mode 100644 index 0000000..9a0ed0e --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -0,0 +1,360 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Map.Entry; +import java.util.UUID; + +/** + * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with + * the {@HBaseFullScheme} to allow for the reading and writing + * of data to and from a HBase cluster. + */ +public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { +  /** Field LOG */ +  private static final Logger LOG = LoggerFactory.getLogger(HBaseTap.class); + +  private final String id = UUID.randomUUID().toString(); + +  /** Field SCHEME */ +  public static final String SCHEME = "hbase"; + +  /** Field hBaseAdmin */ +  private transient HBaseAdmin hBaseAdmin; +  +  /** Field hostName */ +  private String quorumNames; +  /** Field tableName */ +  private String tableName; + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. +   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   */ +  public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) { +    super(HBaseFullScheme, SinkMode.UPDATE); +    this.tableName = tableName; +  } + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. +   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   * @param sinkMode +   *          of type SinkMode +   */ +  public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { +    super(HBaseFullScheme, sinkMode); +    this.tableName = tableName; +  } + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. +   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   */ +  public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme) { +    super(HBaseFullScheme, SinkMode.UPDATE); +    this.quorumNames = quorumNames; +    this.tableName = tableName; +  } + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. +   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   * @param sinkMode +   *          of type SinkMode +   */ +  public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { +    super(HBaseFullScheme, sinkMode); +    this.quorumNames = quorumNames; +    this.tableName = tableName; +  } + +  /** +   * Method getTableName returns the tableName of this HBaseTap object. +   * +   * @return the tableName (type String) of this HBaseTap object. +   */ +  public String getTableName() { +    return tableName; +  } + +  public Path getPath() { +    return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); +  } + +  protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { +    if (hBaseAdmin == null) { +      Configuration hbaseConf = HBaseConfiguration.create(conf); +      hBaseAdmin = new HBaseAdmin(hbaseConf); +    } + +    return hBaseAdmin; +  } + +  @Override +  public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { +    if(quorumNames != null) { +      conf.set("hbase.zookeeper.quorum", quorumNames); +    } + +    LOG.debug("sinking to table: {}", tableName); + +    if (isReplace() && conf.get("mapred.task.partition") == null) { +      try { +        deleteResource(conf); + +      } catch (IOException e) { +        throw new RuntimeException("could not delete resource: " + e); +      } +    } + +    else if (isUpdate()) { +      try { +          createResource(conf); +      } catch (IOException e) { +          throw new RuntimeException(tableName + " does not exist !", e); +      } + +    } + +    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +    super.sinkConfInit(process, conf); +  } + +  @Override +  public String getIdentifier() { +    return id; +  } + +  @Override +  public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException { +    return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); +  } + +  @Override +  public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException { +    HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this ); +    hBaseCollector.prepare(); +    return hBaseCollector; +  } + +  @Override +  public boolean createResource(JobConf jobConf) throws IOException { +    HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + +    if (hBaseAdmin.tableExists(tableName)) { +      return true; +    } + +    LOG.info("creating hbase table: {}", tableName); + +    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + +    String[] familyNames = ((HBaseScheme) getScheme()).getFamilyNames(); + +    for (String familyName : familyNames) { +      tableDescriptor.addFamily(new HColumnDescriptor(familyName)); +    } + +    hBaseAdmin.createTable(tableDescriptor); + +    return true; +  } + +  @Override +  public boolean deleteResource(JobConf jobConf) throws IOException { +    // TODO: for now we don't do anything just to be safe +    return true; +  } + +  @Override +  public boolean resourceExists(JobConf jobConf) throws IOException { +    return getHBaseAdmin(jobConf).tableExists(tableName); +  } + +  @Override +  public long getModifiedTime(JobConf jobConf) throws IOException { +    return System.currentTimeMillis(); // currently unable to find last mod time +                                       // on a table +  } + +  @Override +  public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { +    // a hack for MultiInputFormat to see that there is a child format +    FileInputFormat.setInputPaths( conf, getPath() ); + +    if(quorumNames != null) { +      conf.set("hbase.zookeeper.quorum", quorumNames); +    } + +    LOG.debug("sourcing from table: {}", tableName); + +    // TODO: Make this a bit smarter to store table name per flow. +//    process.getID(); +//     +//    super.getFullIdentifier(conf); +     +    HBaseInputFormat.setTableName(conf, tableName); +     +    for( SourceConfig sc : sourceConfigList) { +      sc.configure(conf); +    } +     +    super.sourceConfInit(process, conf); +  } + +  @Override +  public boolean equals(Object object) { +    if (this == object) { +      return true; +    } +    if (object == null || getClass() != object.getClass()) { +      return false; +    } +    if (!super.equals(object)) { +      return false; +    } + +    HBaseTap hBaseTap = (HBaseTap) object; + +    if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { +      return false; +    } + +    return true; +  } + +  @Override +  public int hashCode() { +    int result = super.hashCode(); +    result = 31 * result + (tableName != null ? tableName.hashCode() : 0); +    return result; +  } +   +  private static class SourceConfig implements Serializable { +    public String tableName = null; +    public SourceMode sourceMode = SourceMode.SCAN_ALL; +    public String startKey = null; +    public String stopKey = null; +    public String [] keyList = null; +     +    public void configure(Configuration jobConf) { +      switch( sourceMode ) { +        case SCAN_RANGE: +          jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); +           +          if( startKey != null && startKey.length() > 0 ) +            jobConf.set( String.format(HBaseConstants.START_KEY, tableName), startKey); +           +          if( stopKey != null && stopKey.length() > 0 ) +            jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey); +           +          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); +          LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); +          break; +           +        case GET_LIST: +          jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); +          jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList); + +          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList)); +          break; +           +        default: +          jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); + +          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          break; +      } +    } +  } +   +  private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>(); + +  public void setHBaseRangeParms(String startKey, String stopKey ) { +    SourceConfig sc = new SourceConfig(); +     +    sc.sourceMode = SourceMode.SCAN_RANGE; +    sc.tableName = tableName; +    sc.startKey = startKey; +    sc.stopKey = stopKey; +     +    sourceConfigList.add(sc); +  } + +  public void setHBaseListParms(String [] keyList ) { +    SourceConfig sc = new SourceConfig(); +     +    sc.sourceMode = SourceMode.GET_LIST; +    sc.tableName = tableName; +    sc.keyList = keyList; +     +    sourceConfigList.add(sc); +  } +   +  public void setHBaseScanAllParms() { +    SourceConfig sc = new SourceConfig(); +     +    sc.sourceMode = SourceMode.SCAN_ALL; +    sc.tableName = tableName; + +    sourceConfigList.add(sc); +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java new file mode 100644 index 0000000..098f957 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowProcess; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.TupleEntrySchemeCollector; +import org.apache.hadoop.mapred.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class HBaseTapCollector is a kind of + * {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the + * resource managed by a particular {@link HBaseTap} instance. + */ +public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector { +  /** Field LOG */ +  private static final Logger LOG = LoggerFactory.getLogger(HBaseTapCollector.class); +  /** Field conf */ +  private final JobConf conf; +  /** Field writer */ +  private RecordWriter writer; +  /** Field flowProcess */ +  private final FlowProcess<JobConf> hadoopFlowProcess; +  /** Field tap */ +  private final Tap<JobConf, RecordReader, OutputCollector> tap; +  /** Field reporter */ +  private final Reporter reporter = Reporter.NULL; + +  /** +   * Constructor TapCollector creates a new TapCollector instance. +   *  +   * @param flowProcess +   * @param tap +   *          of type Tap +   * @throws IOException +   *           when fails to initialize +   */ +  public HBaseTapCollector(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap) throws IOException { +    super(flowProcess, tap.getScheme()); +    this.hadoopFlowProcess = flowProcess; +    this.tap = tap; +    this.conf = new JobConf(flowProcess.getConfigCopy()); +    this.setOutput(this); +  } + +  @Override +  public void prepare() { +    try { +      initialize(); +    } catch (IOException e) { +      throw new RuntimeException(e); +    } + +    super.prepare(); +  } + +  private void initialize() throws IOException { +    tap.sinkConfInit(hadoopFlowProcess, conf); +    OutputFormat outputFormat = conf.getOutputFormat(); +    LOG.info("Output format class is: " + outputFormat.getClass().toString()); +    writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL); +    sinkCall.setOutput(this); +  } + +  @Override +  public void close() { +    try { +      LOG.info("closing tap collector for: {}", tap); +      writer.close(reporter); +    } catch (IOException exception) { +      LOG.warn("exception closing: {}", exception); +      throw new TapException("exception closing HBaseTapCollector", exception); +    } finally { +      super.close(); +    } +  } + +  /** +   * Method collect writes the given values to the {@link Tap} this instance +   * encapsulates. +   *  +   * @param writableComparable +   *          of type WritableComparable +   * @param writable +   *          of type Writable +   * @throws IOException +   *           when +   */ +  public void collect(Object writableComparable, Object writable) throws IOException { +    if (hadoopFlowProcess instanceof HadoopFlowProcess) +      ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress(); + +    writer.write(writableComparable, writable); +  } +} | 
