diff options
author | Chandan Rajah <crajah@parallelai.com> | 2013-09-04 10:32:07 +0100 |
---|---|---|
committer | Chandan Rajah <crajah@parallelai.com> | 2013-09-04 10:32:07 +0100 |
commit | 3501e241a2313cf49c371630cb6ebe0c3a47e991 (patch) | |
tree | 99b4e48c7590f94a4cbe8acf9ffbc036241ab737 /src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java | |
parent | 147a423b345ea365c22af48727c83ea4f31b948c (diff) | |
download | SpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.tar.gz SpyGlass-3501e241a2313cf49c371630cb6ebe0c3a47e991.zip |
Extensive changes to the underlying code base.
Fully tested and working support for region-level splitting.
Reduced number of mappers.
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java new file mode 100644 index 0000000..e2b1ec8 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -0,0 +1,124 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + +public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { + + static final Log LOG = LogFactory.getLog(HBaseRecordReaderRegional.class); + + + private byte[] nextKey = null; + private Vector<List<KeyValue>> resultVector = null; + Map<Long, List<KeyValue>> keyValueMap = null; + + private HBaseTableSplitRegional multiSplit = null; + private HBaseTableSplitGranular currentSplit = null; + + private HBaseRecordReaderGranular currentRecordReader = null; + + public void 
init(HBaseTableSplitRegional mSplit) throws IOException { + multiSplit = mSplit; + + LOG.debug("Creating Multi Split for region location : " + + multiSplit.getRegionLocation() + " -> " + multiSplit); + + setNextSplit(); + } + + public boolean setNextSplit() throws IOException { + currentSplit = multiSplit.getNextSplit(); + + LOG.debug("IN: setNextSplit : " + currentSplit ); + + if( currentSplit != null ) { + setSplitValue(currentSplit); + return true; + } else { + return false; + } + } + + private void setRecordReaderParms(HBaseRecordReaderGranular trr, HBaseTableSplitGranular tSplit) throws IOException { + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + + trr.setHTable(htable); + trr.setInputColumns(trrInputColumns); + trr.setRowFilter(trrRowFilter); + + trr.init(); + } + + private void setSplitValue(HBaseTableSplitGranular tSplit) throws IOException { + LOG.debug("IN: setSplitValue : " + tSplit ); + + if( currentRecordReader != null ) currentRecordReader.close(); + + currentRecordReader = new HBaseRecordReaderGranular(); + setRecordReaderParms(currentRecordReader, currentSplit); + } + + @Override + public boolean next(ImmutableBytesWritable ibw, Result result) throws IOException { + boolean nextFlag = currentRecordReader.next(ibw, result); + + while(nextFlag == false && multiSplit.hasMoreSplits() ) { + setNextSplit(); + nextFlag = currentRecordReader.next(ibw, result); + } + + return nextFlag; + } + + @Override + public ImmutableBytesWritable createKey() { + return currentRecordReader.createKey(); + } + + @Override + public Result createValue() { + return currentRecordReader.createValue(); + } + + @Override + public long getPos() throws IOException { + return currentRecordReader.getPos(); + } + + @Override + public void close() throws IOException { + currentRecordReader.close(); + } + + @Override + public float getProgress() throws IOException { + return currentRecordReader.getProgress(); + } +} |