1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
package parallelai.spyglass.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
/**
 * Input format that hands out one split per HBase region.
 *
 * <p>An HBase table can be spread across multiple regions (and region servers).
 * <ul>
 *   <li><b>Regional</b> describes where a table lives, e.g. "this table has a
 *       region at 10.139.8.10 and another one at 10.139.8.11".</li>
 *   <li><b>Granular</b> goes deeper and describes ranges inside a specific region.</li>
 * </ul>
 *
 * <p>This class delegates split computation to {@link HBaseInputFormatGranular}
 * and then groups the resulting granular splits by region name, so that every
 * region is represented by a single {@link HBaseTableSplitRegional}.
 */
public class HBaseInputFormatRegional extends HBaseInputFormatBase {

    private static final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class);

    /** Delegate that computes the fine-grained splits which are grouped here. */
    private final HBaseInputFormatGranular granular = new HBaseInputFormatGranular();

    /**
     * Computes the granular splits for the job and merges them into one split per region.
     *
     * @param job       job configuration; also used to configure the granular delegate
     * @param numSplits split-count hint, passed through unchanged to the granular delegate
     * @return one regional split per distinct region name; never {@code null}
     * @throws IOException if the granular delegate fails or returns {@code null}
     */
    @Override
    public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException {
        granular.configure(job);
        HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits);

        // The conversion either throws or returns a non-null array, so no
        // additional null check is needed on its result.
        HBaseTableSplitRegional[] splits = convertToRegionalSplitArray(gSplits);

        if (LOG.isInfoEnabled()) {
            for (HBaseTableSplitGranular g : gSplits) {
                LOG.info("GRANULAR => " + g);
            }
            for (HBaseTableSplitRegional r : splits) {
                LOG.info("REGIONAL => " + r);
            }
        }
        return splits;
    }

    /**
     * Creates and initialises a record reader for a regional split.
     *
     * <p>The reader receives this format's {@code table}, {@code inputColumns} and
     * {@code rowFilter} (fields not declared in this class; presumably populated by
     * the base class during configuration -- confirm against HBaseInputFormatBase).
     *
     * @param inputSplit the split to read; must be an {@link HBaseTableSplitRegional}
     * @param entries    job configuration (not used by this method)
     * @param reporter   progress reporter (not used by this method)
     * @return an initialised {@link HBaseRecordReaderRegional}
     * @throws IOException if {@code inputSplit} is not a regional split, or if
     *                     reader initialisation fails
     */
    @Override
    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
            InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
        if (!(inputSplit instanceof HBaseTableSplitRegional)) {
            throw new IOException("Table Split is not type HBaseTableSplitRegional");
        }
        HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional) inputSplit;
        LOG.info("REGIONAL SPLIT -> " + tSplit);

        HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional();
        HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
        trr.setHTable(this.table);
        trr.setInputColumns(this.inputColumns);
        trr.setRowFilter(this.rowFilter);
        trr.init(tSplit);
        return trr;
    }

    /**
     * Groups granular splits by region name into regional splits.
     *
     * <p>The first granular split seen for a region supplies the region location
     * used to create that region's {@link HBaseTableSplitRegional}; every granular
     * split (including the first) is then added to it via {@code addSplit}.
     *
     * @param splits granular splits to group; must not be {@code null}
     * @return one regional split per distinct region name, in {@link HashMap}
     *         iteration order (i.e. no guaranteed ordering); never {@code null}
     * @throws IOException if {@code splits} is {@code null}
     */
    private HBaseTableSplitRegional[] convertToRegionalSplitArray(
            HBaseTableSplitGranular[] splits) throws IOException {
        if (splits == null) {
            throw new IOException("The list of splits is null => " + splits);
        }

        HashMap<String, HBaseTableSplitRegional> regionSplits =
                new HashMap<String, HBaseTableSplitRegional>();

        for (HBaseTableSplitGranular hbt : splits) {
            String regionName = hbt.getRegionName();
            HBaseTableSplitRegional regional = regionSplits.get(regionName);
            if (regional == null) {
                // First granular split for this region: create its container.
                regional = new HBaseTableSplitRegional(hbt.getRegionLocation(), regionName);
                regionSplits.put(regionName, regional);
            }
            regional.addSplit(hbt);
        }

        // HashMap.values() never returns null, so the result can be used directly.
        Collection<HBaseTableSplitRegional> outVals = regionSplits.values();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Returning array of splits : %s", outVals));
        }
        return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]);
    }
}
|