diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java index 1cca133..50aa116 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -2,6 +2,8 @@ package parallelai.spyglass.hbase; import java.io.IOException; +import cascading.tap.SinkMode; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -15,6 +17,7 @@ import org.apache.hadoop.mapred.Reporter; public class HBaseRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> { private HTable m_table; + private SinkMode m_sinkMode = SinkMode.UPDATE; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. @@ -30,8 +33,23 @@ public class HBaseRecordWriter m_table.close(); } + public void setSinkMode(SinkMode sinkMode) { + m_sinkMode = sinkMode; + } + public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_table.put(new Put(value)); + switch(m_sinkMode) { + case UPDATE: + m_table.put(new Put(value)); + break; + + case REPLACE: + m_table.delete(new Delete(value.getRow())); + break; + + default: + throw new IOException("Unknown Sink Mode : " + m_sinkMode); + } } } |