aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKristian Kaufmann <kristianwkaufmann@gmail.com>2015-06-12 16:20:09 -0400
committerKristian Kaufmann <kristianwkaufmann@gmail.com>2015-06-12 16:20:09 -0400
commitd9f59cf18afadb81b18c0d270948d82721d16cfb (patch)
treeffb57477385df684c16bc3cd211cb74cea17c71a
parent4c1f607584fe923aa2a5de50d4b6ec59a58be85a (diff)
downloadSpyGlass-d9f59cf18afadb81b18c0d270948d82721d16cfb.tar.gz
SpyGlass-d9f59cf18afadb81b18c0d270948d82721d16cfb.zip
Adds autoFlush setting to HBaseSource.
The setting defaults to true to reproduce current behavior.
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseConstants.java1
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java3
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java3
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java10
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala3
5 files changed, 17 insertions, 3 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 7453d3e..a5e67b1 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -26,6 +26,7 @@ public class HBaseConstants {
public static final String VERSIONS = "hbase.%s.versions";
public static final String USE_SALT = "hbase.%s.use.salt";
public static final String SALT_PREFIX = "hbase.%s.salt.prefix";
+ public static final String AUTO_FLUSH = "hbase.%s.auto_flush";
public static final String SINK_MODE = "hbase.%s.sink.mode";
} \ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 401dea0..2ad8924 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -51,8 +51,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
LOG.error(e);
throw e;
}
- // TODO: Should Autoflush be set to true ???? - DONE
- table.setAutoFlush(true);
+ table.setAutoFlush(job.getBoolean(String.format(HBaseConstants.AUTO_FLUSH, tableName),false));
HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);
recordWriter.setSinkMode(sinkMode);
return recordWriter;
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
index 50aa116..24654f5 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java
@@ -30,6 +30,9 @@ public class HBaseRecordWriter
+ /**
+ * Flushes any pending commits when auto-flush is disabled, then closes the table.
+ *
+ * @param reporter not used by this method
+ * @throws IOException if flushing or closing the table fails
+ */
public void close(Reporter reporter)
throws IOException {
+ try {
+ // With auto-flush off, push pending commits out explicitly before closing.
+ if (!m_table.isAutoFlush()) {
+ m_table.flushCommits();
+ }
+ } finally {
+ // Close even when the flush throws, so the table handle is not leaked.
m_table.close();
+ }
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 3576e96..e4a3f78 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -404,9 +404,11 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
+ /**
+ * Per-table sink settings that are copied into the job configuration.
+ */
private static class SinkConfig implements Serializable {
public String tableName = null;
public boolean useSalt = false;
+ // Defaults to true: the record writer previously always enabled auto-flush,
+ // so a config that never sets this field must keep that behavior.
+ public boolean autoFlush = true;
+ /**
+ * Writes the salt and auto-flush flags into {@code jobConf} under keys
+ * formatted with this config's table name.
+ *
+ * @param jobConf configuration that receives the two boolean settings
+ */
public void configure(Configuration jobConf) {
jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt);
+ jobConf.setBoolean(String.format(HBaseConstants.AUTO_FLUSH, tableName), autoFlush);
}
}
@@ -459,6 +461,14 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
sinkConfigList.add(sc);
}
+
+ /**
+ * Builds a sink configuration for the current {@code tableName} carrying the
+ * given flags and appends it to {@code sinkConfigList}.
+ *
+ * @param useSalt value stored in the new configuration's {@code useSalt} field
+ * @param autoFlush value stored in the new configuration's {@code autoFlush} field
+ */
+ public void setHBaseSinkParms( boolean useSalt, boolean autoFlush ) {
+ final SinkConfig sinkConfig = new SinkConfig();
+ sinkConfig.tableName = tableName;
+ sinkConfig.useSalt = useSalt;
+ sinkConfig.autoFlush = autoFlush;
+ sinkConfigList.add(sinkConfig);
+ }
private void setPrefixList(SourceConfig sc, String prefixList ) {
prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList;
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 30c28c9..92b3792 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -40,6 +40,7 @@ case class HBaseSource(
versions: Int = 1,
useSalt: Boolean = false,
prefixList: String = null,
+ autoFlush: Boolean = true,
sinkMode: SinkMode = SinkMode.UPDATE,
inputSplitType: SplitType = SplitType.GRANULAR
) extends Source {
@@ -89,7 +90,7 @@ case class HBaseSource(
case Write => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
- hbt.setUseSaltInSink(useSalt)
+ hbt.setHBaseSinkParms(useSalt, autoFlush)
hbt.asInstanceOf[Tap[_,_,_]]
}