aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java1
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java3
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java36
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java3
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java14
5 files changed, 37 insertions, 20 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
index 598a988..77df84e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
@@ -27,6 +27,7 @@ public class HBaseConfigUtils {
trr.setEndRow(tSplit.getEndRow());
trr.setEndRowInclusive(tSplit.getEndRowInclusive());
trr.setUseSalt(tSplit.getUseSalt());
+ trr.setTimestamp(tSplit.getTimestamp());
}
break;
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
index e0d0cbe..cc64cb4 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -41,6 +41,7 @@ public abstract class HBaseRecordReaderBase implements
protected boolean logScannerActivity = false;
protected int logPerRowCount = 100;
protected int noOfLogCount = 0;
+ protected long timestamp = -1;
@Override
public String toString() {
@@ -116,6 +117,8 @@ public abstract class HBaseRecordReaderBase implements
this.versions = versions;
}
+ public void setTimestamp(long timestamp) {this.timestamp = timestamp; }
+
public void setUseSalt(boolean useSalt) {
this.useSalt = useSalt;
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index 2155d99..fb10615 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -1,35 +1,24 @@
package parallelai.spyglass.hbase;
-import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.Vector;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;
-import org.jruby.javasupport.util.RuntimeHelpers;
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
@@ -72,6 +61,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
scan.setFilter(trrRowFilter);
scan.setCacheBlocks(true);
scan.setCaching(scanCaching);
+ if(timestamp != -1) {
+ scan.setTimeStamp(timestamp);
+ }
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
} else {
@@ -82,6 +74,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setCacheBlocks(true);
scan.setCaching(scanCaching);
+ if(timestamp != -1) {
+ scan.setTimeStamp(timestamp);
+ }
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
@@ -93,7 +88,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
scan.setCacheBlocks(true);
- scan.setCaching(scanCaching);
+ scan.setCaching(scanCaching);
+ if(timestamp != -1) {
+ scan.setTimeStamp(timestamp);
+ }
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index 6f04f01..ba83a63 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -54,6 +54,7 @@ public class HBaseScheme
/** Long timestamp */
private long timeStamp;
+ private boolean useTimeStampInRead = false;
/** String familyNames */
private String[] familyNames;
@@ -112,6 +113,7 @@ public class HBaseScheme
this.timeStamp = timeStamp;
this.familyNames = familyNames;
this.valueFields = valueFields;
+ this.useTimeStampInRead = (timeStamp > 0);
setSourceSink(this.keyField, this.valueFields);
@@ -138,6 +140,7 @@ public class HBaseScheme
this.keyField = keyField;
this.valueFields = valueFields;
this.timeStamp = System.currentTimeMillis();
+ this.useTimeStampInRead = false;
validate();
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
index e24771f..3b72a1d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -2,7 +2,6 @@ package parallelai.spyglass.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputSplit;
@@ -35,6 +34,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
protected boolean m_endRowInclusive = true;
protected int m_versions = 1;
protected boolean m_useSalt = false;
+ protected long m_timestamp = -1L;
/** @return table name */
@@ -95,6 +95,13 @@ public abstract class HBaseTableSplitBase implements InputSplit,
return this.m_regionName;
}
+ public long getTimestamp() {
+ return m_timestamp;
+ }
+
+ public void setTimestamp(long m_timestamp) {
+ this.m_timestamp = m_timestamp;
+ }
public void copy(HBaseTableSplitBase that) {
this.m_endRow = that.m_endRow;
@@ -107,6 +114,7 @@ public abstract class HBaseTableSplitBase implements InputSplit,
this.m_versions = that.m_versions;
this.m_regionLocation = that.m_regionLocation;
this.m_regionName = that.m_regionName;
+ this.m_timestamp = that.m_timestamp;
}
@Override
@@ -139,6 +147,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,
break;
}
+ this.m_timestamp = Bytes.toLong(Bytes.readByteArray(in));
+
LOG.debug("READ and CREATED : " + this);
}
@@ -169,6 +179,8 @@ public abstract class HBaseTableSplitBase implements InputSplit,
break;
}
+ Bytes.writeByteArray(out, Bytes.toBytes(m_timestamp));
+
LOG.debug("WROTE : " + out.toString());
}
}