aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java59
1 files changed, 59 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
new file mode 100644
index 0000000..40f1faf
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -0,0 +1,59 @@
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+public class HBaseOutputFormat extends
+FileOutputFormat<ImmutableBytesWritable, Put> {
+
+ /** JobConf parameter that specifies the output table */
+ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+ private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);
+
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordWriter getRecordWriter(FileSystem ignored,
+ JobConf job, String name, Progressable progress) throws IOException {
+
+ // expecting exactly one path
+
+ String tableName = job.get(OUTPUT_TABLE);
+ HTable table = null;
+ try {
+ table = new HTable(HBaseConfiguration.create(job), tableName);
+ } catch(IOException e) {
+ LOG.error(e);
+ throw e;
+ }
+ table.setAutoFlush(false);
+ return new HBaseRecordWriter(table);
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+ String tableName = job.get(OUTPUT_TABLE);
+ if(tableName == null) {
+ throw new IOException("Must specify table name");
+ }
+ }
+} \ No newline at end of file