From ca77538beaea8f7529cbb1f6cd4570eb39d1e9fd Mon Sep 17 00:00:00 2001
From: galarragas
Date: Mon, 17 Mar 2014 14:08:53 +0000
Subject: Adding support for timestamp-based scan filtering

---
 .../spyglass/hbase/HBaseConfigUtils.java           |  1 +
 .../spyglass/hbase/HBaseRecordReaderBase.java      |  3 ++
 .../spyglass/hbase/HBaseRecordReaderGranular.java  | 36 ++++++++++------------
 .../parallelai/spyglass/hbase/HBaseScheme.java     |  3 ++
 .../spyglass/hbase/HBaseTableSplitBase.java        | 14 ++++++++-
 5 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
index 598a988..77df84e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
@@ -27,6 +27,7 @@ public class HBaseConfigUtils {
       trr.setEndRow(tSplit.getEndRow());
       trr.setEndRowInclusive(tSplit.getEndRowInclusive());
       trr.setUseSalt(tSplit.getUseSalt());
+      trr.setTimestamp(tSplit.getTimestamp());
     }
       break;
 
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
index e0d0cbe..cc64cb4 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -41,6 +41,7 @@ public abstract class HBaseRecordReaderBase implements
   protected boolean logScannerActivity = false;
   protected int logPerRowCount = 100;
   protected int noOfLogCount = 0;
+  protected long timestamp = -1;
 
   @Override
   public String toString() {
@@ -116,6 +117,8 @@ public abstract class HBaseRecordReaderBase implements
     this.versions = versions;
   }
 
+  public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
+
   public void setUseSalt(boolean useSalt) {
     this.useSalt = useSalt;
   }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index 2155d99..fb10615 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -1,35 +1,24 @@
 package parallelai.spyglass.hbase;
 
-import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.Vector;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.util.StringUtils;
-import org.jruby.javasupport.util.RuntimeHelpers;
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
 
 public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
 
@@ -72,6 +61,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
         scan.setFilter(trrRowFilter);
         scan.setCacheBlocks(true);
         scan.setCaching(scanCaching);
+        if (timestamp != -1) {
+          scan.setTimeStamp(timestamp);
+        }
         this.scanner = this.htable.getScanner(scan);
         currentScan = scan;
       } else {
@@ -82,6 +74,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
         TableInputFormat.addColumns(scan, trrInputColumns);
         scan.setCacheBlocks(true);
         scan.setCaching(scanCaching);
+        if (timestamp != -1) {
+          scan.setTimeStamp(timestamp);
+        }
         this.scanner = this.htable.getScanner(scan);
         currentScan = scan;
       }
@@ -93,7 +88,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
       TableInputFormat.addColumns(scan, trrInputColumns);
       scan.setFilter(trrRowFilter);
       scan.setCacheBlocks(true);
-      scan.setCaching(scanCaching);
+      scan.setCaching(scanCaching);
+      if (timestamp != -1) {
+        scan.setTimeStamp(timestamp);
+      }
       this.scanner = this.htable.getScanner(scan);
       currentScan = scan;
     }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index 6f04f01..ba83a63 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -54,6 +54,7 @@ public class HBaseScheme
 
   /** Long timestamp */
   private long timeStamp;
+  private boolean useTimeStampInRead = false;
 
   /** String familyNames */
   private String[] familyNames;
@@ -112,6 +113,7 @@ public class HBaseScheme
     this.timeStamp = timeStamp;
     this.familyNames = familyNames;
     this.valueFields = valueFields;
+    this.useTimeStampInRead = (timeStamp > 0);
 
     setSourceSink(this.keyField, this.valueFields);
 
@@ -138,6 +140,7 @@ public class HBaseScheme
     this.keyField = keyField;
     this.valueFields = valueFields;
     this.timeStamp = System.currentTimeMillis();
+    this.useTimeStampInRead = false;
 
     validate();
 
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
index e24771f..3b72a1d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -2,7 +2,6 @@ package parallelai.spyglass.hbase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.InputSplit;
 
@@ -35,6 +34,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
   protected boolean m_endRowInclusive = true;
   protected int m_versions = 1;
   protected boolean m_useSalt = false;
+  protected long m_timestamp = -1L;
 
   /** @return table name */
@@ -95,6 +95,13 @@ public abstract class HBaseTableSplitBase implements InputSplit,
     return this.m_regionName;
   }
 
+  public long getTimestamp() {
+    return m_timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.m_timestamp = timestamp;
+  }
 
   public void copy(HBaseTableSplitBase that) {
     this.m_endRow = that.m_endRow;
@@ -107,6 +114,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
     this.m_versions = that.m_versions;
     this.m_regionLocation = that.m_regionLocation;
     this.m_regionName = that.m_regionName;
+    this.m_timestamp = that.m_timestamp;
   }
 
   @Override
@@ -139,6 +147,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,
       break;
     }
 
+    this.m_timestamp = Bytes.toLong(Bytes.readByteArray(in));
+
     LOG.debug("READ and CREATED : " + this);
   }
 
@@ -169,6 +179,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,
       break;
     }
 
+    Bytes.writeByteArray(out, Bytes.toBytes(m_timestamp));
+
     LOG.debug("WROTE : " + out.toString());
   }
 }
-- 
cgit v1.2.3
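
What the change does at scan time, in isolation: every place the granular record reader builds a Scan, it now checks the timestamp handed to it (copied from the split by HBaseConfigUtils) against the -1 sentinel and, when one is set, calls Scan.setTimeStamp(), which limits the scan to cells whose version timestamp equals that exact value. The sketch below is a minimal, standalone illustration of that behaviour against the older HBase client API this code base uses; the table name "my_table", the column "cf:q" and the literal timestamp are illustrative only, not taken from SpyGlass.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TimestampScanSketch {

      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable htable = new HTable(conf, "my_table");  // illustrative table name
        long timestamp = 1395064800000L;               // illustrative write time; -1 means "no filtering"

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));  // illustrative column
        scan.setCaching(1000);
        if (timestamp != -1) {
          // Exact match: only cells written at precisely this timestamp are returned.
          scan.setTimeStamp(timestamp);
        }

        ResultScanner scanner = htable.getScanner(scan);
        try {
          for (Result result : scanner) {
            System.out.println(Bytes.toString(result.getRow()));
          }
        } finally {
          scanner.close();
          htable.close();
        }
      }
    }

For a time window rather than a single write time, Scan.setTimeRange(minStamp, maxStamp) would be the call to use instead.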
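On the Cascading side, HBaseScheme now records whether the caller supplied an explicit timestamp: the new useTimeStampInRead flag is set when the timeStamp constructor argument is greater than zero, and left false when the scheme falls back to System.currentTimeMillis(). A hedged usage sketch follows; the constructor shapes are inferred from the hunk context rather than verified against HBaseScheme.java, and the field and family names ("key", "data", "column_a") are made up.

    import cascading.tuple.Fields;
    import parallelai.spyglass.hbase.HBaseScheme;

    public class SchemeUsageSketch {

      public static void main(String[] args) {
        long readTimestamp = 1395064800000L;  // any value > 0 enables timestamp filtering on reads

        // Assumed constructor: key field, timestamp, column families, value fields per family.
        HBaseScheme filtered = new HBaseScheme(
            new Fields("key"),
            readTimestamp,
            new String[] { "data" },
            new Fields[] { new Fields("column_a") });

        // Assumed constructor without a timestamp: the scheme uses the current time for
        // writes and leaves useTimeStampInRead false, so reads are not filtered.
        HBaseScheme unfiltered = new HBaseScheme(
            new Fields("key"),
            new String[] { "data" },
            new Fields[] { new Fields("column_a") });
      }
    }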
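The split carries the timestamp across the wire as well: write() appends it after the existing fields and readFields() reads it back in the same position, so the two methods have to stay in step. Below is a self-contained round trip of just that encoding, using the same HBase Bytes helpers as the patch; the concrete value is illustrative. A plain writeLong()/readLong() pair would be equivalent and slightly more compact; the length-prefixed byte-array form presumably mirrors how the rest of the split is serialised.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.util.Bytes;

    public class TimestampWritableRoundTrip {

      public static void main(String[] args) throws IOException {
        long m_timestamp = 1395064800000L;  // illustrative; -1L means "no filtering"

        // write(DataOutput) side: length-prefixed byte array holding the long.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        Bytes.writeByteArray(out, Bytes.toBytes(m_timestamp));
        out.flush();

        // readFields(DataInput) side: must read in the same order as it was written.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        long restored = Bytes.toLong(Bytes.readByteArray(in));

        System.out.println(restored == m_timestamp);  // true
      }
    }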