diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseScheme.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseScheme.java | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index e5acc30..a7d36fd 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -24,7 +24,6 @@ import cascading.util.Util; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -63,7 +62,9 @@ public class HBaseScheme /** String columns */ private transient String[] columns; /** Field fields */ - private transient byte[][] fields; +// private transient byte[][] fields; + + private boolean useSalt = false; /** @@ -237,8 +238,13 @@ public class HBaseScheme OutputCollector outputCollector = sinkCall.getOutput(); Tuple key = tupleEntry.selectTuple(keyField); ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0); + + if( useSalt ) { + keyBytes = HBaseSalter.addSaltPrefix(keyBytes); + } + Put put = new Put(keyBytes.get(), this.timeStamp); - + for (int i = 0; i < valueFields.length; i++) { Fields fieldSelector = valueFields[i]; TupleEntry values = tupleEntry.selectEntry(fieldSelector); @@ -258,10 +264,13 @@ public class HBaseScheme @Override public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { - conf.setOutputFormat(TableOutputFormat.class); + conf.setOutputFormat(HBaseOutputFormat.class); conf.setOutputKeyClass(ImmutableBytesWritable.class); conf.setOutputValueClass(Put.class); + + String tableName = conf.get(HBaseOutputFormat.OUTPUT_TABLE); + useSalt = conf.getBoolean(String.format(HBaseConstants.USE_SALT, tableName), false); } @Override |