author | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-05-22 22:04:44 +0100 |
---|---|---|
committer | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-05-22 22:04:44 +0100 |
commit | 08e00dfce43244ec1edb0e2363a977fea985b454 (patch) | |
tree | 3ef8c08587edd2826c272c1277389bb7427aad66 | |
parent | ba87c17eed4e6f2d1e1b1d4644193485aae0bb4e (diff) | |
Releasing 4.3 Spyglass for Scalding (0.9.1) and Scala (2.10)
20 files changed, 134 insertions, 210 deletions
```diff
@@ -11,51 +11,38 @@ <name>Cascading and Scalding wrapper for HBase with advanced features</name>
     <groupId>parallelai</groupId>
     <artifactId>parallelai.spyglass</artifactId>
-    <version>2.10.2_4.2rc2</version>
+    <version>2.10_0.9_4.3</version>
     <packaging>jar</packaging>

     <properties>
+        <!-- UTF-8 Encoding settings -->
+        <encoding>UTF-8</encoding>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Java compilation level -->
         <maven.compiler.source>1.6</maven.compiler.source>
         <maven.compiler.target>1.6</maven.compiler.target>
+
+        <!-- Maven -->
         <maven-compiler-plugin.version>3.0</maven-compiler-plugin.version>
         <maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>
-        <maven-war-plugin.version>2.3</maven-war-plugin.version>
-
-        <!-- UTF-8 Encoding settings -->
-        <encoding>UTF-8</encoding>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <maven.surefire.plugin.version>2.12.3</maven.surefire.plugin.version>

         <cdh.version>cdh4.5.0</cdh.version>
-        <datafu.version>0.0.4-${cdh.version}</datafu.version>
-        <flume.version>1.3.0-${cdh.version}</flume.version>
         <hadoop.version>2.0.0-${cdh.version}</hadoop.version>
         <hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version>
         <hbase.version>0.94.6-${cdh.version}</hbase.version>
-        <hive.version>0.10.0-${cdh.version}</hive.version>
-        <mahout.version>0.7-${cdh.version}</mahout.version>
-        <mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version>
-        <oozie.version>3.3.0-${cdh.version}</oozie.version>
-        <oozie-hadoop.version>2.0.0-${cdh.version}.oozie-3.3.0-${cdh.version}</oozie-hadoop.version>
-        <oozie-sharelib.version>3.3.0-${cdh.version}</oozie-sharelib.version>
-        <pig.version>0.10.0-${cdh.version}</pig.version>
-        <sqoop.version>1.4.2-${cdh.version}</sqoop.version>
-        <whirr.version>0.8.0-${cdh.version}</whirr.version>
-        <zookeeper.version>3.4.5-${cdh.version}</zookeeper.version>

         <!-- Scala/Scalding/Cascading properties -->
         <!-- can be 2.9.3 and 2.10.2 -->
-        <scala.version>2.10.2</scala.version>
+        <scala.version>2.10.3</scala.version>
         <!-- 2.10 for Scala 2.10.2 and 2.9.3 for Scala version 2.9.3 -->
         <scalding.scala.version>2.10</scalding.scala.version>
         <scalding.version>0.9.1</scalding.version>
-        <cascading.version>2.5.3</cascading.version>
-        <scalding-commons.version>0.9.1</scalding-commons.version>
         <scalatest.version>1.9.1</scalatest.version>
-        <trove4j.version>3.0.3</trove4j.version>
         <!-- 2.1.1 for Scala 2.10.2 and 1.12.4.1 for Scala 2.9.3-->
         <specs2.version>2.1.1</specs2.version>
         <typesafe.config.version>1.0.0</typesafe.config.version>
@@ -63,10 +50,6 @@
         <!-- Other libraries properties -->
         <junit.version>4.10</junit.version>
         <slf4j.version>1.7.2</slf4j.version>
-        <trove4j.version>3.0.3</trove4j.version>
-        <javax.servlet.version>2.5</javax.servlet.version>
-        <uncommons-maths.version>1.2.2a</uncommons-maths.version>
-        <maven.surefire.plugin.version>2.12.3</maven.surefire.plugin.version>

     </properties>

@@ -87,9 +70,14 @@ <name>Con Jars</name>
         <url>http://conjars.org/repo</url>
       </repository>
+      <repository>
+        <id>clojars</id>
+        <name>Clojars</name>
+        <url>http://conjars.org/repo</url>
+      </repository>
     </repositories>

-    <!-- Profiles -->
+    <!-- Profiles
     <profiles>
       <profile>
         <id>windows_profile</id>
@@ -133,20 +121,14 @@ </dependency>
       </dependencies>
     </profile>
-    </profiles>
+    </profiles>-->

     <dependencies>
-        <!-- Cascading -->
-        <dependency>
-            <groupId>cascading</groupId>
-            <artifactId>cascading-core</artifactId>
-            <version>${cascading.version}</version>
-        </dependency>
         <dependency>
-            <groupId>cascading</groupId>
-            <artifactId>cascading-hadoop</artifactId>
-            <version>${cascading.version}</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
         </dependency>

         <!-- Scalding -->
@@ -174,6 +156,7 @@
             <artifactId>hbase</artifactId>
             <version>${hbase.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
@@ -181,11 +164,6 @@
         </dependency>

         <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-        </dependency>
-        <dependency>
             <groupId>com.typesafe</groupId>
             <artifactId>config</artifactId>
             <version>${typesafe.config.version}</version>
@@ -200,7 +178,7 @@
             <groupId>com.twitter.elephantbird</groupId>
             <artifactId>elephant-bird-hadoop-compat</artifactId>
             <version>4.1</version>
-        </dependency>
+        </dependency>

         <!-- Testing dependencies (ScalaSpec / ScalaTest / JUnit) -->
         <dependency>
```
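The artifact coordinates change with this release. For reference, a build that consumes the new jar would point at the Conjars repository listed in the `<repositories>` section above; the sbt form below is only a hedged sketch (this project itself builds with Maven, and the sbt notation is an assumed equivalent of the pom coordinates):

```scala
// Assumed sbt equivalent of the Maven coordinates in the pom.xml above:
// groupId "parallelai", artifactId "parallelai.spyglass", version "2.10_0.9_4.3".
resolvers += "conjars" at "http://conjars.org/repo"

libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.10_0.9_4.3"
```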
```diff
diff --git a/src/main/java/parallelai/spyglass/base/JobLibLoader.java b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
index af5bdf4..f49cca9 100644
--- a/src/main/java/parallelai/spyglass/base/JobLibLoader.java
+++ b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
@@ -15,7 +15,6 @@ public class JobLibLoader {
 	public static void loadJars(String libPathStr, Configuration config) {
-
 		try {
 			Path libPath = new Path(libPathStr);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
index 77df84e..77996c6 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
@@ -6,11 +6,7 @@ import org.apache.commons.logging.LogFactory;
 import java.io.IOException;

 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 17:25
- * To change this template use File | Settings | File Templates.
+ * Utility class that sets the parameters of the record reader of a table split
  */
 public class HBaseConfigUtils {
     static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 5b5e9c3..7453d3e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -2,6 +2,9 @@ package parallelai.spyglass.hbase;

 import org.apache.hadoop.conf.Configuration;

+/**
+ * Static enums , strings
+ */
 public class HBaseConstants {

 	public enum SourceMode {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
index 2f3047b..6fa7fce 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
@@ -18,11 +18,11 @@ import java.util.TreeSet;
 import java.util.UUID;

 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 12:43
- * To change this template use File | Settings | File Templates.
+ * For reading from HBase this class provides all the wiring
+ *
+ * The idea is before a Tap is created the parameters populate the JobConf
+ * In this class we pick up the parameters from the JobConf and set up our logic,
+ * i.e. which HBase columns to read, what is the start/stop key etc
  */
 public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {

@@ -33,11 +33,11 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
 	protected HTable table;
 	protected Filter rowFilter;

-	public static final String COLUMN_LIST = "hbase.tablecolumns";
-
 	/**
-	 * Use this jobconf param to specify the input table
+	 * Use the following two mappings to specify table columns and input table to the jobconf
+	 * Later on we can pick them up with job.get(COLUMN_LIST)
 	 */
+	public static final String COLUMN_LIST = "hbase.tablecolumns";
 	protected static final String INPUT_TABLE = "hbase.inputtable";

 	protected String startKey = null;
@@ -49,8 +49,6 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
 	protected boolean useSalt = false;
 	protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;

-
-
 	@Override
 	public void configure(JobConf job) {
 		String tableName = getTableName(job);
@@ -65,7 +63,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
 		try {
 			setHTable(new HTable(HBaseConfiguration.create(job), tableName));
 		} catch (Exception e) {
-			LOG.error("************* Table could not be created");
+			LOG.error("************* HBase table " + tableName + " is not accessible");
 			LOG.error(StringUtils.stringifyException(e));
 		}

@@ -166,7 +164,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
 		return job.get(INPUT_TABLE);
 	}

-	protected void setParms(HBaseRecordReaderBase trr) {
-	}
+//	protected void setParms(HBaseRecordReaderBase trr) {
+//	}
 }
```
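The rewritten Javadoc above describes the handshake between the Tap and the input format: the Tap writes its parameters into the `JobConf`, and `HBaseInputFormatBase.configure()` reads them back. A minimal sketch of that wiring, using the two keys named in the diff; the space-separated `family:qualifier` value layout is an assumption, not something this diff shows:

```scala
// Sketch only: populating the two JobConf keys declared in HBaseInputFormatBase.
// The "family:qualifier family:qualifier" value format is assumed.
import org.apache.hadoop.mapred.JobConf

val job = new JobConf()
job.set("hbase.inputtable", "customers")                 // INPUT_TABLE
job.set("hbase.tablecolumns", "data:name data:surname")  // COLUMN_LIST
// configure(job) later picks these up with job.get(...) and builds the table/scan logic
```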
```diff
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 64effc9..332bbd7 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -3,52 +3,51 @@ package parallelai.spyglass.hbase;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.UUID;

 import javax.naming.NamingException;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;

 import parallelai.spyglass.hbase.HBaseConstants.SourceMode;

+/**
+ * See HBaseInputFormatRegional first (!)
+ *
+ * Now that we know which splits we are interested reading from, we will proceed
+ * with iterating over the region servers & splits and depending on our Read strategy
+ * i.e. SCAN_RANGE, GET_LIST , SCAN_ALL we initiate <class>HBaseTableSplitGranular</class> per
+ * region and split with all the correct parameters.
+ *
+ * So all the different <u>strategies</u> are implemented here at a high level
+ *
+ */
 public class HBaseInputFormatGranular extends HBaseInputFormatBase {

 	private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class);

-	// private String tableName = "";
-
 	private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();

 	private String nameServer = null;

-	// private Scan scan = null;
-
 	@SuppressWarnings("deprecation")
 	@Override
 	public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException {
```
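The new class comment above names the three read strategies (SCAN_ALL, SCAN_RANGE, GET_LIST). From a Scalding job they are normally selected through `HBaseSource`; the sketch below follows the constructor shape documented in the project README, with made-up table, quorum and key values:

```scala
// Hedged sketch: selecting a read strategy via HBaseSource (argument shape per the
// project README for this release; table name, quorum and keys are hypothetical).
import cascading.tuple.Fields
import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode

val scanRange = new HBaseSource(
  "customers", "quorum-host:2181", new Fields("key"),
  List("data"), List(new Fields("name")),
  sourceMode = SourceMode.SCAN_RANGE, startKey = "2014-01-01", stopKey = "2014-02-01")

val getList = new HBaseSource(
  "customers", "quorum-host:2181", new Fields("key"),
  List("data"), List(new Fields("name")),
  sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687"))
```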
```diff
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
index 8185b22..5c98443 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
@@ -11,17 +11,20 @@ import java.util.Collection;
 import java.util.HashMap;

 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 12:24
- * To change this template use File | Settings | File Templates.
+ * An HBase table can be split across multiple regions
+ *
+ * Regional - is where we get the information
+ * 'Hey this table exists in a Region at Location (10.139.8.10) and another one at (10.139.8.11)'
+ *
+ * Granular on the other hand is when we go deep at a specific region
+ *
+ * Note: An HBase table can exist in multiple regions / region server as well
  */
 public class HBaseInputFormatRegional extends HBaseInputFormatBase {
+    private HBaseInputFormatGranular granular = new HBaseInputFormatGranular();
     private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class);
-
     @Override
     public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException {
         granular.configure(job);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 3c62f82..401dea0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -14,7 +14,7 @@ import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;

 /**
- * Convert Map/Reduce output and write it to an HBase table
+ * For writing Map/Reduce output into an HBase table
  */
 public class HBaseOutputFormat extends
 FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
@@ -36,7 +36,6 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
 		);
 	}

-
 	@Override
 	@SuppressWarnings("unchecked")
 	public RecordWriter getRecordWriter(FileSystem ignored,
@@ -61,7 +60,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {

 	@Override
 	public void checkOutputSpecs(FileSystem ignored, JobConf job)
-			throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+			throws IOException {

 		String tableName = job.get(OUTPUT_TABLE);
 		if(tableName == null) {
```
```diff
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 7b62c88..a88581b 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -41,12 +41,25 @@ import cascading.tuple.TupleEntry;
 import cascading.util.Util;

 /**
-* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
-* with the {@HBaseRawTap} to allow for the reading and writing of data
-* to and from a HBase cluster.
-*
-* @see HBaseRawTap
-*/
+ * It provides the wiring between Fields and Columns and Column families
+ * In effect to write to cf:column
+ *
+ * data:name data:surname address: street
+ * name1 surname1 address1
+ *
+ * We will initialize the HBaseSource with
+ * ("data","data","data")
+ * ("name","surname","address")
+ * Data:
+ * ("name1","surname1","address1")
+ * ...
+ *
+ * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
+ * with the {@HBaseRawTap} to allow for the reading and writing of data
+ * to and from a HBase cluster.
+ *
+ * @see HBaseRawTap
+ */
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
 	/**
@@ -67,13 +80,8 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto

 	/**
 	 * Constructor HBaseScheme creates a new HBaseScheme instance.
-	 *
-	 * @param keyFields
-	 *            of type Fields
 	 * @param familyName
 	 *            of type String
-	 * @param valueFields
-	 *            of type Fields
 	 */
 	public HBaseRawScheme(String familyName) {
 		this(new String[] { familyName });
@@ -235,6 +243,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
 	@Override
 	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+
 		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, ValueCopier.class);
 		if (null != familyNames) {
```
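The new Javadoc above shows the mapping between tuple fields and `family:column` pairs with a small `data`/`address` example. The same mapping expressed as a hedged `HBaseSource` declaration, mirroring the `("data","data","data")` / `("name","surname","address")` lists from the comment (table and quorum names are hypothetical):

```scala
// Hedged sketch of the family/column wiring described in the HBaseRawScheme Javadoc.
// One column-family entry per value field; names follow the Javadoc example.
import cascading.tuple.Fields
import parallelai.spyglass.hbase.HBaseSource

val source = new HBaseSource(
  "my_table", "quorum-host:2181", new Fields("key"),
  List("data", "data", "data"),                                           // column family per field
  List(new Fields("name"), new Fields("surname"), new Fields("address"))) // column qualifiers
```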
```diff
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
index 5dcd57d..efe548d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
index cc64cb4..fa2aa7e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -8,17 +8,14 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.RecordReader;
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

 import java.util.TreeSet;

 import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 15:42
- * To change this template use File | Settings | File Templates.
+ * Reading from HBase records logic & configuration
  */
 public abstract class HBaseRecordReaderBase implements
 		RecordReader<ImmutableBytesWritable, Result> {
@@ -88,7 +85,6 @@ public abstract class HBaseRecordReaderBase implements
 	}

 	/**
-	 *
 	 * @param endRow
 	 *            the last row in the split
 	 */
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index eace82c..b34ed3f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -20,6 +20,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;

+/**
+ * It builds the scanner in method init() and ALL the logic is in method next()
+ * At the reader level - our Job is parallelised in multiple Map phases ;-)
+ */
 public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {

 	static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class);
@@ -28,9 +32,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
 	private ResultScanner scanner;
 	private long timestamp = -1;
 	private int rowcount = 0;
+	private final int scanCaching = 1000;

-  @Override
-  public String toString() {
+	@Override
+	public String toString() {
 		StringBuffer sbuf = new StringBuffer();
 		sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]",
@@ -39,12 +44,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
 			sourceMode, useSalt, versions));

 		return sbuf.toString();
-  }
-
-  private final int scanCaching = 1000;
-
+	}

-  /**
+	/**
 	 * Restart from survivable exceptions by creating a new scanner.
 	 *
 	 * @param firstRow
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
index 5d2b613..cffedb2 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
@@ -1,34 +1,15 @@
 package parallelai.spyglass.hbase;

-import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeSet;
 import java.util.Vector;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.StringUtils;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;

 public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {

diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
index 5bdf8cd..b20fd89 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
@@ -1,7 +1,6 @@
 package parallelai.spyglass.hbase;

 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -13,6 +12,21 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;

+/**
+ * Use this Salter if you want maximum efficiency and speed when writing new data into HBase
+ *
+ * The 'Region Hot Spot' problem occurs when we are writing in an HBase table data,
+ * where the key is in sequential order.
+ *
+ * http://www.appfirst.com/blog/best-practices-for-managing-hbase-in-a-high-write-environment/
+ *
+ * By using this salter we can prefix the keys with a character followed by an _
+ * i.e.
+ *
+ * SCALDING -> G_SCALDING
+ *
+ * and thus distribute better the write load in the HBase cluster
+ */
 public class HBaseSalter {
   private static final Log LOG = LogFactory.getLog(HBaseSalter.class);

@@ -151,12 +165,10 @@ public class HBaseSalter {
     char[] prefixArray = prefixList.toCharArray();

     return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], stopKey);
-//    return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1));
   }

   public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
     return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, stopPrefix);
-//    return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1));
   }

   private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
```
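The new HBaseSalter Javadoc explains the motivation: monotonically increasing keys all land on one region, so writes hot-spot. Prefixing each key with a character from a prefix list plus `_` spreads the load. A toy illustration of the idea follows; it is not the actual HBaseSalter code, and the hash-based prefix choice and the default prefix list are assumptions:

```scala
// Illustration only - not the real HBaseSalter implementation.
// Spread sequential keys across regions by prepending "<prefix>_".
def saltKey(key: String,
            prefixList: String = "0123456789abcdefghijklmnopqrstuvwxyz"): String = {
  val prefix = prefixList.charAt(math.abs(key.hashCode) % prefixList.length)
  s"${prefix}_$key"
}

saltKey("SCALDING")  // e.g. "g_SCALDING"; compare "SCALDING -> G_SCALDING" in the Javadoc
```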
```diff
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index ba83a63..e037050 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
-//import org.mortbay.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
index 3b72a1d..cdb6a77 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -12,11 +12,7 @@ import java.io.Serializable;
 import java.util.TreeSet;

 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 16:18
- * To change this template use File | Settings | File Templates.
+ * Split table logic
  */
 public abstract class HBaseTableSplitBase implements InputSplit, Comparable<HBaseTableSplitBase>, Serializable {

diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
index a266411..76edeba 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
@@ -1,19 +1,15 @@
 package parallelai.spyglass.hbase;

-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
 import java.util.TreeSet;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;

+/**
+ * Split table logic
+ */
 public class HBaseTableSplitGranular extends HBaseTableSplitBase {

 	private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class);

diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
index ad5f78b..fa43226 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
@@ -3,17 +3,13 @@ package parallelai.spyglass.hbase;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Vector;

-import org.apache.commons.lang.SerializationUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;

 public class HBaseTableSplitRegional extends HBaseTableSplitBase {

diff --git a/src/main/resources/pom.xml b/src/main/resources/pom.xml
index 7fe6a12..6c62ba5 100644
--- a/src/main/resources/pom.xml
+++ b/src/main/resources/pom.xml
@@ -8,7 +8,7 @@
 	<description>Cascading and Scalding wrapper for HBase with advanced features</description>
 	<groupId>parallelai</groupId>
 	<artifactId>parallelai.spyglass</artifactId>
-	<version>2.10.2_4.2rc2</version>
+	<version>2.10_0.9_4.3</version>
 	<packaging>jar</packaging>

 	<organization>
@@ -62,75 +62,44 @@ </repository>
 	</repositories>

-	<properties>
-		<cdh.version>cdh4.5.0</cdh.version>
-		<datafu.version>0.0.4-${cdh.version}</datafu.version>
-		<flume.version>1.3.0-${cdh.version}</flume.version>
-		<hadoop.version>2.0.0-${cdh.version}</hadoop.version>
-		<hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version>
-		<hbase.version>0.94.6-${cdh.version}</hbase.version>
-		<hive.version>0.10.0-${cdh.version}</hive.version>
-		<mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version>
-		<scala.version>2.10.2</scala.version>
-		<scalding.scala.version>2.10</scalding.scala.version>
-		<scalding.version>0.9.1</scalding.version>
-		<cascading.version>2.5.3</cascading.version>
-		<scalding-commons.version>0.9.1</scalding-commons.version>
-		<trove4j.version>3.0.3</trove4j.version>
-		<typesafe.config.version>1.0.0</typesafe.config.version>
-		<slf4j.version>1.7.2</slf4j.version>
-	</properties>
-
 	<dependencies>
+
+		<!-- Scalding -->
 		<dependency>
-			<groupId>cascading</groupId>
-			<artifactId>cascading-core</artifactId>
-			<version>${cascading.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>cascading</groupId>
-			<artifactId>cascading-hadoop</artifactId>
-			<version>${cascading.version}</version>
+			<groupId>com.twitter</groupId>-->
+			<artifactId>scalding-core_2.10</artifactId>
+			<version>0.9.1</version>
 		</dependency>

+		<!-- Hadoop -->
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-core</artifactId>
-			<version>${hadoop.core.version}</version>
+			<version>2.0.0-mr1-cdh4.5.0</version>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-common</artifactId>
-			<version>${hadoop.version}</version>
+			<version>2.0.0-cdh4.5.0</version>
 		</dependency>

+		<!-- HBase -->
 		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-api</artifactId>
-			<version>${slf4j.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>com.twitter</groupId>
-			<artifactId>scalding-core_${scalding.scala.version}</artifactId>
-			<version>${scalding.version}</version>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase</artifactId>
+			<version>0.94.6-cdh4.5.0</version>
 		</dependency>

 		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase</artifactId>
-			<version>${hbase.version}</version>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>1.7.2</version>
 		</dependency>
-
-		<!-- <dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-			<version>${scala.version}</version>
-		</dependency> -->
+
 		<dependency>
 			<groupId>com.typesafe</groupId>
 			<artifactId>config</artifactId>
-			<version>${typesafe.config.version}</version>
+			<version>1.0.0</version>
 		</dependency>

 		<dependency>
@@ -142,8 +111,8 @@
 			<groupId>com.twitter.elephantbird</groupId>
 			<artifactId>elephant-bird-hadoop-compat</artifactId>
 			<version>4.1</version>
-		</dependency>
-	</dependencies>
+		</dependency>
+	</dependencies>

 </project>
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 957258c..30c28c9 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -3,12 +3,8 @@ package parallelai.spyglass.hbase
 import java.io.IOException
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
-import com.twitter.scalding.AccessMode
-import com.twitter.scalding.Hdfs
-import com.twitter.scalding.Mode
-import com.twitter.scalding.Read
-import com.twitter.scalding.Write
-import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
+import com.twitter.scalding._
+import parallelai.spyglass.hbase.HBaseConstants.SplitType
 import cascading.scheme.{NullScheme, Scheme}
 import cascading.tap.SinkMode
 import cascading.tap.Tap
@@ -17,13 +13,11 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.OutputCollector
 import org.apache.hadoop.mapred.JobConf
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import com.twitter.scalding.Source
-import com.twitter.scalding.TestMode
-import com.twitter.scalding.Test
-import com.twitter.scalding.MemoryTap
 import java.io.InputStream
 import java.io.OutputStream
 import java.util.Properties
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Test

 object Conversions {
   implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -96,7 +90,7 @@ case class HBaseSource(
       val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)

       hbt.setUseSaltInSink(useSalt)
-
+
       hbt.asInstanceOf[Tap[_,_,_]]
     }
   }
```