diff options
5 files changed, 17 insertions, 3 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 7453d3e..a5e67b1 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -26,6 +26,7 @@ public class HBaseConstants {      public static final String VERSIONS = "hbase.%s.versions";      public static final String USE_SALT = "hbase.%s.use.salt";      public static final String SALT_PREFIX = "hbase.%s.salt.prefix"; +    public static final String AUTO_FLUSH = "hbase.%s.auto_flush";      public static final String SINK_MODE = "hbase.%s.sink.mode";  }
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index 401dea0..2ad8924 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -51,8 +51,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {        LOG.error(e);        throw e;      } -    // TODO: Should Autoflush be set to true ???? - DONE -    table.setAutoFlush(true); +    table.setAutoFlush(job.getBoolean(String.format(HBaseConstants.AUTO_FLUSH, tableName),false));      HBaseRecordWriter recordWriter = new HBaseRecordWriter(table);      recordWriter.setSinkMode(sinkMode);      return recordWriter; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java index 50aa116..24654f5 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordWriter.java @@ -30,6 +30,9 @@ public class HBaseRecordWriter    public void close(Reporter reporter)      throws IOException { +    if (!m_table.isAutoFlush()) { +        m_table.flushCommits(); +    }      m_table.close();    } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 3576e96..e4a3f78 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -404,9 +404,11 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {    private static class SinkConfig implements Serializable {  	  public String tableName = null;  	  public boolean useSalt = false; +	  public boolean autoFlush = false;  	  public void configure(Configuration jobConf) {            jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); +          jobConf.setBoolean(String.format(HBaseConstants.AUTO_FLUSH, tableName), autoFlush);  	  }    } @@ -459,6 +461,14 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {      sinkConfigList.add(sc);    } + +  public void setHBaseSinkParms( boolean useSalt, boolean autoFlush ) { +    SinkConfig sc = new SinkConfig(); +    sc.tableName = tableName; +    sc.autoFlush = autoFlush; +    sc.useSalt = useSalt; +    sinkConfigList.add(sc); +  }    private void setPrefixList(SourceConfig sc, String prefixList ) {  	  prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList; diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 30c28c9..92b3792 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -40,6 +40,7 @@ case class HBaseSource(      versions: Int = 1,      useSalt: Boolean = false,      prefixList: String = null, +    autoFlush: Boolean = true,      sinkMode: SinkMode = SinkMode.UPDATE,      inputSplitType: SplitType = SplitType.GRANULAR    ) extends Source { @@ -89,7 +90,7 @@ case class HBaseSource(          case Write => {            val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode) -          hbt.setUseSaltInSink(useSalt) +          hbt.setHBaseSinkParms(useSalt, autoFlush)            hbt.asInstanceOf[Tap[_,_,_]]          }  | 
