diff options
| author | galarragas <galarragas@gmail.com> | 2014-03-17 14:08:53 +0000 | 
|---|---|---|
| committer | galarragas <galarragas@gmail.com> | 2014-03-17 14:08:53 +0000 | 
| commit | ca77538beaea8f7529cbb1f6cd4570eb39d1e9fd (patch) | |
| tree | 4ecefedf38a096c87098f44b13b4f9e4830eacb0 /src/main/java/parallelai | |
| parent | 74610d12d00064291a68423227761b2ddef2d4f0 (diff) | |
| download | SpyGlass-ca77538beaea8f7529cbb1f6cd4570eb39d1e9fd.tar.gz SpyGlass-ca77538beaea8f7529cbb1f6cd4570eb39d1e9fd.zip | |
Adding support of timestamp-based scan filtering
Diffstat (limited to 'src/main/java/parallelai')
5 files changed, 37 insertions, 20 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java index 598a988..77df84e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java @@ -27,6 +27,7 @@ public class HBaseConfigUtils {                  trr.setEndRow(tSplit.getEndRow());                  trr.setEndRowInclusive(tSplit.getEndRowInclusive());                  trr.setUseSalt(tSplit.getUseSalt()); +                trr.setTimestamp(tSplit.getTimestamp());              }              break; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java index e0d0cbe..cc64cb4 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -41,6 +41,7 @@ public abstract class HBaseRecordReaderBase implements      protected boolean logScannerActivity = false;      protected int logPerRowCount = 100;      protected int noOfLogCount = 0; +    protected long timestamp = -1;      @Override      public String toString() { @@ -116,6 +117,8 @@ public abstract class HBaseRecordReaderBase implements          this.versions = versions;      } +    public void setTimestamp(long timestamp) {this.timestamp = timestamp; } +      public void setUseSalt(boolean useSalt) {          this.useSalt = useSalt;      } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index 2155d99..fb10615 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -1,35 +1,24 @@  package parallelai.spyglass.hbase; -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.Vector; -  import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.hbase.KeyValue;  import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable;  import org.apache.hadoop.hbase.client.Result;  import org.apache.hadoop.hbase.client.ResultScanner;  import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter;  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes;  +import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader;  import org.apache.hadoop.util.StringUtils; -import org.jruby.javasupport.util.RuntimeHelpers; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector;  public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { @@ -72,6 +61,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {          scan.setFilter(trrRowFilter);          scan.setCacheBlocks(true);          scan.setCaching(scanCaching); +        if(timestamp != -1) { +            scan.setTimeStamp(timestamp); +        }          this.scanner = this.htable.getScanner(scan);          currentScan = scan;        } else { @@ -82,6 +74,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {          TableInputFormat.addColumns(scan, trrInputColumns);            scan.setCacheBlocks(true);            scan.setCaching(scanCaching); +          if(timestamp != -1) { +              scan.setTimeStamp(timestamp); +          }          this.scanner = this.htable.getScanner(scan);          currentScan = scan;        } @@ -93,7 +88,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {        TableInputFormat.addColumns(scan, trrInputColumns);        scan.setFilter(trrRowFilter);        scan.setCacheBlocks(true); -        scan.setCaching(scanCaching); +      scan.setCaching(scanCaching); +      if(timestamp != -1) { +        scan.setTimeStamp(timestamp); +      }        this.scanner = this.htable.getScanner(scan);        currentScan = scan;      } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index 6f04f01..ba83a63 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -54,6 +54,7 @@ public class HBaseScheme    /** Long timestamp */    private long timeStamp; +  private boolean useTimeStampInRead = false;    /** String familyNames */    private String[] familyNames; @@ -112,6 +113,7 @@ public class HBaseScheme      this.timeStamp = timeStamp;      this.familyNames = familyNames;      this.valueFields = valueFields; +    this.useTimeStampInRead = (timeStamp > 0);      setSourceSink(this.keyField, this.valueFields); @@ -138,6 +140,7 @@ public class HBaseScheme      this.keyField = keyField;      this.valueFields = valueFields;      this.timeStamp = System.currentTimeMillis(); +    this.useTimeStampInRead = false;      validate(); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java index e24771f..3b72a1d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -2,7 +2,6 @@ package parallelai.spyglass.hbase;  import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants;  import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.mapred.InputSplit; @@ -35,6 +34,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,      protected boolean m_endRowInclusive = true;      protected int m_versions = 1;      protected boolean m_useSalt = false; +    protected long m_timestamp = -1L;      /** @return table name */ @@ -95,6 +95,13 @@ public abstract class HBaseTableSplitBase implements InputSplit,          return this.m_regionName;      } +    public long getTimestamp() { +        return m_timestamp; +    } + +    public void setTimestamp(long m_timestamp) { +        this.m_timestamp = m_timestamp; +    }      public void copy(HBaseTableSplitBase that) {          this.m_endRow = that.m_endRow; @@ -107,6 +114,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,          this.m_versions = that.m_versions;          this.m_regionLocation = that.m_regionLocation;          this.m_regionName = that.m_regionName; +        this.m_timestamp = that.m_timestamp;      }      @Override @@ -139,6 +147,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,                  break;          } +        this.m_timestamp = Bytes.toLong(Bytes.readByteArray(in)); +          LOG.debug("READ and CREATED : " + this);      } @@ -169,6 +179,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,                  break;          } +        Bytes.writeByteArray(out, Bytes.toBytes(m_timestamp)); +          LOG.debug("WROTE : " + out.toString());      }  } | 
