aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseScheme.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java35
1 files changed, 31 insertions, 4 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index aa446c1..6f04f01 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -12,6 +12,7 @@
package parallelai.spyglass.hbase;
+import parallelai.spyglass.hbase.HBaseConstants.SplitType;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
@@ -65,6 +66,8 @@ public class HBaseScheme
// private transient byte[][] fields;
private boolean useSalt = false;
+
+ private SplitType splitType = SplitType.GRANULAR;
/**
@@ -279,11 +282,31 @@ public class HBaseScheme
@Override
public void sourceConfInit(FlowProcess<JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
- conf.setInputFormat(HBaseInputFormat.class);
- String columns = getColumns();
- LOG.debug("sourcing from columns: {}", columns);
- conf.set(HBaseInputFormat.COLUMN_LIST, columns);
+ switch(splitType) {
+ case GRANULAR:
+ {
+ conf.setInputFormat(HBaseInputFormatGranular.class);
+
+ String columns = getColumns();
+ LOG.debug("sourcing from columns: {}", columns);
+ conf.set(HBaseInputFormatGranular.COLUMN_LIST, columns);
+ }
+ break;
+
+ case REGIONAL:
+ {
+ conf.setInputFormat(HBaseInputFormatRegional.class);
+
+ String columns = getColumns();
+ LOG.debug("sourcing from columns: {}", columns);
+ conf.set(HBaseInputFormatRegional.COLUMN_LIST, columns);
+ }
+ break;
+
+ default:
+ LOG.error("Unknown Split Type : " + splitType.toString());
+ }
}
private String getColumns() {
@@ -345,4 +368,8 @@ public class HBaseScheme
result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0);
return result;
}
+
+ public void setInputSplitTye(SplitType sType) {
+ this.splitType = sType;
+ }
}