author    Antonios Chalkiopoulos <Antwnis@gmail.com>    2014-05-22 22:04:44 +0100
committer Antonios Chalkiopoulos <Antwnis@gmail.com>    2014-05-22 22:04:44 +0100
commit    08e00dfce43244ec1edb0e2363a977fea985b454 (patch)
tree      3ef8c08587edd2826c272c1277389bb7427aad66
parent    ba87c17eed4e6f2d1e1b1d4644193485aae0bb4e (diff)
Releasing SpyGlass 4.3 for Scalding 0.9.1 and Scala 2.10
-rw-r--r--  pom.xml  66
-rw-r--r--  src/main/java/parallelai/spyglass/base/JobLibLoader.java  1
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java  6
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseConstants.java  3
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java  24
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java  23
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java  15
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java  5
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java  31
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java  1
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java  8
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java  16
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java  19
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseSalter.java  18
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseScheme.java  1
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java  6
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java  10
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java  4
-rw-r--r--  src/main/resources/pom.xml  71
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala  16
20 files changed, 134 insertions(+), 210 deletions(-)
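
For orientation, here is a minimal Scala sketch of consuming this release from a Scalding job. The coordinates (parallelai : parallelai.spyglass : 2.10_0.9_4.3) and the conjars repository come from the pom.xml changes below; the HBaseSource argument names and shape are an assumption pieced together from HBaseSource.scala and the scheme comments in this patch, not a verified signature.

    // build.sbt (sketch) -- coordinates taken from the pom.xml in this commit
    // resolvers += "conjars" at "http://conjars.org/repo"
    // libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.10_0.9_4.3"

    import cascading.tuple.Fields
    import com.twitter.scalding._
    import parallelai.spyglass.hbase.HBaseSource
    import parallelai.spyglass.hbase.HBaseConstants.SourceMode

    // Hypothetical job: parameter names (tableName, quorumNames, sourceMode, ...)
    // follow the fields visible in this patch, but the exact constructor is assumed.
    class ReadPeople(args: Args) extends Job(args) {
      val people = new HBaseSource(
        tableName   = "people",                      // example table
        quorumNames = "zookeeper-host:2181",         // example ZooKeeper quorum
        keyFields   = new Fields("key"),
        familyNames = List("data", "data", "data"),  // per the HBaseRawScheme comment below
        valueFields = List(new Fields("name"), new Fields("surname"), new Fields("address")),
        sourceMode  = SourceMode.SCAN_ALL            // or SCAN_RANGE / GET_LIST
      )
      people.read.write(Tsv(args("output")))
    }
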
diff --git a/pom.xml b/pom.xml
index a431e04..8d1f2b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,51 +11,38 @@
<name>Cascading and Scalding wrapper for HBase with advanced features</name>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.10.2_4.2rc2</version>
+ <version>2.10_0.9_4.3</version>
<packaging>jar</packaging>
<properties>
+ <!-- UTF-8 Encoding settings -->
+ <encoding>UTF-8</encoding>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
<!-- Java compilation level -->
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
+
+ <!-- Maven -->
<maven-compiler-plugin.version>3.0</maven-compiler-plugin.version>
<maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>
- <maven-war-plugin.version>2.3</maven-war-plugin.version>
-
- <!-- UTF-8 Encoding settings -->
- <encoding>UTF-8</encoding>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <maven.surefire.plugin.version>2.12.3</maven.surefire.plugin.version>
<cdh.version>cdh4.5.0</cdh.version>
- <datafu.version>0.0.4-${cdh.version}</datafu.version>
- <flume.version>1.3.0-${cdh.version}</flume.version>
<hadoop.version>2.0.0-${cdh.version}</hadoop.version>
<hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version>
<hbase.version>0.94.6-${cdh.version}</hbase.version>
- <hive.version>0.10.0-${cdh.version}</hive.version>
- <mahout.version>0.7-${cdh.version}</mahout.version>
- <mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version>
- <oozie.version>3.3.0-${cdh.version}</oozie.version>
- <oozie-hadoop.version>2.0.0-${cdh.version}.oozie-3.3.0-${cdh.version}</oozie-hadoop.version>
- <oozie-sharelib.version>3.3.0-${cdh.version}</oozie-sharelib.version>
- <pig.version>0.10.0-${cdh.version}</pig.version>
- <sqoop.version>1.4.2-${cdh.version}</sqoop.version>
- <whirr.version>0.8.0-${cdh.version}</whirr.version>
- <zookeeper.version>3.4.5-${cdh.version}</zookeeper.version>
<!-- Scala/Scalding/Cascading properties -->
<!-- can be 2.9.3 and 2.10.2 -->
- <scala.version>2.10.2</scala.version>
+ <scala.version>2.10.3</scala.version>
<!-- 2.10 for Scala 2.10.2 and 2.9.3 for Scala version 2.9.3 -->
<scalding.scala.version>2.10</scalding.scala.version>
<scalding.version>0.9.1</scalding.version>
- <cascading.version>2.5.3</cascading.version>
- <scalding-commons.version>0.9.1</scalding-commons.version>
<scalatest.version>1.9.1</scalatest.version>
- <trove4j.version>3.0.3</trove4j.version>
<!-- 2.1.1 for Scala 2.10.2 and 1.12.4.1 for Scala 2.9.3-->
<specs2.version>2.1.1</specs2.version>
<typesafe.config.version>1.0.0</typesafe.config.version>
@@ -63,10 +50,6 @@
<!-- Other libraries properties -->
<junit.version>4.10</junit.version>
<slf4j.version>1.7.2</slf4j.version>
- <trove4j.version>3.0.3</trove4j.version>
- <javax.servlet.version>2.5</javax.servlet.version>
- <uncommons-maths.version>1.2.2a</uncommons-maths.version>
- <maven.surefire.plugin.version>2.12.3</maven.surefire.plugin.version>
</properties>
@@ -87,9 +70,14 @@
<name>Con Jars</name>
<url>http://conjars.org/repo</url>
</repository>
+ <repository>
+ <id>clojars</id>
+ <name>Clojars</name>
+ <url>http://clojars.org/repo</url>
+ </repository>
</repositories>
- <!-- Profiles -->
+ <!-- Profiles
<profiles>
<profile>
<id>windows_profile</id>
@@ -133,20 +121,14 @@
</dependency>
</dependencies>
</profile>
- </profiles>
+ </profiles>-->
<dependencies>
- <!-- Cascading -->
- <dependency>
- <groupId>cascading</groupId>
- <artifactId>cascading-core</artifactId>
- <version>${cascading.version}</version>
- </dependency>
<dependency>
- <groupId>cascading</groupId>
- <artifactId>cascading-hadoop</artifactId>
- <version>${cascading.version}</version>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
</dependency>
<!-- Scalding -->
@@ -174,6 +156,7 @@
<artifactId>hbase</artifactId>
<version>${hbase.version}</version>
</dependency>
+
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -181,11 +164,6 @@
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>${typesafe.config.version}</version>
@@ -200,7 +178,7 @@
<groupId>com.twitter.elephantbird</groupId>
<artifactId>elephant-bird-hadoop-compat</artifactId>
<version>4.1</version>
- </dependency>
+ </dependency>
<!-- Testing dependencies (ScalaSpec / ScalaTest / JUnit) -->
<dependency>
diff --git a/src/main/java/parallelai/spyglass/base/JobLibLoader.java b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
index af5bdf4..f49cca9 100644
--- a/src/main/java/parallelai/spyglass/base/JobLibLoader.java
+++ b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
@@ -15,7 +15,6 @@ public class JobLibLoader {
public static void loadJars(String libPathStr, Configuration config) {
-
try {
Path libPath = new Path(libPathStr);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
index 77df84e..77996c6 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
@@ -6,11 +6,7 @@ import org.apache.commons.logging.LogFactory;
import java.io.IOException;
/**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 17:25
- * To change this template use File | Settings | File Templates.
+ * Utility class that sets the parameters of the record reader of a table split
*/
public class HBaseConfigUtils {
static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 5b5e9c3..7453d3e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -2,6 +2,9 @@ package parallelai.spyglass.hbase;
import org.apache.hadoop.conf.Configuration;
+/**
+ * Static enums and string constants
+ */
public class HBaseConstants {
public enum SourceMode {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
index 2f3047b..6fa7fce 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
@@ -18,11 +18,11 @@ import java.util.TreeSet;
import java.util.UUID;
/**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 12:43
- * To change this template use File | Settings | File Templates.
+ * This class provides all the wiring for reading from HBase
+ *
+ * The idea is that, before a Tap is created, the parameters populate the JobConf.
+ * In this class we pick the parameters up from the JobConf and set up our logic,
+ * i.e. which HBase columns to read, what the start/stop keys are, etc.
*/
public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
@@ -33,11 +33,11 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
protected HTable table;
protected Filter rowFilter;
- public static final String COLUMN_LIST = "hbase.tablecolumns";
-
/**
- * Use this jobconf param to specify the input table
+ * Use the following two JobConf keys to specify the table columns and the input table.
+ * Later on we can pick them up with job.get(COLUMN_LIST)
*/
+ public static final String COLUMN_LIST = "hbase.tablecolumns";
protected static final String INPUT_TABLE = "hbase.inputtable";
protected String startKey = null;
@@ -49,8 +49,6 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
protected boolean useSalt = false;
protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
-
-
@Override
public void configure(JobConf job) {
String tableName = getTableName(job);
@@ -65,7 +63,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
try {
setHTable(new HTable(HBaseConfiguration.create(job), tableName));
} catch (Exception e) {
- LOG.error("************* Table could not be created");
+ LOG.error("************* HBase table " + tableName + " is not accessible");
LOG.error(StringUtils.stringifyException(e));
}
@@ -166,7 +164,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
return job.get(INPUT_TABLE);
}
- protected void setParms(HBaseRecordReaderBase trr) {
+// protected void setParms(HBaseRecordReaderBase trr) {
+// }
- }
}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 64effc9..332bbd7 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -3,52 +3,51 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.UUID;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+/**
+ * See HBaseInputFormatRegional first (!)
+ *
+ * Once we know which splits we are interested in reading from, we iterate over the
+ * region servers and splits and, depending on the read strategy
+ * (SCAN_RANGE, GET_LIST, SCAN_ALL), initialise an <class>HBaseTableSplitGranular</class>
+ * per region and split with the correct parameters.
+ *
+ * So all the different <u>strategies</u> are implemented here at a high level
+ *
+ */
public class HBaseInputFormatGranular extends HBaseInputFormatBase {
private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class);
- // private String tableName = "";
-
private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();
private String nameServer = null;
- // private Scan scan = null;
-
@SuppressWarnings("deprecation")
@Override
public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
index 8185b22..5c98443 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
@@ -11,17 +11,20 @@ import java.util.Collection;
import java.util.HashMap;
/**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 12:24
- * To change this template use File | Settings | File Templates.
+ * An HBase table can be split across multiple regions
+ *
+ * Regional is where we discover the information:
+ * 'Hey, this table exists in a region at location (10.139.8.10) and another one at (10.139.8.11)'
+ *
+ * Granular, on the other hand, is where we go deep into a specific region
+ *
+ * Note: an HBase table can span multiple regions / region servers as well
*/
public class HBaseInputFormatRegional extends HBaseInputFormatBase {
+
private HBaseInputFormatGranular granular = new HBaseInputFormatGranular();
private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class);
-
@Override
public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException {
granular.configure(job);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 3c62f82..401dea0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -14,7 +14,7 @@ import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
/**
- * Convert Map/Reduce output and write it to an HBase table
+ * For writing Map/Reduce output into an HBase table
*/
public class HBaseOutputFormat extends
FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
@@ -36,7 +36,6 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
);
}
-
@Override
@SuppressWarnings("unchecked")
public RecordWriter getRecordWriter(FileSystem ignored,
@@ -61,7 +60,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
- throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+ throws IOException {
String tableName = job.get(OUTPUT_TABLE);
if(tableName == null) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 7b62c88..a88581b 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -41,12 +41,25 @@ import cascading.tuple.TupleEntry;
import cascading.util.Util;
/**
-* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
-* with the {@HBaseRawTap} to allow for the reading and writing of data
-* to and from a HBase cluster.
-*
-* @see HBaseRawTap
-*/
+ * It provides the wiring between Fields, Columns and Column families,
+ * in effect letting us write to cf:column
+ *
+ * data:name data:surname data:address
+ * name1 surname1 address1
+ *
+ * We will initialize the HBaseSource with
+ * ("data","data","data")
+ * ("name","surname","address")
+ * Data:
+ * ("name1","surname1","address1")
+ * ...
+ *
+ * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
+ * with the {@HBaseRawTap} to allow for the reading and writing of data
+ * to and from a HBase cluster.
+ *
+ * @see HBaseRawTap
+ */
@SuppressWarnings({ "rawtypes", "deprecation" })
public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
/**
@@ -67,13 +80,8 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
/**
* Constructor HBaseScheme creates a new HBaseScheme instance.
- *
- * @param keyFields
- * of type Fields
* @param familyName
* of type String
- * @param valueFields
- * of type Fields
*/
public HBaseRawScheme(String familyName) {
this(new String[] { familyName });
@@ -235,6 +243,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
@Override
public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
JobConf conf) {
+
DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
ValueCopier.class);
if (null != familyNames) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
index 5dcd57d..efe548d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
index cc64cb4..fa2aa7e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -8,17 +8,14 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.RecordReader;
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
import java.util.TreeSet;
import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
/**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 15:42
- * To change this template use File | Settings | File Templates.
+ * Logic and configuration for reading records from HBase
*/
public abstract class HBaseRecordReaderBase implements
RecordReader<ImmutableBytesWritable, Result> {
@@ -88,7 +85,6 @@ public abstract class HBaseRecordReaderBase implements
}
/**
- *
* @param endRow
* the last row in the split
*/
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index eace82c..b34ed3f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -20,6 +20,10 @@ import java.util.List;
import java.util.Map;
import java.util.Vector;
+/**
+ * Builds the scanner in init(); ALL the reading logic lives in next()
+ * At the reader level our Job is parallelised into multiple Map phases ;-)
+ */
public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class);
@@ -28,9 +32,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
private ResultScanner scanner;
private long timestamp = -1;
private int rowcount = 0;
+ private final int scanCaching = 1000;
- @Override
- public String toString() {
+ @Override
+ public String toString() {
StringBuffer sbuf = new StringBuffer();
sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]",
@@ -39,12 +44,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
sourceMode, useSalt, versions));
return sbuf.toString();
- }
-
- private final int scanCaching = 1000;
-
+ }
- /**
+ /**
* Restart from survivable exceptions by creating a new scanner.
*
* @param firstRow
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
index 5d2b613..cffedb2 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
@@ -1,34 +1,15 @@
package parallelai.spyglass.hbase;
-import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.TreeSet;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.StringUtils;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
index 5bdf8cd..b20fd89 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
@@ -1,7 +1,6 @@
package parallelai.spyglass.hbase;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -13,6 +12,21 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+/**
+ * Use this Salter if you want maximum efficiency and speed when writing new data into HBase
+ *
+ * The 'Region Hot Spot' problem occurs when we write data into an HBase table
+ * whose keys arrive in sequential order.
+ *
+ * http://www.appfirst.com/blog/best-practices-for-managing-hbase-in-a-high-write-environment/
+ *
+ * By using this salter we can prefix the keys with a character followed by an _
+ * e.g.
+ *
+ * SCALDING -> G_SCALDING
+ *
+ * and thus distribute the write load better across the HBase cluster
+ */
public class HBaseSalter {
private static final Log LOG = LogFactory.getLog(HBaseSalter.class);
@@ -151,12 +165,10 @@ public class HBaseSalter {
char[] prefixArray = prefixList.toCharArray();
return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], stopKey);
-// return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1));
}
public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, stopPrefix);
-// return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1));
}
private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index ba83a63..e037050 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
-//import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
index 3b72a1d..cdb6a77 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -12,11 +12,7 @@ import java.io.Serializable;
import java.util.TreeSet;
/**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 16:18
- * To change this template use File | Settings | File Templates.
+ * Table split logic
*/
public abstract class HBaseTableSplitBase implements InputSplit,
Comparable<HBaseTableSplitBase>, Serializable {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
index a266411..76edeba 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
@@ -1,19 +1,15 @@
package parallelai.spyglass.hbase;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+/**
+ * Table split logic
+ */
public class HBaseTableSplitGranular extends HBaseTableSplitBase {
private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
index ad5f78b..fa43226 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
@@ -3,17 +3,13 @@ package parallelai.spyglass.hbase;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
public class HBaseTableSplitRegional extends HBaseTableSplitBase {
diff --git a/src/main/resources/pom.xml b/src/main/resources/pom.xml
index 7fe6a12..6c62ba5 100644
--- a/src/main/resources/pom.xml
+++ b/src/main/resources/pom.xml
@@ -8,7 +8,7 @@
<description>Cascading and Scalding wrapper for HBase with advanced features</description>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.10.2_4.2rc2</version>
+ <version>2.10_0.9_4.3</version>
<packaging>jar</packaging>
<organization>
@@ -62,75 +62,44 @@
</repository>
</repositories>
- <properties>
- <cdh.version>cdh4.5.0</cdh.version>
- <datafu.version>0.0.4-${cdh.version}</datafu.version>
- <flume.version>1.3.0-${cdh.version}</flume.version>
- <hadoop.version>2.0.0-${cdh.version}</hadoop.version>
- <hadoop.core.version>2.0.0-mr1-${cdh.version}</hadoop.core.version>
- <hbase.version>0.94.6-${cdh.version}</hbase.version>
- <hive.version>0.10.0-${cdh.version}</hive.version>
- <mapreduce.version>2.0.0-mr1-${cdh.version}</mapreduce.version>
- <scala.version>2.10.2</scala.version>
- <scalding.scala.version>2.10</scalding.scala.version>
- <scalding.version>0.9.1</scalding.version>
- <cascading.version>2.5.3</cascading.version>
- <scalding-commons.version>0.9.1</scalding-commons.version>
- <trove4j.version>3.0.3</trove4j.version>
- <typesafe.config.version>1.0.0</typesafe.config.version>
- <slf4j.version>1.7.2</slf4j.version>
- </properties>
-
<dependencies>
+
+ <!-- Scalding -->
<dependency>
- <groupId>cascading</groupId>
- <artifactId>cascading-core</artifactId>
- <version>${cascading.version}</version>
- </dependency>
- <dependency>
- <groupId>cascading</groupId>
- <artifactId>cascading-hadoop</artifactId>
- <version>${cascading.version}</version>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-core_2.10</artifactId>
+ <version>0.9.1</version>
</dependency>
+
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>${hadoop.core.version}</version>
+ <version>2.0.0-mr1-cdh4.5.0</version>
</dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
+ <version>2.0.0-cdh4.5.0</version>
</dependency>
+ <!-- HBase -->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>scalding-core_${scalding.scala.version}</artifactId>
- <version>${scalding.version}</version>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.94.6-cdh4.5.0</version>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>${hbase.version}</version>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.2</version>
</dependency>
-
- <!-- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency> -->
+
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <version>${typesafe.config.version}</version>
+ <version>1.0.0</version>
</dependency>
<dependency>
@@ -142,8 +111,8 @@
<groupId>com.twitter.elephantbird</groupId>
<artifactId>elephant-bird-hadoop-compat</artifactId>
<version>4.1</version>
- </dependency>
- </dependencies>
+ </dependency>
+ </dependencies>
</project>
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 957258c..30c28c9 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -3,12 +3,8 @@ package parallelai.spyglass.hbase
import java.io.IOException
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
-import com.twitter.scalding.AccessMode
-import com.twitter.scalding.Hdfs
-import com.twitter.scalding.Mode
-import com.twitter.scalding.Read
-import com.twitter.scalding.Write
-import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
+import com.twitter.scalding._
+import parallelai.spyglass.hbase.HBaseConstants.SplitType
import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
import cascading.tap.Tap
@@ -17,13 +13,11 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import com.twitter.scalding.Source
-import com.twitter.scalding.TestMode
-import com.twitter.scalding.Test
-import com.twitter.scalding.MemoryTap
import java.io.InputStream
import java.io.OutputStream
import java.util.Properties
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Test
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -96,7 +90,7 @@ case class HBaseSource(
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode)
hbt.setUseSaltInSink(useSalt)
-
+
hbt.asInstanceOf[Tap[_,_,_]]
}
}
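
As a closing usage note on the salting documented in HBaseSalter above: the idea is to prefix each row key with one character from a prefix list plus an underscore, so that sequential keys spread across regions. The sketch below only mirrors the documented example (SCALDING -> G_SCALDING); the prefix-selection logic is an assumption for illustration and is not the class's actual implementation.

    // Minimal illustration of the salting idea described in HBaseSalter's comment.
    // The real class works on byte[] keys and a configurable prefix list; choosing
    // the prefix from the key's hashCode here is an assumption, not its actual code.
    def saltKey(key: String, prefixList: String = "0123456789"): String = {
      val prefix = prefixList.charAt((key.hashCode & Int.MaxValue) % prefixList.length)
      s"${prefix}_$key"
    }

    // e.g. saltKey("SCALDING", "ABCDEFGHIJ") might produce "G_SCALDING";
    // the useSalt / prefixList options visible in this patch are meant to apply
    // (and account for) such prefixes on the sink and source side.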