aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java99
1 files changed, 99 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
new file mode 100644
index 0000000..eadb57e
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
@@ -0,0 +1,99 @@
+package parallelai.spyglass.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chand_000
+ * Date: 29/08/13
+ * Time: 12:24
+ * To change this template use File | Settings | File Templates.
+ */
+public class HBaseInputFormatRegional extends HBaseInputFormatBase {
+ private HBaseInputFormatGranular granular = new HBaseInputFormatGranular();
+ private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class);
+
+
+ @Override
+ public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException {
+ granular.configure(job);
+ HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits);
+
+ HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits );
+
+ if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL");
+
+ LOG.info("GRANULAR => " + gSplits);
+ LOG.info("REGIONAL => " + splits);
+
+ return splits;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
+ if (!(inputSplit instanceof HBaseTableSplitRegional))
+ throw new IOException("Table Split is not type HBaseTableSplitRegional");
+
+ LOG.info("REGIONAL SPLIT -> " + inputSplit);
+
+ HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit;
+
+ HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional();
+
+ HBaseConfigUtils.setRecordReaderParms(trr, tSplit);
+
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+
+ trr.init(tSplit);
+
+ return trr;
+ }
+
+ private HBaseTableSplitRegional[] convertToMultiSplitArray(
+ HBaseTableSplitGranular[] splits) throws IOException {
+
+ if (splits == null)
+ throw new IOException("The list of splits is null => " + splits);
+
+ HashMap<String, HBaseTableSplitRegional> regionSplits = new HashMap<String, HBaseTableSplitRegional>();
+
+ for (HBaseTableSplitGranular hbt : splits) {
+ HBaseTableSplitRegional mis = null;
+ if (regionSplits.containsKey(hbt.getRegionLocation())) {
+ mis = regionSplits.get(hbt.getRegionLocation());
+ } else {
+ regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional(
+ hbt.getRegionLocation()));
+ mis = regionSplits.get(hbt.getRegionLocation());
+ }
+
+ mis.addSplit(hbt);
+ regionSplits.put(hbt.getRegionLocation(), mis);
+ }
+
+// for(String region : regionSplits.keySet() ) {
+// regionSplits.get(region)
+// }
+
+ Collection<HBaseTableSplitRegional> outVals = regionSplits.values();
+
+ LOG.debug("".format("Returning array of splits : %s", outVals));
+
+ if (outVals == null)
+ throw new IOException("The list of multi input splits were null");
+
+ return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]);
+ }
+
+}