aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
diff options
context:
space:
mode:
authorChandan Rajah <crajah@parallelai.com>2013-09-04 10:32:07 +0100
committerChandan Rajah <crajah@parallelai.com>2013-09-04 10:32:07 +0100
commit3501e241a2313cf49c371630cb6ebe0c3a47e991 (patch)
tree99b4e48c7590f94a4cbe8acf9ffbc036241ab737 /src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
parent147a423b345ea365c22af48727c83ea4f31b948c (diff)
downloadSpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.tar.gz
SpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.zip
Extensive changes to the underlying code base.
Fully tested and working support for region level spliting Reduced number of mappers.
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java124
1 files changed, 124 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
new file mode 100644
index 0000000..e2b1ec8
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
@@ -0,0 +1,124 @@
+package parallelai.spyglass.hbase;
+
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.StringUtils;
+
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+
+public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
+
+ static final Log LOG = LogFactory.getLog(HBaseRecordReaderRegional.class);
+
+
+ private byte[] nextKey = null;
+ private Vector<List<KeyValue>> resultVector = null;
+ Map<Long, List<KeyValue>> keyValueMap = null;
+
+ private HBaseTableSplitRegional multiSplit = null;
+ private HBaseTableSplitGranular currentSplit = null;
+
+ private HBaseRecordReaderGranular currentRecordReader = null;
+
+ public void init(HBaseTableSplitRegional mSplit) throws IOException {
+ multiSplit = mSplit;
+
+ LOG.debug("Creating Multi Split for region location : "
+ + multiSplit.getRegionLocation() + " -> " + multiSplit);
+
+ setNextSplit();
+ }
+
+ public boolean setNextSplit() throws IOException {
+ currentSplit = multiSplit.getNextSplit();
+
+ LOG.debug("IN: setNextSplit : " + currentSplit );
+
+ if( currentSplit != null ) {
+ setSplitValue(currentSplit);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void setRecordReaderParms(HBaseRecordReaderGranular trr, HBaseTableSplitGranular tSplit) throws IOException {
+ HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
+
+ trr.setHTable(htable);
+ trr.setInputColumns(trrInputColumns);
+ trr.setRowFilter(trrRowFilter);
+
+ trr.init();
+ }
+
+ private void setSplitValue(HBaseTableSplitGranular tSplit) throws IOException {
+ LOG.debug("IN: setSplitValue : " + tSplit );
+
+ if( currentRecordReader != null ) currentRecordReader.close();
+
+ currentRecordReader = new HBaseRecordReaderGranular();
+ setRecordReaderParms(currentRecordReader, currentSplit);
+ }
+
+ @Override
+ public boolean next(ImmutableBytesWritable ibw, Result result) throws IOException {
+ boolean nextFlag = currentRecordReader.next(ibw, result);
+
+ while(nextFlag == false && multiSplit.hasMoreSplits() ) {
+ setNextSplit();
+ nextFlag = currentRecordReader.next(ibw, result);
+ }
+
+ return nextFlag;
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return currentRecordReader.createKey();
+ }
+
+ @Override
+ public Result createValue() {
+ return currentRecordReader.createValue();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return currentRecordReader.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return currentRecordReader.getProgress();
+ }
+}