aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseScheme.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java17
1 files changed, 13 insertions, 4 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index e5acc30..a7d36fd 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -24,7 +24,6 @@ import cascading.util.Util;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
@@ -63,7 +62,9 @@ public class HBaseScheme
/** String columns */
private transient String[] columns;
/** Field fields */
- private transient byte[][] fields;
+// private transient byte[][] fields;
+
+ private boolean useSalt = false;
/**
@@ -237,8 +238,13 @@ public class HBaseScheme
OutputCollector outputCollector = sinkCall.getOutput();
Tuple key = tupleEntry.selectTuple(keyField);
ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0);
+
+ if( useSalt ) {
+ keyBytes = HBaseSalter.addSaltPrefix(keyBytes);
+ }
+
Put put = new Put(keyBytes.get(), this.timeStamp);
-
+
for (int i = 0; i < valueFields.length; i++) {
Fields fieldSelector = valueFields[i];
TupleEntry values = tupleEntry.selectEntry(fieldSelector);
@@ -258,10 +264,13 @@ public class HBaseScheme
@Override
public void sinkConfInit(FlowProcess<JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
- conf.setOutputFormat(TableOutputFormat.class);
+ conf.setOutputFormat(HBaseOutputFormat.class);
conf.setOutputKeyClass(ImmutableBytesWritable.class);
conf.setOutputValueClass(Put.class);
+
+ String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE);
+ useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false);
}
@Override