aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
diff options
context:
space:
mode:
authorChandan Rajah <crajah@parallelai.com>2013-08-16 14:19:51 +0100
committerChandan Rajah <crajah@parallelai.com>2013-08-16 14:19:51 +0100
commit466e739d71f426422881986cf018643b7876acf5 (patch)
tree77a3260b651544362cd47fb4dc1d2e11e847f37b /src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
parentb9d987c0d9946f8f778fbec2856305c0f20fd3f8 (diff)
downloadSpyGlass-466e739d71f426422881986cf018643b7876acf5.tar.gz
SpyGlass-466e739d71f426422881986cf018643b7876acf5.zip
Adding Delete functionality to the HBaseSource
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java20
1 files changed, 19 insertions, 1 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
index 1cca133..50aa116 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -2,6 +2,8 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
+import cascading.tap.SinkMode;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -15,6 +17,7 @@ import org.apache.hadoop.mapred.Reporter;
public class HBaseRecordWriter
implements RecordWriter<ImmutableBytesWritable, Put> {
private HTable m_table;
+ private SinkMode m_sinkMode = SinkMode.UPDATE;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing.
@@ -30,8 +33,23 @@ public class HBaseRecordWriter
m_table.close();
}
+ public void setSinkMode(SinkMode sinkMode) {
+ m_sinkMode = sinkMode;
+ }
+
public void write(ImmutableBytesWritable key,
Put value) throws IOException {
- m_table.put(new Put(value));
+ switch(m_sinkMode) {
+ case UPDATE:
+ m_table.put(new Put(value));
+ break;
+
+ case REPLACE:
+ m_table.delete(new Delete(value.getRow()));
+ break;
+
+ default:
+ throw new IOException("Unknown Sink Mode : " + m_sinkMode);
+ }
}
}