aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
diff options
context:
space:
mode:
authorChandan Rajah <crajah@parallelai.com>2013-08-16 14:19:51 +0100
committerChandan Rajah <crajah@parallelai.com>2013-08-16 14:19:51 +0100
commit466e739d71f426422881986cf018643b7876acf5 (patch)
tree77a3260b651544362cd47fb4dc1d2e11e847f37b /src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
parentb9d987c0d9946f8f778fbec2856305c0f20fd3f8 (diff)
downloadSpyGlass-466e739d71f426422881986cf018643b7876acf5.tar.gz
SpyGlass-466e739d71f426422881986cf018643b7876acf5.zip
Adding Delete functionality to the HBaseSource
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java28
1 files changed, 20 insertions, 8 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 40f1faf..c145eb0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -2,6 +2,7 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
+import cascading.tap.SinkMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -9,24 +10,32 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
/**
* Convert Map/Reduce output and write it to an HBase table
*/
public class HBaseOutputFormat extends
-FileOutputFormat<ImmutableBytesWritable, Put> {
+FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);
+ private SinkMode sinkMode = SinkMode.UPDATE;
+
+ @Override
+ public void configure(JobConf conf) {
+ sinkMode = SinkMode.valueOf(
+ conf.get(
+ String.format(
+ HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE)
+ )
+ )
+ );
+ }
+
@Override
@SuppressWarnings("unchecked")
@@ -43,8 +52,11 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
LOG.error(e);
throw e;
}
+ // TODO: Should Autoflush be set to true ????
table.setAutoFlush(false);
- return new HBaseRecordWriter(table);
+ HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
+ recordWriter.setSinkMode(sinkMode);
+ return recordWriter;
}
@Override