diff options
author | Chandan Rajah <crajah@parallelai.com> | 2013-08-16 14:19:51 +0100 |
---|---|---|
committer | Chandan Rajah <crajah@parallelai.com> | 2013-08-16 14:19:51 +0100 |
commit | 466e739d71f426422881986cf018643b7876acf5 (patch) | |
tree | 77a3260b651544362cd47fb4dc1d2e11e847f37b /src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java | |
parent | b9d987c0d9946f8f778fbec2856305c0f20fd3f8 (diff) | |
download | SpyGlass-466e739d71f426422881986cf018643b7876acf5.tar.gz SpyGlass-466e739d71f426422881986cf018643b7876acf5.zip |
Adding Delete functionality to the HBaseSource
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index 40f1faf..c145eb0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -2,6 +2,7 @@ package parallelai.spyglass.hbase; import java.io.IOException; +import cascading.tap.SinkMode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -9,24 +10,32 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapred.InvalidJobConfException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Progressable; /** * Convert Map/Reduce output and write it to an HBase table */ public class HBaseOutputFormat extends -FileOutputFormat<ImmutableBytesWritable, Put> { +FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable { /** JobConf parameter that specifies the output table */ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class); + private SinkMode sinkMode = SinkMode.UPDATE; + + @Override + public void configure(JobConf conf) { + sinkMode = SinkMode.valueOf( + conf.get( + String.format( + HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE) + ) + ) + ); + } + @Override @SuppressWarnings("unchecked") @@ -43,8 +52,11 @@ FileOutputFormat<ImmutableBytesWritable, Put> { LOG.error(e); throw e; } + // TODO: Should Autoflush be set to true ???? table.setAutoFlush(false); - return new HBaseRecordWriter(table); + HBaseRecordWriter recordWriter = new HBaseRecordWriter(table); + recordWriter.setSinkMode(sinkMode); + return recordWriter; } @Override |