aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java28
1 files changed, 20 insertions, 8 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 40f1faf..c145eb0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -2,6 +2,7 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
+import cascading.tap.SinkMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -9,24 +10,32 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
/**
* Convert Map/Reduce output and write it to an HBase table
*/
public class HBaseOutputFormat extends
-FileOutputFormat<ImmutableBytesWritable, Put> {
+FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);
+ private SinkMode sinkMode = SinkMode.UPDATE;
+
+ @Override
+ public void configure(JobConf conf) {
+ sinkMode = SinkMode.valueOf(
+ conf.get(
+ String.format(
+ HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE)
+ )
+ )
+ );
+ }
+
@Override
@SuppressWarnings("unchecked")
@@ -43,8 +52,11 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
LOG.error(e);
throw e;
}
+ // TODO: Should Autoflush be set to true ????
table.setAutoFlush(false);
- return new HBaseRecordWriter(table);
+ HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
+ recordWriter.setSinkMode(sinkMode);
+ return recordWriter;
}
@Override