aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTap.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java12
1 files changed, 9 insertions, 3 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 65873a0..07b5aa7 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -154,7 +154,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
if (isReplace() && conf.get("mapred.task.partition") == null) {
try {
deleteResource(conf);
-
+ conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.REPLACE.toString());
} catch (IOException e) {
throw new RuntimeException("could not delete resource: " + e);
}
@@ -163,6 +163,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
else if (isUpdate()) {
try {
createResource(conf);
+ conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.UPDATE.toString());
} catch (IOException e) {
throw new RuntimeException(tableName + " does not exist !", e);
}
@@ -220,8 +221,13 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
@Override
public boolean deleteResource(JobConf jobConf) throws IOException {
- // TODO: for now we don't do anything just to be safe
- return true;
+ HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+
+ if (hBaseAdmin.tableExists(tableName)) {
+ return true;
+ } else {
+ throw new IOException("DELETE records: " + tableName + " does NOT EXIST!!!");
+ }
}
@Override