From 08e00dfce43244ec1edb0e2363a977fea985b454 Mon Sep 17 00:00:00 2001 From: Antonios Chalkiopoulos Date: Thu, 22 May 2014 22:04:44 +0100 Subject: Releasing 4.3 Spyglass for Scalding (0.9.1) and Scala (2.10) --- .../parallelai/spyglass/base/JobLibLoader.java | 1 - .../spyglass/hbase/HBaseConfigUtils.java | 6 +- .../parallelai/spyglass/hbase/HBaseConstants.java | 3 + .../spyglass/hbase/HBaseInputFormatBase.java | 24 ++++---- .../spyglass/hbase/HBaseInputFormatGranular.java | 23 ++++--- .../spyglass/hbase/HBaseInputFormatRegional.java | 15 +++-- .../spyglass/hbase/HBaseOutputFormat.java | 5 +- .../parallelai/spyglass/hbase/HBaseRawScheme.java | 31 ++++++---- .../parallelai/spyglass/hbase/HBaseRawTap.java | 1 - .../spyglass/hbase/HBaseRecordReaderBase.java | 8 +-- .../spyglass/hbase/HBaseRecordReaderGranular.java | 16 ++--- .../spyglass/hbase/HBaseRecordReaderRegional.java | 19 ------ .../parallelai/spyglass/hbase/HBaseSalter.java | 18 +++++- .../parallelai/spyglass/hbase/HBaseScheme.java | 1 - .../spyglass/hbase/HBaseTableSplitBase.java | 6 +- .../spyglass/hbase/HBaseTableSplitGranular.java | 10 +-- .../spyglass/hbase/HBaseTableSplitRegional.java | 4 -- src/main/resources/pom.xml | 71 ++++++---------------- .../parallelai/spyglass/hbase/HBaseSource.scala | 16 ++--- 19 files changed, 112 insertions(+), 166 deletions(-) (limited to 'src') diff --git a/src/main/java/parallelai/spyglass/base/JobLibLoader.java b/src/main/java/parallelai/spyglass/base/JobLibLoader.java index af5bdf4..f49cca9 100644 --- a/src/main/java/parallelai/spyglass/base/JobLibLoader.java +++ b/src/main/java/parallelai/spyglass/base/JobLibLoader.java @@ -15,7 +15,6 @@ public class JobLibLoader { public static void loadJars(String libPathStr, Configuration config) { - try { Path libPath = new Path(libPathStr); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java index 77df84e..77996c6 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java @@ -6,11 +6,7 @@ import org.apache.commons.logging.LogFactory; import java.io.IOException; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 17:25 - * To change this template use File | Settings | File Templates. + * Utility class that sets the parameters of the record reader of a table split */ public class HBaseConfigUtils { static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 5b5e9c3..7453d3e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -2,6 +2,9 @@ package parallelai.spyglass.hbase; import org.apache.hadoop.conf.Configuration; +/** + * Static enums , strings + */ public class HBaseConstants { public enum SourceMode { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java index 2f3047b..6fa7fce 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -18,11 +18,11 @@ import java.util.TreeSet; import java.util.UUID; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 12:43 - * To change this template use File | Settings | File Templates. + * For reading from HBase this class provides all the wiring + * + * The idea is before a Tap is created the parameters populate the JobConf + * In this class we pick up the parameters from the JobConf and set up our logic, + * i.e. which HBase columns to read, what is the start/stop key etc */ public abstract class HBaseInputFormatBase implements InputFormat, JobConfigurable { @@ -33,11 +33,11 @@ public abstract class HBaseInputFormatBase implements InputFormatHBaseTableSplitGranular per + * region and split with all the correct parameters. + * + * So all the different strategies are implemented here at a high level + * + */ public class HBaseInputFormatGranular extends HBaseInputFormatBase { private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class); - // private String tableName = ""; - private HashMap reverseDNSCacheMap = new HashMap(); private String nameServer = null; - // private Scan scan = null; - @SuppressWarnings("deprecation") @Override public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java index 8185b22..5c98443 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -11,17 +11,20 @@ import java.util.Collection; import java.util.HashMap; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 12:24 - * To change this template use File | Settings | File Templates. + * An HBase table can be split across multiple regions + * + * Regional - is where we get the information + * 'Hey this table exists in a Region at Location (10.139.8.10) and another one at (10.139.8.11)' + * + * Granular on the other hand is when we go deep at a specific region + * + * Note: An HBase table can exist in multiple regions / region server as well */ public class HBaseInputFormatRegional extends HBaseInputFormatBase { + private HBaseInputFormatGranular granular = new HBaseInputFormatGranular(); private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class); - @Override public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException { granular.configure(job); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index 3c62f82..401dea0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -14,7 +14,7 @@ import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Progressable; /** - * Convert Map/Reduce output and write it to an HBase table + * For writing Map/Reduce output into an HBase table */ public class HBaseOutputFormat extends FileOutputFormat implements JobConfigurable { @@ -36,7 +36,6 @@ FileOutputFormat implements JobConfigurable { ); } - @Override @SuppressWarnings("unchecked") public RecordWriter getRecordWriter(FileSystem ignored, @@ -61,7 +60,7 @@ FileOutputFormat implements JobConfigurable { @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) - throws FileAlreadyExistsException, InvalidJobConfException, IOException { + throws IOException { String tableName = job.get(OUTPUT_TABLE); if(tableName == null) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 7b62c88..a88581b 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -41,12 +41,25 @@ import cascading.tuple.TupleEntry; import cascading.util.Util; /** -* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction -* with the {@HBaseRawTap} to allow for the reading and writing of data -* to and from a HBase cluster. -* -* @see HBaseRawTap -*/ + * It provides the wiring between Fields and Columns and Column families + * In effect to write to cf:column + * + * data:name data:surname address: street + * name1 surname1 address1 + * + * We will initialize the HBaseSource with + * ("data","data","data") + * ("name","surname","address") + * Data: + * ("name1","surname1","address1") + * ... + * + * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction + * with the {@HBaseRawTap} to allow for the reading and writing of data + * to and from a HBase cluster. + * + * @see HBaseRawTap + */ @SuppressWarnings({ "rawtypes", "deprecation" }) public class HBaseRawScheme extends Scheme { /** @@ -67,13 +80,8 @@ public class HBaseRawScheme extends Scheme process, Tap tap, JobConf conf) { + DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, ValueCopier.class); if (null != familyNames) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 5dcd57d..efe548d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java index cc64cb4..fa2aa7e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -8,17 +8,14 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.RecordReader; +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; import java.util.TreeSet; import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 15:42 - * To change this template use File | Settings | File Templates. + * Reading from HBase records logic & configuration */ public abstract class HBaseRecordReaderBase implements RecordReader { @@ -88,7 +85,6 @@ public abstract class HBaseRecordReaderBase implements } /** - * * @param endRow * the last row in the split */ diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index eace82c..b34ed3f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -20,6 +20,10 @@ import java.util.List; import java.util.Map; import java.util.Vector; +/** + * It builds the scanner in method init() and ALL the logic is in method next() + * At the reader level - our Job is parallelised in multiple Map phases ;-) + */ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class); @@ -28,9 +32,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { private ResultScanner scanner; private long timestamp = -1; private int rowcount = 0; + private final int scanCaching = 1000; - @Override - public String toString() { + @Override + public String toString() { StringBuffer sbuf = new StringBuffer(); sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]", @@ -39,12 +44,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { sourceMode, useSalt, versions)); return sbuf.toString(); - } - - private final int scanCaching = 1000; - + } - /** + /** * Restart from survivable exceptions by creating a new scanner. * * @param firstRow diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java index 5d2b613..cffedb2 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -1,34 +1,15 @@ package parallelai.spyglass.hbase; -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java index 5bdf8cd..b20fd89 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -1,7 +1,6 @@ package parallelai.spyglass.hbase; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; @@ -13,6 +12,21 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +/** + * Use this Salter if you want maximum efficiency and speed when writing new data into HBase + * + * The 'Region Hot Spot' problem occurs when we are writing in an HBase table data, + * where the key is in sequential order. + * + * http://www.appfirst.com/blog/best-practices-for-managing-hbase-in-a-high-write-environment/ + * + * By using this salter we can prefix the keys with a character followed by an _ + * i.e. + * + * SCALDING -> G_SCALDING + * + * and thus distribute better the write load in the HBase cluster + */ public class HBaseSalter { private static final Log LOG = LogFactory.getLog(HBaseSalter.class); @@ -151,12 +165,10 @@ public class HBaseSalter { char[] prefixArray = prefixList.toCharArray(); return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], stopKey); -// return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1)); } public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, stopPrefix); -// return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1)); } private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index ba83a63..e037050 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; -//import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java index 3b72a1d..cdb6a77 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -12,11 +12,7 @@ import java.io.Serializable; import java.util.TreeSet; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 16:18 - * To change this template use File | Settings | File Templates. + * Split table logic */ public abstract class HBaseTableSplitBase implements InputSplit, Comparable, Serializable { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java index a266411..76edeba 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -1,19 +1,15 @@ package parallelai.spyglass.hbase; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +/** + * Split table logic + */ public class HBaseTableSplitGranular extends HBaseTableSplitBase { private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java index ad5f78b..fa43226 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -3,17 +3,13 @@ package parallelai.spyglass.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Vector; -import org.apache.commons.lang.SerializationUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; public class HBaseTableSplitRegional extends HBaseTableSplitBase { diff --git a/src/main/resources/pom.xml b/src/main/resources/pom.xml index 7fe6a12..6c62ba5 100644 --- a/src/main/resources/pom.xml +++ b/src/main/resources/pom.xml @@ -8,7 +8,7 @@ Cascading and Scalding wrapper for HBase with advanced features parallelai parallelai.spyglass - 2.10.2_4.2rc2 + 2.10_0.9_4.3 jar @@ -62,75 +62,44 @@ - - cdh4.5.0 - 0.0.4-${cdh.version} - 1.3.0-${cdh.version} - 2.0.0-${cdh.version} - 2.0.0-mr1-${cdh.version} - 0.94.6-${cdh.version} - 0.10.0-${cdh.version} - 2.0.0-mr1-${cdh.version} - 2.10.2 - 2.10 - 0.9.1 - 2.5.3 - 0.9.1 - 3.0.3 - 1.0.0 - 1.7.2 - - + + - cascading - cascading-core - ${cascading.version} - - - cascading - cascading-hadoop - ${cascading.version} + com.twitter--> + scalding-core_2.10 + 0.9.1 + org.apache.hadoop hadoop-core - ${hadoop.core.version} + 2.0.0-mr1-cdh4.5.0 - org.apache.hadoop hadoop-common - ${hadoop.version} + 2.0.0-cdh4.5.0 + - org.slf4j - slf4j-api - ${slf4j.version} - - - com.twitter - scalding-core_${scalding.scala.version} - ${scalding.version} + org.apache.hbase + hbase + 0.94.6-cdh4.5.0 - org.apache.hbase - hbase - ${hbase.version} + org.slf4j + slf4j-api + 1.7.2 - - + com.typesafe config - ${typesafe.config.version} + 1.0.0 @@ -142,8 +111,8 @@ com.twitter.elephantbird elephant-bird-hadoop-compat 4.1 - - + + diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 957258c..30c28c9 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -3,12 +3,8 @@ package parallelai.spyglass.hbase import java.io.IOException import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes -import com.twitter.scalding.AccessMode -import com.twitter.scalding.Hdfs -import com.twitter.scalding.Mode -import com.twitter.scalding.Read -import com.twitter.scalding.Write -import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode} +import com.twitter.scalding._ +import parallelai.spyglass.hbase.HBaseConstants.SplitType import cascading.scheme.{NullScheme, Scheme} import cascading.tap.SinkMode import cascading.tap.Tap @@ -17,13 +13,11 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import com.twitter.scalding.Source -import com.twitter.scalding.TestMode -import com.twitter.scalding.Test -import com.twitter.scalding.MemoryTap import java.io.InputStream import java.io.OutputStream import java.util.Properties +import com.twitter.scalding.Hdfs +import com.twitter.scalding.Test object Conversions { implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -96,7 +90,7 @@ case class HBaseSource( val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, sinkMode) hbt.setUseSaltInSink(useSalt) - + hbt.asInstanceOf[Tap[_,_,_]] } } -- cgit v1.2.3