diff options
author | Kristian Kaufmann <kristianwkaufmann@gmail.com> | 2015-06-12 16:20:09 -0400 |
---|---|---|
committer | Kristian Kaufmann <kristianwkaufmann@gmail.com> | 2015-06-12 16:20:09 -0400 |
commit | d9f59cf18afadb81b18c0d270948d82721d16cfb (patch) | |
tree | ffb57477385df684c16bc3cd211cb74cea17c71a /src/main | |
parent | 4c1f607584fe923aa2a5de50d4b6ec59a58be85a (diff) | |
download | SpyGlass-d9f59cf18afadb81b18c0d270948d82721d16cfb.tar.gz SpyGlass-d9f59cf18afadb81b18c0d270948d82721d16cfb.zip |
Adds autoFlush setting to HbaseSource.
The setting defaults to true to reproduce current behavior
Diffstat (limited to 'src/main')
5 files changed, 17 insertions, 3 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 7453d3e..a5e67b1 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -26,6 +26,7 @@ public class HBaseConstants { public static final String VERSIONS = "hbase.%s.versions"; public static final String USE_SALT = "hbase.%s.use.salt"; public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; + public static final String AUTO_FLUSH = "hbase.%s.auto_flush"; public static final String SINK_MODE = "hbase.%s.sink.mode"; }
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index 401dea0..2ad8924 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -51,8 +51,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable { LOG.error(e); throw e; } - // TODO: Should Autoflush be set to true ???? - DONE - table.setAutoFlush(true); + table.setAutoFlush(job.getBoolean(String.format(HBaseConstants.AUTO_FLUSH, tableName),false)); HBaseRecordWriter recordWriter = new HBaseRecordWriter(table); recordWriter.setSinkMode(sinkMode); return recordWriter; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java index 50aa116..24654f5 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -30,6 +30,9 @@ public class HBaseRecordWriter public void close(Reporter reporter) throws IOException { + if (!m_table.isAutoFlush()) { + m_table.flushCommits(); + } m_table.close(); } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 3576e96..e4a3f78 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -404,9 +404,11 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { private static class SinkConfig implements Serializable { public String tableName = null; public boolean useSalt = false; + public boolean autoFlush = false; public void configure(Configuration jobConf) { jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + jobConf.setBoolean(String.format(HBaseConstants.AUTO_FLUSH, tableName), autoFlush); } } @@ -459,6 +461,14 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { sinkConfigList.add(sc); } + + public void setHBaseSinkParms( boolean useSalt, boolean autoFlush ) { + SinkConfig sc = new SinkConfig(); + sc.tableName = tableName; + sc.autoFlush = autoFlush; + sc.useSalt = useSalt; + sinkConfigList.add(sc); + } private void setPrefixList(SourceConfig sc, String prefixList ) { prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList; diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 30c28c9..92b3792 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -40,6 +40,7 @@ case class HBaseSource( versions: Int = 1, useSalt: Boolean = false, prefixList: String = null, + autoFlush: Boolean = true, sinkMode: SinkMode = SinkMode.UPDATE, inputSplitType: SplitType = SplitType.GRANULAR ) extends Source { @@ -89,7 +90,7 @@ case class HBaseSource( case Write => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode) - hbt.setUseSaltInSink(useSalt) + hbt.setHBaseSinkParms(useSalt, autoFlush) hbt.asInstanceOf[Tap[_,_,_]] } |