From 466e739d71f426422881986cf018643b7876acf5 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Fri, 16 Aug 2013 14:19:51 +0100 Subject: Adding Delete functionality to the HBaseSource --- .../parallelai/spyglass/hbase/HBaseRecordWriter.java | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java index 1cca133..50aa116 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -2,6 +2,8 @@ package parallelai.spyglass.hbase; import java.io.IOException; +import cascading.tap.SinkMode; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -15,6 +17,7 @@ import org.apache.hadoop.mapred.Reporter; public class HBaseRecordWriter implements RecordWriter { private HTable m_table; + private SinkMode m_sinkMode = SinkMode.UPDATE; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. @@ -30,8 +33,23 @@ public class HBaseRecordWriter m_table.close(); } + public void setSinkMode(SinkMode sinkMode) { + m_sinkMode = sinkMode; + } + public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_table.put(new Put(value)); + switch(m_sinkMode) { + case UPDATE: + m_table.put(new Put(value)); + break; + + case REPLACE: + m_table.delete(new Delete(value.getRow())); + break; + + default: + throw new IOException("Unknown Sink Mode : " + m_sinkMode); + } } } -- cgit v1.2.3