From 08e00dfce43244ec1edb0e2363a977fea985b454 Mon Sep 17 00:00:00 2001 From: Antonios Chalkiopoulos Date: Thu, 22 May 2014 22:04:44 +0100 Subject: Releasing 4.3 Spyglass for Scalding (0.9.1) and Scala (2.10) --- .../spyglass/hbase/HBaseConfigUtils.java | 6 +---- .../parallelai/spyglass/hbase/HBaseConstants.java | 3 +++ .../spyglass/hbase/HBaseInputFormatBase.java | 24 ++++++++--------- .../spyglass/hbase/HBaseInputFormatGranular.java | 23 ++++++++-------- .../spyglass/hbase/HBaseInputFormatRegional.java | 15 ++++++----- .../spyglass/hbase/HBaseOutputFormat.java | 5 ++-- .../parallelai/spyglass/hbase/HBaseRawScheme.java | 31 ++++++++++++++-------- .../parallelai/spyglass/hbase/HBaseRawTap.java | 1 - .../spyglass/hbase/HBaseRecordReaderBase.java | 8 ++---- .../spyglass/hbase/HBaseRecordReaderGranular.java | 16 ++++++----- .../spyglass/hbase/HBaseRecordReaderRegional.java | 19 ------------- .../parallelai/spyglass/hbase/HBaseSalter.java | 18 ++++++++++--- .../parallelai/spyglass/hbase/HBaseScheme.java | 1 - .../spyglass/hbase/HBaseTableSplitBase.java | 6 +---- .../spyglass/hbase/HBaseTableSplitGranular.java | 10 +++---- .../spyglass/hbase/HBaseTableSplitRegional.java | 4 --- 16 files changed, 87 insertions(+), 103 deletions(-) (limited to 'src/main/java/parallelai/spyglass/hbase') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java index 77df84e..77996c6 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java @@ -6,11 +6,7 @@ import org.apache.commons.logging.LogFactory; import java.io.IOException; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 17:25 - * To change this template use File | Settings | File Templates. 
+ * Utility class that sets the parameters of the record reader of a table split */ public class HBaseConfigUtils { static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 5b5e9c3..7453d3e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -2,6 +2,9 @@ package parallelai.spyglass.hbase; import org.apache.hadoop.conf.Configuration; +/** + * Static enums , strings + */ public class HBaseConstants { public enum SourceMode { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java index 2f3047b..6fa7fce 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -18,11 +18,11 @@ import java.util.TreeSet; import java.util.UUID; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 12:43 - * To change this template use File | Settings | File Templates. + * For reading from HBase this class provides all the wiring + * + * The idea is before a Tap is created the parameters populate the JobConf + * In this class we pick up the parameters from the JobConf and set up our logic, + * i.e. which HBase columns to read, what is the start/stop key etc */ public abstract class HBaseInputFormatBase implements InputFormat, JobConfigurable { @@ -33,11 +33,11 @@ public abstract class HBaseInputFormatBase implements InputFormatHBaseTableSplitGranular per + * region and split with all the correct parameters. 
+ * + * So all the different strategies are implemented here at a high level + * + */ public class HBaseInputFormatGranular extends HBaseInputFormatBase { private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class); - // private String tableName = ""; - private HashMap reverseDNSCacheMap = new HashMap(); private String nameServer = null; - // private Scan scan = null; - @SuppressWarnings("deprecation") @Override public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java index 8185b22..5c98443 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -11,17 +11,20 @@ import java.util.Collection; import java.util.HashMap; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 12:24 - * To change this template use File | Settings | File Templates. 
+ * An HBase table can be split across multiple regions + * + * Regional - is where we get the information + * 'Hey this table exists in a Region at Location (10.139.8.10) and another one at (10.139.8.11)' + * + * Granular on the other hand is when we go deep at a specific region + * + * Note: An HBase table can exist in multiple regions / region server as well */ public class HBaseInputFormatRegional extends HBaseInputFormatBase { + private HBaseInputFormatGranular granular = new HBaseInputFormatGranular(); private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class); - @Override public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException { granular.configure(job); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index 3c62f82..401dea0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -14,7 +14,7 @@ import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Progressable; /** - * Convert Map/Reduce output and write it to an HBase table + * For writing Map/Reduce output into an HBase table */ public class HBaseOutputFormat extends FileOutputFormat implements JobConfigurable { @@ -36,7 +36,6 @@ FileOutputFormat implements JobConfigurable { ); } - @Override @SuppressWarnings("unchecked") public RecordWriter getRecordWriter(FileSystem ignored, @@ -61,7 +60,7 @@ FileOutputFormat implements JobConfigurable { @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) - throws FileAlreadyExistsException, InvalidJobConfException, IOException { + throws IOException { String tableName = job.get(OUTPUT_TABLE); if(tableName == null) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 7b62c88..a88581b 100644 --- 
a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -41,12 +41,25 @@ import cascading.tuple.TupleEntry; import cascading.util.Util; /** -* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction -* with the {@HBaseRawTap} to allow for the reading and writing of data -* to and from a HBase cluster. -* -* @see HBaseRawTap -*/ + * It provides the wiring between Fields and Columns and Column families + * In effect to write to cf:column + * + * data:name data:surname data:address + * name1 surname1 address1 + * + * We will initialize the HBaseSource with + * ("data","data","data") + * ("name","surname","address") + * Data: + * ("name1","surname1","address1") + * ... + * + * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction + * with the {@link HBaseRawTap} to allow for the reading and writing of data + * to and from an HBase cluster. + * + * @see HBaseRawTap + */ @SuppressWarnings({ "rawtypes", "deprecation" }) public class HBaseRawScheme extends Scheme { /** @@ -67,13 +80,8 @@ public class HBaseRawScheme extends Scheme process, Tap tap, JobConf conf) { + DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, ValueCopier.class); if (null != familyNames) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 5dcd57d..efe548d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import
org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java index cc64cb4..fa2aa7e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -8,17 +8,14 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.RecordReader; +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; import java.util.TreeSet; import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 15:42 - * To change this template use File | Settings | File Templates. + * Reading from HBase records logic & configuration */ public abstract class HBaseRecordReaderBase implements RecordReader { @@ -88,7 +85,6 @@ public abstract class HBaseRecordReaderBase implements } /** - * * @param endRow * the last row in the split */ diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index eace82c..b34ed3f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -20,6 +20,10 @@ import java.util.List; import java.util.Map; import java.util.Vector; +/** + * It builds the scanner in method init() and ALL the logic is in method next() + * At the reader level - our Job is parallelised in multiple Map phases ;-) + */ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class); @@ -28,9 +32,10 @@ 
public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { private ResultScanner scanner; private long timestamp = -1; private int rowcount = 0; + private final int scanCaching = 1000; - @Override - public String toString() { + @Override + public String toString() { StringBuffer sbuf = new StringBuffer(); sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]", @@ -39,12 +44,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { sourceMode, useSalt, versions)); return sbuf.toString(); - } - - private final int scanCaching = 1000; - + } - /** + /** * Restart from survivable exceptions by creating a new scanner. * * @param firstRow diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java index 5d2b613..cffedb2 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -1,34 +1,15 @@ package parallelai.spyglass.hbase; -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import 
org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java index 5bdf8cd..b20fd89 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -1,7 +1,6 @@ package parallelai.spyglass.hbase; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; @@ -13,6 +12,21 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +/** + * Use this Salter if you want maximum efficiency and speed when writing new data into HBase + * + * The 'Region Hot Spot' problem occurs when we write data into an HBase table + * whose keys are in sequential order. + * + * http://www.appfirst.com/blog/best-practices-for-managing-hbase-in-a-high-write-environment/ + * + * By using this salter we can prefix the keys with a character followed by an _ + * i.e.
+ * + * SCALDING -> G_SCALDING + * + * and thus better distribute the write load across the HBase cluster + */ public class HBaseSalter { private static final Log LOG = LogFactory.getLog(HBaseSalter.class); @@ -151,12 +165,10 @@ public class HBaseSalter { char[] prefixArray = prefixList.toCharArray(); return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], stopKey); -// return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1)); } public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, stopPrefix); -// return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1)); } private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index ba83a63..e037050 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; -//import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java index 3b72a1d..cdb6a77 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -12,11 +12,7 @@ import java.io.Serializable; import java.util.TreeSet; /** - * Created with IntelliJ IDEA.
- * User: chand_000 - * Date: 29/08/13 - * Time: 16:18 - * To change this template use File | Settings | File Templates. + * Split table logic */ public abstract class HBaseTableSplitBase implements InputSplit, Comparable, Serializable { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java index a266411..76edeba 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -1,19 +1,15 @@ package parallelai.spyglass.hbase; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +/** + * Split table logic + */ public class HBaseTableSplitGranular extends HBaseTableSplitBase { private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java index ad5f78b..fa43226 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -3,17 +3,13 @@ package parallelai.spyglass.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Vector; -import org.apache.commons.lang.SerializationUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.util.Bytes; 
-import org.apache.hadoop.mapred.InputSplit; public class HBaseTableSplitRegional extends HBaseTableSplitBase { -- cgit v1.2.3