path: root/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
package parallelai.spyglass.hbase;

import java.io.IOException;

import cascading.tap.SinkMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;

/**
 * Convert Map/Reduce output and write it to an HBase table
 */
public class HBaseOutputFormat extends
    FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
  private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);

  private SinkMode sinkMode = SinkMode.UPDATE;

  @Override
  public void configure(JobConf conf) {
    // The sink mode for this table is stored under a per-table key built from
    // HBaseConstants.SINK_MODE and the configured output table name.
    String configuredSinkMode = conf.get(
        String.format(
            HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE)));

    // Fall back to the default sink mode (UPDATE) if none has been configured.
    if (configuredSinkMode != null) {
      sinkMode = SinkMode.valueOf(configuredSinkMode);
    }
  }


  @Override
  @SuppressWarnings("unchecked")
  public RecordWriter getRecordWriter(FileSystem ignored,
      JobConf job, String name, Progressable progress) throws IOException {

    // The FileSystem and name arguments are ignored: output goes straight to
    // the HBase table named in the job configuration, not to files.
    String tableName = job.get(OUTPUT_TABLE);
    HTable table = null;
    try {
      table = new HTable(HBaseConfiguration.create(job), tableName);
    } catch(IOException e) {
      LOG.error("Failed to open HBase table " + tableName, e);
      throw e;
    }
    // Auto-flush so each Put is sent to the region server as soon as it is
    // written, instead of being buffered on the client.
    table.setAutoFlush(true);
    HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
    recordWriter.setSinkMode(sinkMode);
    return recordWriter;
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws FileAlreadyExistsException, InvalidJobConfException, IOException {

    // The only output spec to validate is that an output table has been named.
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify output table name (" + OUTPUT_TABLE + ")");
    }
  }
}
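
For reference, the sketch below shows how a classic mapred job could be wired up to this output format. It is illustrative only: the helper class, method name and table name are placeholders that are not part of this file, and it assumes HBaseConstants.SINK_MODE is the same per-table format key read back in configure() above.

package parallelai.spyglass.hbase;

import cascading.tap.SinkMode;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.JobConf;

/** Hypothetical helper showing how a JobConf might target HBaseOutputFormat. */
public class HBaseOutputFormatUsage {

  public static JobConf configureJob(JobConf job, String tableName) {
    // Name the HBase table that reduce output is written to.
    job.set(HBaseOutputFormat.OUTPUT_TABLE, tableName);

    // Per-table sink mode, read back by HBaseOutputFormat.configure().
    job.set(String.format(HBaseConstants.SINK_MODE, tableName),
        SinkMode.UPDATE.name());

    // Emit (row key, Put) pairs through HBaseOutputFormat.
    job.setOutputFormat(HBaseOutputFormat.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);

    return job;
  }
}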