From 466e739d71f426422881986cf018643b7876acf5 Mon Sep 17 00:00:00 2001
From: Chandan Rajah
Date: Fri, 16 Aug 2013 14:19:51 +0100
Subject: Adding Delete functionality to the HBaseSource

---
 .../parallelai/spyglass/hbase/HBaseConstants.java |  1 +
 .../spyglass/hbase/HBaseOutputFormat.java         | 28 +++++++++++++++-------
 .../parallelai/spyglass/hbase/HBaseRawScheme.java |  2 +-
 .../spyglass/hbase/HBaseRecordWriter.java         | 20 +++++++++++++++-
 .../java/parallelai/spyglass/hbase/HBaseTap.java  | 12 +++++++---
 .../parallelai/spyglass/hbase/HBaseSource.scala   |  5 ++--
 6 files changed, 53 insertions(+), 15 deletions(-)

(limited to 'src')

diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 5baf20e..25b89cb 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -19,4 +19,5 @@ public class HBaseConstants {
 
   public static final String USE_SALT = "hbase.%s.use.salt";
   public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+  public static final String SINK_MODE = "hbase.%s.sink.mode";
 }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 40f1faf..c145eb0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -2,6 +2,7 @@ package parallelai.spyglass.hbase;
 
 import java.io.IOException;
 
+import cascading.tap.SinkMode;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -9,24 +10,32 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table
  */
 public class HBaseOutputFormat extends
-FileOutputFormat<ImmutableBytesWritable, Put> {
+FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
 
   /** JobConf parameter that specifies the output table */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
   private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);
 
+  private SinkMode sinkMode = SinkMode.UPDATE;
+
+  @Override
+  public void configure(JobConf conf) {
+    sinkMode = SinkMode.valueOf(
+        conf.get(
+            String.format(
+                HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE)
+            )
+        )
+    );
+  }
+
   @Override
   @SuppressWarnings("unchecked")
@@ -43,8 +52,11 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
       LOG.error(e);
       throw e;
     }
+    // TODO: Should Autoflush be set to true ????
     table.setAutoFlush(false);
-    return new HBaseRecordWriter(table);
+    HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
+    recordWriter.setSinkMode(sinkMode);
+    return recordWriter;
   }
 
   @Override
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 7b62c88..832ce95 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -278,7 +278,7 @@ public class HBaseRawScheme extends Scheme {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
   private HTable m_table;
+  private SinkMode m_sinkMode = SinkMode.UPDATE;
 
   /**
    * Instantiate a TableRecordWriter with the HBase HClient for writing.
@@ -30,8 +33,23 @@ public class HBaseRecordWriter
     m_table.close();
   }
 
+  public void setSinkMode(SinkMode sinkMode) {
+    m_sinkMode = sinkMode;
+  }
+
   public void write(ImmutableBytesWritable key, Put value) throws IOException {
-    m_table.put(new Put(value));
+    switch(m_sinkMode) {
+      case UPDATE:
+        m_table.put(new Put(value));
+        break;
+
+      case REPLACE:
+        m_table.delete(new Delete(value.getRow()));
+        break;
+
+      default:
+        throw new IOException("Unknown Sink Mode : " + m_sinkMode);
+    }
   }
 }
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 65873a0..07b5aa7 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -154,7 +154,7 @@ public class HBaseTap extends Tap {
     if (isReplace() && conf.get("mapred.task.partition") == null) {
       try {
         deleteResource(conf);
-
+        conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.REPLACE.toString());
       } catch (IOException e) {
         throw new RuntimeException("could not delete resource: " + e);
       }
@@ -163,6 +163,7 @@
     else if (isUpdate()) {
       try {
         createResource(conf);
+        conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.UPDATE.toString());
       } catch (IOException e) {
         throw new RuntimeException(tableName + " does not exist !", e);
       }
@@ -220,8 +221,13 @@
 
   @Override
   public boolean deleteResource(JobConf jobConf) throws IOException {
-    // TODO: for now we don't do anything just to be safe
-    return true;
+    HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+
+    if (hBaseAdmin.tableExists(tableName)) {
+      return true;
+    } else {
+      throw new IOException("DELETE records: " + tableName + " does NOT EXIST!!!");
+    }
   }
 
   @Override
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index d6795aa..7ff7860 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -39,7 +39,8 @@ case class HBaseSource(
     keyList: List[String] = null,
     versions: Int = 1,
     useSalt: Boolean = false,
-    prefixList: String = null
+    prefixList: String = null,
+    sinkMode: SinkMode = SinkMode.UPDATE
   ) extends Source {
 
   override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
@@ -79,7 +80,7 @@ case class HBaseSource(
       hbt.asInstanceOf[Tap[_,_,_]]
     }
     case Write => {
-      val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
+      val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
       hbt.setUseSaltInSink(useSalt)
--
cgit v1.2.3
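
Usage sketch: a minimal Scalding job showing how the new sinkMode parameter might be driven from user code. The HBaseSource parameter names used below (tableName, quorumNames, keyFields, familyNames, valueFields, sinkMode) are taken from this patch, but the full constructor order, the element types List[String]/List[Fields], and the remaining defaults are assumptions, so named arguments are used; the DeleteRowsJob class, table name, ZooKeeper quorum, field names and input path are hypothetical placeholders. With sinkMode = SinkMode.REPLACE the tap records SINK_MODE = REPLACE in the JobConf, HBaseOutputFormat reads it back in configure(), and HBaseRecordWriter issues a row Delete for each incoming key instead of a Put; note that deleteResource() now only verifies that the target table exists rather than truncating it.

    import cascading.tap.SinkMode
    import cascading.tuple.Fields
    import com.twitter.scalding.{Args, Job, Tsv}
    import parallelai.spyglass.hbase.HBaseSource

    // Hypothetical job: deletes the HBase rows whose keys are listed in a TSV file.
    class DeleteRowsJob(args: Args) extends Job(args) {

      // REPLACE routes every tuple reaching the sink to HTable.delete(rowKey)
      // instead of HTable.put(...) (see the switch in HBaseRecordWriter.write).
      val hbaseSink = HBaseSource(
        tableName   = "sample_table",              // hypothetical table name
        quorumNames = "zookeeper-host:2181",       // hypothetical ZooKeeper quorum
        keyFields   = new Fields("rowkey"),
        familyNames = List("cf"),
        valueFields = List(new Fields("cf_value")),
        sinkMode    = SinkMode.REPLACE
      )

      // Only the row key matters for deletes, but a value column is kept so the
      // scheme can still assemble its Put before the record writer converts it.
      Tsv(args("rowsToDelete"), new Fields("rowkey", "cf_value"))
        .read
        .write(hbaseSink)
    }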