aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
blob: 8185b2216cb0fd968ba88a17b22280b53e07bdd2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package parallelai.spyglass.hbase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;

/**
 * Created with IntelliJ IDEA.
 * User: chand_000
 * Date: 29/08/13
 * Time: 12:24
 * To change this template use File | Settings | File Templates.
 */
public class HBaseInputFormatRegional extends HBaseInputFormatBase {
    private HBaseInputFormatGranular granular = new HBaseInputFormatGranular();
    private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class);


    @Override
    public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException {
        granular.configure(job);
        HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits);

        HBaseTableSplitRegional[] splits = convertToRegionalSplitArray(gSplits);

        if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL");

        for(HBaseTableSplitGranular g : gSplits) {
            LOG.info("GRANULAR => " + g);
        }

        for(HBaseTableSplitRegional r : splits ) {
            LOG.info("REGIONAL => " + r);
        }

        return splits;
    }

    @Override
    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
        if (!(inputSplit instanceof HBaseTableSplitRegional))
            throw new IOException("Table Split is not type HBaseTableSplitRegional");

        HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit;

        LOG.info("REGIONAL SPLIT -> " + tSplit);

        HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional();

        HBaseConfigUtils.setRecordReaderParms(trr, tSplit);

        trr.setHTable(this.table);
        trr.setInputColumns(this.inputColumns);
        trr.setRowFilter(this.rowFilter);

        trr.init(tSplit);

        return trr;
    }

    private HBaseTableSplitRegional[] convertToRegionalSplitArray(
            HBaseTableSplitGranular[] splits) throws IOException {

        if (splits == null)
            throw new IOException("The list of splits is null => " + splits);

        HashMap<String, HBaseTableSplitRegional> regionSplits = new HashMap<String, HBaseTableSplitRegional>();

        for (HBaseTableSplitGranular hbt : splits) {
            HBaseTableSplitRegional mis = null;
            if (regionSplits.containsKey(hbt.getRegionName())) {
                mis = regionSplits.get(hbt.getRegionName());
            } else {
                regionSplits.put(hbt.getRegionName(), new HBaseTableSplitRegional(
                        hbt.getRegionLocation(), hbt.getRegionName()));
                mis = regionSplits.get(hbt.getRegionName());
            }

            mis.addSplit(hbt);
            regionSplits.put(hbt.getRegionName(), mis);
        }

//        for(String region : regionSplits.keySet() ) {
//            regionSplits.get(region)
//        }

        Collection<HBaseTableSplitRegional> outVals = regionSplits.values();

        LOG.debug("".format("Returning array of splits : %s", outVals));

        if (outVals == null)
            throw new IOException("The list of multi input splits were null");

        return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]);
    }

}