From 3501e241a2313cf49c371630cb6ebe0c3a47e991 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Wed, 4 Sep 2013 10:32:07 +0100 Subject: Extensive changes to the underlying code base. Fully tested and working support for region level spliting Reduced number of mappers. --- .../spyglass/hbase/HBaseInputFormatRegional.java | 99 ++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java new file mode 100644 index 0000000..eadb57e --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -0,0 +1,99 @@ +package parallelai.spyglass.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * Created with IntelliJ IDEA. + * User: chand_000 + * Date: 29/08/13 + * Time: 12:24 + * To change this template use File | Settings | File Templates. + */ +public class HBaseInputFormatRegional extends HBaseInputFormatBase { + private HBaseInputFormatGranular granular = new HBaseInputFormatGranular(); + private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class); + + + @Override + public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException { + granular.configure(job); + HBaseTableSplitGranular[] gSplits = granular.getSplits(job, numSplits); + + HBaseTableSplitRegional[] splits = convertToMultiSplitArray( gSplits ); + + if( splits == null ) throw new IOException("Not sure WTF is going on? splits is NULL"); + + LOG.info("GRANULAR => " + gSplits); + LOG.info("REGIONAL => " + splits); + + return splits; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { + if (!(inputSplit instanceof HBaseTableSplitRegional)) + throw new IOException("Table Split is not type HBaseTableSplitRegional"); + + LOG.info("REGIONAL SPLIT -> " + inputSplit); + + HBaseTableSplitRegional tSplit = (HBaseTableSplitRegional)inputSplit; + + HBaseRecordReaderRegional trr = new HBaseRecordReaderRegional(); + + HBaseConfigUtils.setRecordReaderParms(trr, tSplit); + + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + + trr.init(tSplit); + + return trr; + } + + private HBaseTableSplitRegional[] convertToMultiSplitArray( + HBaseTableSplitGranular[] splits) throws IOException { + + if (splits == null) + throw new IOException("The list of splits is null => " + splits); + + HashMap regionSplits = new HashMap(); + + for (HBaseTableSplitGranular hbt : splits) { + HBaseTableSplitRegional mis = null; + if (regionSplits.containsKey(hbt.getRegionLocation())) { + mis = regionSplits.get(hbt.getRegionLocation()); + } else { + regionSplits.put(hbt.getRegionLocation(), new HBaseTableSplitRegional( + hbt.getRegionLocation())); + mis = regionSplits.get(hbt.getRegionLocation()); + } + + mis.addSplit(hbt); + regionSplits.put(hbt.getRegionLocation(), mis); + } + +// for(String region : regionSplits.keySet() ) { +// regionSplits.get(region) +// } + + Collection outVals = regionSplits.values(); + + LOG.debug("".format("Returning array of splits : %s", outVals)); + + if (outVals == null) + throw new IOException("The list of multi input splits were null"); + + return outVals.toArray(new HBaseTableSplitRegional[outVals.size()]); + } + +} -- cgit v1.2.3