aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
blob: 5d2b613aeeb2a52868403246311be0f2a068f3c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package parallelai.spyglass.hbase;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;

import parallelai.spyglass.hbase.HBaseConstants.SourceMode;

/**
 * Reads every granular split contained in a {@link HBaseTableSplitRegional}, one after another.
 *
 * <p>Each granular split obtained from {@code HBaseTableSplitRegional#getNextSplit()} is read by
 * its own {@link HBaseRecordReaderGranular}. When that reader is exhausted, it is closed and a
 * reader for the next granular split is opened. Positions of exhausted readers are accumulated so
 * that {@link #getPos()} keeps increasing across splits.
 *
 * <p>If the regional split yields no granular split at all, this reader behaves as an empty reader:
 * {@link #next} returns {@code false}, {@link #close} is a no-op, and keys and values are created
 * directly rather than by a delegate.
 */
public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {

    static final Log LOG = LogFactory.getLog(HBaseRecordReaderRegional.class);

    /** Not read or written by this class; kept only because it is package-visible. */
    Map<Long, List<KeyValue>> keyValueMap = null;

    /** Sum of the final {@code getPos()} values of all granular readers already exhausted. */
    long totalPos = 0;

    private HBaseTableSplitRegional multiSplit = null;
    private HBaseTableSplitGranular currentSplit = null;

    /** Reader for the granular split currently being read; {@code null} until one is opened. */
    private HBaseRecordReaderGranular currentRecordReader = null;

    /**
     * Binds this reader to a regional split and opens a reader for its first granular split.
     *
     * @param mSplit the regional split whose granular splits will be read in order
     * @throws IOException if opening the first granular reader fails
     */
    public void init(HBaseTableSplitRegional mSplit) throws IOException {
        multiSplit = mSplit;

        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating Multi Split for region location : "
                    + multiSplit.getRegionLocation() + " -> " + multiSplit);
        }

        setNextSplit();
    }

    /**
     * Advances to the next granular split and opens a reader for it.
     *
     * @return {@code true} if a new split was obtained and its reader opened; {@code false} if the
     *     regional split returned no further split, in which case the current reader is left as-is
     * @throws IOException if closing the previous reader or opening the new one fails
     */
    public boolean setNextSplit() throws IOException {
        currentSplit = multiSplit.getNextSplit();

        if (LOG.isDebugEnabled()) {
            LOG.debug("IN: setNextSplit : " + currentSplit);
        }

        if (currentSplit == null) {
            return false;
        }
        setSplitValue(currentSplit);
        return true;
    }

    /**
     * Configures a granular reader from the given split and this reader's table, columns and
     * row filter, then initializes it.
     */
    private void setRecordReaderParms(HBaseRecordReaderGranular trr, HBaseTableSplitGranular tSplit) throws IOException {
        HBaseConfigUtils.setRecordReaderParms(trr, tSplit);

        trr.setHTable(htable);
        trr.setInputColumns(trrInputColumns);
        trr.setRowFilter(trrRowFilter);

        trr.init();
    }

    /**
     * Closes the current granular reader (if any) and replaces it with a freshly initialized reader
     * for {@code tSplit}.
     *
     * <p>The new reader is only published after it initializes successfully, so the field never
     * refers to a reader that was already closed.
     */
    private void setSplitValue(HBaseTableSplitGranular tSplit) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("IN: setSplitValue : " + tSplit);
        }

        if (currentRecordReader != null) {
            currentRecordReader.close();
            currentRecordReader = null;
        }

        HBaseRecordReaderGranular reader = new HBaseRecordReaderGranular();
        setRecordReaderParms(reader, tSplit);
        currentRecordReader = reader;
    }

    /**
     * Reads the next row, moving on to later granular splits as earlier ones are exhausted.
     *
     * @return {@code true} if a row was read into {@code ibw}/{@code result}; {@code false} when
     *     every granular split is exhausted (or none existed)
     */
    @Override
    public boolean next(ImmutableBytesWritable ibw, Result result) throws IOException {
        if (currentRecordReader == null) {
            // No granular split was ever opened: nothing to read.
            return false;
        }

        boolean hasRow = currentRecordReader.next(ibw, result);

        while (!hasRow && multiSplit.hasMoreSplits()) {
            // Capture before switching: setSplitValue() closes the exhausted reader.
            long finishedPos = currentRecordReader.getPos();
            if (!setNextSplit()) {
                // hasMoreSplits() was true but no split was returned; stop instead of
                // re-polling the exhausted reader (and double-counting its position).
                break;
            }
            totalPos += finishedPos;
            hasRow = currentRecordReader.next(ibw, result);
        }

        return hasRow;
    }

    /** Creates a key object, delegating to the current granular reader when one is open. */
    @Override
    public ImmutableBytesWritable createKey() {
        if (currentRecordReader == null) {
            return new ImmutableBytesWritable();
        }
        return currentRecordReader.createKey();
    }

    /** Creates a value object, delegating to the current granular reader when one is open. */
    @Override
    public Result createValue() {
        if (currentRecordReader == null) {
            return new Result();
        }
        return currentRecordReader.createValue();
    }

    /**
     * Returns the accumulated positions of exhausted granular readers plus the current reader's
     * position.
     */
    @Override
    public long getPos() throws IOException {
        long currentPos = (currentRecordReader == null) ? 0L : currentRecordReader.getPos();
        return totalPos + currentPos;
    }

    /** Closes the current granular reader, if one is open. */
    @Override
    public void close() throws IOException {
        if (currentRecordReader != null) {
            currentRecordReader.close();
        }
    }

    /**
     * Estimates progress as (splits counted so far + progress within the current split) divided by
     * the regional split's length, clamped to [0.0, 1.0].
     *
     * <p>NOTE(review): this assumes {@code getLength()} reports the number of granular splits and
     * {@code getCurrSplitCount()} the number already handed out — confirm against
     * HBaseTableSplitRegional.
     */
    @Override
    public float getProgress() throws IOException {
        if (multiSplit == null) {
            return 0.0f;
        }

        float total = multiSplit.getLength();
        if (total <= 0) {
            // Nothing to read; avoid dividing by zero.
            return 0.0f;
        }

        float inSplit = (currentRecordReader == null) ? 0.0f : currentRecordReader.getProgress();
        float prog = (multiSplit.getCurrSplitCount() + inSplit) / total;

        // The mapred RecordReader contract expects a value between 0.0 and 1.0.
        return Math.min(1.0f, Math.max(0.0f, prog));
    }
}