about | summary | refs | log | tree | commit | diff | stats
path: root/src
diff options
context:
space:
mode:
author Chandan Rajah <crajah@parallelai.com> 2013-08-16 14:19:51 +0100
committer Chandan Rajah <crajah@parallelai.com> 2013-08-16 14:19:51 +0100
commit 466e739d71f426422881986cf018643b7876acf5 (patch)
tree 77a3260b651544362cd47fb4dc1d2e11e847f37b /src
parent b9d987c0d9946f8f778fbec2856305c0f20fd3f8 (diff)
download SpyGlass-466e739d71f426422881986cf018643b7876acf5.tar.gz
SpyGlass-466e739d71f426422881986cf018643b7876acf5.zip
Adding Delete functionality to the HBaseSource
Diffstat (limited to 'src')
-rw-r--r-- src/main/java/parallelai/spyglass/hbase/HBaseConstants.java 1
-rw-r--r-- src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java 28
-rw-r--r-- src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java 2
-rw-r--r-- src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java 20
-rw-r--r-- src/main/java/parallelai/spyglass/hbase/HBaseTap.java 12
-rw-r--r-- src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala 5
6 files changed, 53 insertions, 15 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 5baf20e..25b89cb 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -19,4 +19,5 @@ public class HBaseConstants {
public static final String USE_SALT = "hbase.%s.use.salt";
public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+ public static final String SINK_MODE = "hbase.%s.sink.mode";
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 40f1faf..c145eb0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -2,6 +2,7 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
+import cascading.tap.SinkMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -9,24 +10,32 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
/**
* Convert Map/Reduce output and write it to an HBase table
*/
public class HBaseOutputFormat extends
-FileOutputFormat<ImmutableBytesWritable, Put> {
+FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
private final Log LOG = LogFactory.getLog(HBaseOutputFormat.class);
+ private SinkMode sinkMode = SinkMode.UPDATE;
+
+ @Override
+ public void configure(JobConf conf) {
+ sinkMode = SinkMode.valueOf(
+ conf.get(
+ String.format(
+ HBaseConstants.SINK_MODE, conf.get(HBaseOutputFormat.OUTPUT_TABLE)
+ )
+ )
+ );
+ }
+
@Override
@SuppressWarnings("unchecked")
@@ -43,8 +52,11 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
LOG.error(e);
throw e;
}
+ // TODO: Should Autoflush be set to true ????
table.setAutoFlush(false);
- return new HBaseRecordWriter(table);
+ HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
+ recordWriter.setSinkMode(sinkMode);
+ return recordWriter;
}
@Override
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 7b62c88..832ce95 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -278,7 +278,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
public void copyValue(Result oldValue, Result newValue) {
if (null != oldValue && null != newValue) {
- oldValue.copyFrom(newValue);
+// oldValue.copyFrom(newValue);
}
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
index 1cca133..50aa116 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -2,6 +2,8 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
+import cascading.tap.SinkMode;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -15,6 +17,7 @@ import org.apache.hadoop.mapred.Reporter;
public class HBaseRecordWriter
implements RecordWriter<ImmutableBytesWritable, Put> {
private HTable m_table;
+ private SinkMode m_sinkMode = SinkMode.UPDATE;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing.
@@ -30,8 +33,23 @@ public class HBaseRecordWriter
m_table.close();
}
+ public void setSinkMode(SinkMode sinkMode) {
+ m_sinkMode = sinkMode;
+ }
+
public void write(ImmutableBytesWritable key,
Put value) throws IOException {
- m_table.put(new Put(value));
+ switch(m_sinkMode) {
+ case UPDATE:
+ m_table.put(new Put(value));
+ break;
+
+ case REPLACE:
+ m_table.delete(new Delete(value.getRow()));
+ break;
+
+ default:
+ throw new IOException("Unknown Sink Mode : " + m_sinkMode);
+ }
}
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 65873a0..07b5aa7 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -154,7 +154,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
if (isReplace() && conf.get("mapred.task.partition") == null) {
try {
deleteResource(conf);
-
+ conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.REPLACE.toString());
} catch (IOException e) {
throw new RuntimeException("could not delete resource: " + e);
}
@@ -163,6 +163,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
else if (isUpdate()) {
try {
createResource(conf);
+ conf.set( String.format(HBaseConstants.SINK_MODE, tableName), SinkMode.UPDATE.toString());
} catch (IOException e) {
throw new RuntimeException(tableName + " does not exist !", e);
}
@@ -220,8 +221,13 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
@Override
public boolean deleteResource(JobConf jobConf) throws IOException {
- // TODO: for now we don't do anything just to be safe
- return true;
+ HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+
+ if (hBaseAdmin.tableExists(tableName)) {
+ return true;
+ } else {
+ throw new IOException("DELETE records: " + tableName + " does NOT EXIST!!!");
+ }
}
@Override
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index d6795aa..7ff7860 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -39,7 +39,8 @@ case class HBaseSource(
keyList: List[String] = null,
versions: Int = 1,
useSalt: Boolean = false,
- prefixList: String = null
+ prefixList: String = null,
+ sinkMode: SinkMode = SinkMode.UPDATE
) extends Source {
override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
@@ -79,7 +80,7 @@ case class HBaseSource(
hbt.asInstanceOf[Tap[_,_,_]]
}
case Write => {
- val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
hbt.setUseSaltInSink(useSalt)