diff options
author | Chandan Rajah <crajah@parallelai.com> | 2013-08-16 14:19:51 +0100 |
---|---|---|
committer | Chandan Rajah <crajah@parallelai.com> | 2013-08-16 14:19:51 +0100 |
commit | 466e739d71f426422881986cf018643b7876acf5 (patch) | |
tree | 77a3260b651544362cd47fb4dc1d2e11e847f37b | |
parent | b9d987c0d9946f8f778fbec2856305c0f20fd3f8 (diff) | |
download | SpyGlass-466e739d71f426422881986cf018643b7876acf5.tar.gz SpyGlass-466e739d71f426422881986cf018643b7876acf5.zip |
Adding Delete functionality to the HBaseSource
7 files changed, 56 insertions, 18 deletions
@@ -22,13 +22,13 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <cdh.version>cdh4.3.0</cdh.version> + <cdh.version>cdh4.2.0</cdh.version> <datafu.version>0.0.4-${cdh.version}</datafu.version> <flume.version>1.3.0-${cdh.version}</flume.version> <hadoop.version>2.0.0-${cdh.version}</hadoop.version> <hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version> - <hbase.version>0.94.6-${cdh.version}</hbase.version> + <hbase.version>0.94.2-${cdh.version}</hbase.version> <hive.version>0.10.0-${cdh.version}</hive.version> <mahout.version>0.7-${cdh.version}</mahout.version> <mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version> @@ -66,7 +66,7 @@ <name>Cascading and Scalding wrapper for HBase with advanced features</name> <groupId>parallelai</groupId> <artifactId>parallelai.spyglass</artifactId> - <version>${scala.version}_3.1.0</version> + <version>${scala.version}_3.2.0</version> <packaging>jar</packaging> <distributionManagement> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 5baf20e..25b89cb 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -19,4 +19,5 @@ public class HBaseConstants { public static final String USE_SALT = "hbase.%s.use.salt"; public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; + public static final String SINK_MODE = "hbase.%s.sink.mode"; } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index 40f1faf..c145eb0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -2,6 +2,7 @@ package parallelai.spyglass.hbase; import java.io.IOException; +import cascading.tap.SinkMode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -9,24 +10,32 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapred.InvalidJobConfException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Progressable; /** * Convert Map/Reduce output and write it to an HBase table */ public class HBaseOutputFormat extends -FileOutputFormat<ImmutableBytesWritable, Put> { +FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable { /** JobConf parameter that specifies the output table */ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class); + private SinkMode sinkMode = SinkMode.UPDATE; + + @Override + public void configure(JobConf conf) { + sinkMode = SinkMode.valueOf( + conf.get( + String.format( + HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE) + ) + ) + ); + } + @Override @SuppressWarnings("unchecked") @@ -43,8 +52,11 @@ FileOutputFormat<ImmutableBytesWritable, Put> { LOG.error(e); throw e; } + // TODO: Should Autoflush be set to true ???? table.setAutoFlush(false); - return new HBaseRecordWriter(table); + HBaseRecordWriter recordWriter = new HBaseRecordWriter(table); + recordWriter.setSinkMode(sinkMode); + return recordWriter; } @Override diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 7b62c88..832ce95 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -278,7 +278,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto public void copyValue(Result oldValue, Result newValue) { if (null != oldValue && null != newValue) { - oldValue.copyFrom(newValue); +// oldValue.copyFrom(newValue); } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java index 1cca133..50aa116 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -2,6 +2,8 @@ package parallelai.spyglass.hbase; import java.io.IOException; +import cascading.tap.SinkMode; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -15,6 +17,7 @@ import org.apache.hadoop.mapred.Reporter; public class HBaseRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> { private HTable m_table; + private SinkMode m_sinkMode = SinkMode.UPDATE; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. @@ -30,8 +33,23 @@ public class HBaseRecordWriter m_table.close(); } + public void setSinkMode(SinkMode sinkMode) { + m_sinkMode = sinkMode; + } + public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_table.put(new Put(value)); + switch(m_sinkMode) { + case UPDATE: + m_table.put(new Put(value)); + break; + + case REPLACE: + m_table.delete(new Delete(value.getRow())); + break; + + default: + throw new IOException("Unknown Sink Mode : " + m_sinkMode); + } } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 65873a0..07b5aa7 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -154,7 +154,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { if (isReplace() && conf.get("mapred.task.partition") == null) { try { deleteResource(conf); - + conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.REPLACE.toString()); } catch (IOException e) { throw new RuntimeException("could not delete resource: " + e); } @@ -163,6 +163,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { else if (isUpdate()) { try { createResource(conf); + conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.UPDATE.toString()); } catch (IOException e) { throw new RuntimeException(tableName + " does not exist !", e); } @@ -220,8 +221,13 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { @Override public boolean deleteResource(JobConf jobConf) throws IOException { - // TODO: for now we don't do anything just to be safe - return true; + HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + + if (hBaseAdmin.tableExists(tableName)) { + return true; + } else { + throw new IOException("DELETE records: " + tableName + " does NOT EXIST!!!"); + } } @Override diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index d6795aa..7ff7860 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -39,7 +39,8 @@ case class HBaseSource( keyList: List[String] = null, versions: Int = 1, useSalt: Boolean = false, - prefixList: String = null + prefixList: String = null, + sinkMode: SinkMode = SinkMode.UPDATE ) extends Source { override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray) @@ -79,7 +80,7 @@ case class HBaseSource( hbt.asInstanceOf[Tap[_,_,_]] } case Write => { - val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) + val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode) hbt.setUseSaltInSink(useSalt) |