Diffstat (limited to 'src/main/java/parallelai/spyglass')
5 files changed, 50 insertions, 13 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 5baf20e..25b89cb 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -19,4 +19,5 @@ public class HBaseConstants {
 
   public static final String USE_SALT = "hbase.%s.use.salt";
   public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+  public static final String SINK_MODE = "hbase.%s.sink.mode";
 }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 40f1faf..c145eb0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -2,6 +2,7 @@ package parallelai.spyglass.hbase;
 
 import java.io.IOException;
 
+import cascading.tap.SinkMode;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -9,24 +10,32 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table
  */
 public class HBaseOutputFormat extends
-FileOutputFormat<ImmutableBytesWritable, Put> {
+FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
 
   /** JobConf parameter that specifies the output table */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
 
   private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);
+  private SinkMode sinkMode = SinkMode.UPDATE;
+
+  @Override
+  public void configure(JobConf conf) {
+      sinkMode = SinkMode.valueOf(
+        conf.get(
+            String.format(
+                HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE)
+            )
+        )
+      );
+  }
+
   @Override
   @SuppressWarnings("unchecked")
@@ -43,8 +52,11 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
       LOG.error(e);
       throw e;
     }
+    // TODO: Should Autoflush be set to true ????
     table.setAutoFlush(false);
-    return new HBaseRecordWriter(table);
+    HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
+    recordWriter.setSinkMode(sinkMode);
+    return recordWriter;
   }
 
   @Override
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 7b62c88..832ce95 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -278,7 +278,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
 
 		public void copyValue(Result oldValue, Result newValue) {
 			if (null != oldValue && null != newValue) {
-				oldValue.copyFrom(newValue);
+//				oldValue.copyFrom(newValue);
 			}
 		}
 
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
index 1cca133..50aa116 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -2,6 +2,8 @@ package parallelai.spyglass.hbase;
 
 import java.io.IOException;
 
+import cascading.tap.SinkMode;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -15,6 +17,7 @@ import org.apache.hadoop.mapred.Reporter;
 public class HBaseRecordWriter
   implements RecordWriter<ImmutableBytesWritable, Put> {
   private HTable m_table;
+  private SinkMode m_sinkMode = SinkMode.UPDATE;
 
   /**
    * Instantiate a TableRecordWriter with the HBase HClient for writing.
@@ -30,8 +33,23 @@ public class HBaseRecordWriter
     m_table.close();
   }
 
+  public void setSinkMode(SinkMode sinkMode) {
+      m_sinkMode = sinkMode;
+  }
+
   public void write(ImmutableBytesWritable key,
       Put value) throws IOException {
-    m_table.put(new Put(value));
+    switch(m_sinkMode) {
+        case UPDATE:
+            m_table.put(new Put(value));
+            break;
+
+        case REPLACE:
+            m_table.delete(new Delete(value.getRow()));
+            break;
+
+        default:
+            throw new IOException("Unknown Sink Mode : " + m_sinkMode);
+    }
   }
 }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 65873a0..07b5aa7 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -154,7 +154,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
     if (isReplace() && conf.get("mapred.task.partition") == null) {
       try {
         deleteResource(conf);
-
+        conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.REPLACE.toString());
       } catch (IOException e) {
         throw new RuntimeException("could not delete resource: " + e);
       }
@@ -163,6 +163,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
     else if (isUpdate()) {
       try {
           createResource(conf);
+          conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.UPDATE.toString());
       } catch (IOException e) {
           throw new RuntimeException(tableName + " does not exist !", e);
       }
@@ -220,8 +221,13 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
 
   @Override
   public boolean deleteResource(JobConf jobConf) throws IOException {
-    // TODO: for now we don't do anything just to be safe
-    return true;
+      HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+
+      if (hBaseAdmin.tableExists(tableName)) {
+          return true;
+      } else {
+          throw new IOException("DELETE records: " + tableName + " does NOT EXIST!!!");
+      }
   }
 
   @Override
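Taken together, the commit threads a per-table sink mode from the tap to the record writer: when the tap is configured for sinking (the isReplace()/isUpdate() branches above), HBaseTap stores the mode in the JobConf under a key built from HBaseConstants.SINK_MODE, HBaseOutputFormat.configure() reads that key back on the task side, and HBaseRecordWriter.write() then issues a Put (UPDATE) or a Delete (REPLACE) per row. A minimal sketch of just that configuration round trip follows; the table name, the class name and the standalone main() harness are illustrative and not part of the commit.

import org.apache.hadoop.mapred.JobConf;

import cascading.tap.SinkMode;
import parallelai.spyglass.hbase.HBaseConstants;

public class SinkModeRoundTrip {
  public static void main(String[] args) {
    String tableName = "my_table";   // hypothetical table name, for illustration only

    JobConf conf = new JobConf();

    // Submission side (cf. the HBaseTap change above): record the sink mode under
    // a key scoped to the output table, e.g. "hbase.my_table.sink.mode".
    conf.set(String.format(HBaseConstants.SINK_MODE, tableName),
        SinkMode.REPLACE.toString());

    // Task side (cf. HBaseOutputFormat.configure): read the same key back and
    // parse it into the enum that is handed to HBaseRecordWriter.setSinkMode().
    SinkMode sinkMode = SinkMode.valueOf(
        conf.get(String.format(HBaseConstants.SINK_MODE, tableName)));

    System.out.println("Sink mode for " + tableName + ": " + sinkMode); // prints REPLACE
  }
}

Because the key is formatted per table ("hbase.%s.sink.mode"), a job that writes to several HBase tables can carry a different sink mode for each output tap in the same JobConf.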
