From 6e21e0c68248a33875898b86a2be7a9cec7df3d4 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Thu, 6 Jun 2013 12:27:15 +0100 Subject: Added extensions to Read and Write mode. Added support for key prefixes --- .../java/parallelai/spyglass/hbase/HBaseTap.java | 90 ++++++++++++++++++---- 1 file changed, 74 insertions(+), 16 deletions(-) (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTap.java') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 9a0ed0e..65873a0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,6 +12,7 @@ package parallelai.spyglass.hbase; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.flow.FlowProcess; import cascading.tap.SinkMode; import cascading.tap.Tap; @@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -35,13 +35,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Map.Entry; +import java.util.Arrays; import java.util.UUID; /** @@ -172,7 +169,12 @@ public class HBaseTap extends Tap { } - conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); + conf.set(HBaseOutputFormat.OUTPUT_TABLE, tableName); + + for( SinkConfig sc : sinkConfigList) { + sc.configure(conf); + } + super.sinkConfInit(process, conf); } @@ -292,10 +294,19 @@ public class HBaseTap extends Tap { public String startKey = null; public String stopKey = null; public String [] keyList = null; + public int versions = 1; + public boolean useSalt = false; + public String prefixList = null; public void configure(Configuration jobConf) { switch( sourceMode ) { case SCAN_RANGE: + if (stopKey != null && startKey != null && startKey.compareTo(stopKey) > 0) { + String t = stopKey; + stopKey = startKey; + startKey = t; + } + jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); if( startKey != null && startKey.length() > 0 ) @@ -304,57 +315,104 @@ public class HBaseTap extends Tap { if( stopKey != null && stopKey.length() > 0 ) jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey); - LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); - LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); - LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); + // Added for Salting + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); + + LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + LOG.info(String.format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); + LOG.info(String.format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); break; case GET_LIST: jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList); + jobConf.setInt(String.format(HBaseConstants.VERSIONS, tableName), versions); - LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); - LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList)); + // Added for Salting + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); + + LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + LOG.info(String.format("Setting KEY LIST (%s) to key list length (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList.length)); break; default: jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); - LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); + // Added for Salting + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + jobConf.set(String.format(HBaseConstants.SALT_PREFIX, tableName), prefixList); + + LOG.info(String.format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); break; } } } + private static class SinkConfig implements Serializable { + public String tableName = null; + public boolean useSalt = false; + + public void configure(Configuration jobConf) { + jobConf.setBoolean(String.format(HBaseConstants.USE_SALT, tableName), useSalt); + } + } + private ArrayList sourceConfigList = new ArrayList(); - - public void setHBaseRangeParms(String startKey, String stopKey ) { + private ArrayList sinkConfigList = new ArrayList(); + + public void setHBaseRangeParms(String startKey, String stopKey, boolean useSalt, String prefixList ) { SourceConfig sc = new SourceConfig(); sc.sourceMode = SourceMode.SCAN_RANGE; sc.tableName = tableName; sc.startKey = startKey; sc.stopKey = stopKey; + sc.useSalt = useSalt; + setPrefixList(sc, prefixList); sourceConfigList.add(sc); } - public void setHBaseListParms(String [] keyList ) { + public void setHBaseListParms(String [] keyList, int versions, boolean useSalt, String prefixList ) { SourceConfig sc = new SourceConfig(); sc.sourceMode = SourceMode.GET_LIST; sc.tableName = tableName; sc.keyList = keyList; + sc.versions = (versions < 1) ? 1 : versions; + sc.useSalt = useSalt; + setPrefixList(sc, prefixList); sourceConfigList.add(sc); } - public void setHBaseScanAllParms() { + public void setHBaseScanAllParms(boolean useSalt, String prefixList) { SourceConfig sc = new SourceConfig(); sc.sourceMode = SourceMode.SCAN_ALL; sc.tableName = tableName; + sc.useSalt = useSalt; + + setPrefixList(sc, prefixList); sourceConfigList.add(sc); } + + public void setUseSaltInSink( boolean useSalt ) { + SinkConfig sc = new SinkConfig(); + + sc.tableName = tableName; + sc.useSalt = useSalt; + + sinkConfigList.add(sc); + } + + private void setPrefixList(SourceConfig sc, String prefixList ) { + prefixList = (prefixList == null || prefixList.length() == 0) ? HBaseSalter.DEFAULT_PREFIX_LIST : prefixList; + char[] prefixArray = prefixList.toCharArray(); + Arrays.sort(prefixArray); + sc.prefixList = new String( prefixArray ); + } } -- cgit v1.2.3