aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java20
1 files changed, 19 insertions, 1 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
index 1cca133..50aa116 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -2,6 +2,8 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
+import cascading.tap.SinkMode;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -15,6 +17,7 @@ import org.apache.hadoop.mapred.Reporter;
public class HBaseRecordWriter
implements RecordWriter<ImmutableBytesWritable, Put> {
private HTable m_table;
+ private SinkMode m_sinkMode = SinkMode.UPDATE;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing.
@@ -30,8 +33,23 @@ public class HBaseRecordWriter
m_table.close();
}
+ public void setSinkMode(SinkMode sinkMode) {
+ m_sinkMode = sinkMode;
+ }
+
public void write(ImmutableBytesWritable key,
Put value) throws IOException {
- m_table.put(new Put(value));
+ switch(m_sinkMode) {
+ case UPDATE:
+ m_table.put(new Put(value));
+ break;
+
+ case REPLACE:
+ m_table.delete(new Delete(value.getRow()));
+ break;
+
+ default:
+ throw new IOException("Unknown Sink Mode : " + m_sinkMode);
+ }
}
}