diff options
author | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-05-22 22:04:44 +0100 |
---|---|---|
committer | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-05-22 22:04:44 +0100 |
commit | 08e00dfce43244ec1edb0e2363a977fea985b454 (patch) | |
tree | 3ef8c08587edd2826c272c1277389bb7427aad66 /src/main/java | |
parent | ba87c17eed4e6f2d1e1b1d4644193485aae0bb4e (diff) | |
download | SpyGlass-08e00dfce43244ec1edb0e2363a977fea985b454.tar.gz SpyGlass-08e00dfce43244ec1edb0e2363a977fea985b454.zip |
Releasing 4.3 Spyglass for Scalding (0.9.1) and Scala (2.10)
Diffstat (limited to 'src/main/java')
17 files changed, 87 insertions, 104 deletions
diff --git a/src/main/java/parallelai/spyglass/base/JobLibLoader.java b/src/main/java/parallelai/spyglass/base/JobLibLoader.java index af5bdf4..f49cca9 100644 --- a/src/main/java/parallelai/spyglass/base/JobLibLoader.java +++ b/src/main/java/parallelai/spyglass/base/JobLibLoader.java @@ -15,7 +15,6 @@ public class JobLibLoader { public static void loadJars(String libPathStr, Configuration config) { - try { Path libPath = new Path(libPathStr); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java index 77df84e..77996c6 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java @@ -6,11 +6,7 @@ import org.apache.commons.logging.LogFactory; import java.io.IOException; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 17:25 - * To change this template use File | Settings | File Templates. + * Utility class that sets the parameters of the record reader of a table split */ public class HBaseConfigUtils { static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java index 5b5e9c3..7453d3e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -2,6 +2,9 @@ package parallelai.spyglass.hbase; import org.apache.hadoop.conf.Configuration; +/** + * Static enums , strings + */ public class HBaseConstants { public enum SourceMode { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java index 2f3047b..6fa7fce 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -18,11 +18,11 @@ import java.util.TreeSet; import java.util.UUID; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 12:43 - * To change this template use File | Settings | File Templates. + * For reading from HBase this class provides all the wiring + * + * The idea is before a Tap is created the parameters populate the JobConf + * In this class we pick up the parameters from the JobConf and set up our logic, + * i.e. which HBase columns to read, what is the start/stop key etc */ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { @@ -33,11 +33,11 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes protected HTable table; protected Filter rowFilter; - public static final String COLUMN_LIST = "hbase.tablecolumns"; - /** - * Use this jobconf param to specify the input table + * Use the following two mappings to specify table columns and input table to the jobconf + * Later on we can pick them up with job.get(COLUMN_LIST) */ + public static final String COLUMN_LIST = "hbase.tablecolumns"; protected static final String INPUT_TABLE = "hbase.inputtable"; protected String startKey = null; @@ -49,8 +49,6 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes protected boolean useSalt = false; protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST; - - @Override public void configure(JobConf job) { String tableName = getTableName(job); @@ -65,7 +63,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes try { setHTable(new HTable(HBaseConfiguration.create(job), tableName)); } catch (Exception e) { - LOG.error("************* Table could not be created"); + LOG.error("************* HBase table " + tableName + " is not accessible"); LOG.error(StringUtils.stringifyException(e)); } @@ -166,7 +164,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes return job.get(INPUT_TABLE); } - protected void setParms(HBaseRecordReaderBase trr) { +// protected void setParms(HBaseRecordReaderBase trr) { +// } - } } diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java index 64effc9..332bbd7 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -3,52 +3,51 @@ package parallelai.spyglass.hbase; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.UUID; import javax.naming.NamingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +/** + * See HBaseInputFormatRegional first (!) + * + * Now that we know which splits we are interested reading from, we will proceed + * with iterating over the region servers & splits and depending on our Read strategy + * i.e. SCAN_RANGE, GET_LIST , SCAN_ALL we initiate <class>HBaseTableSplitGranular</class> per + * region and split with all the correct parameters. + * + * So all the different <u>strategies</u> are implemented here at a high level + * + */ public class HBaseInputFormatGranular extends HBaseInputFormatBase { private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class); - // private String tableName = ""; - private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); private String nameServer = null; - // private Scan scan = null; - @SuppressWarnings("deprecation") @Override public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java index 8185b22..5c98443 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java @@ -11,17 +11,20 @@ import java.util.Collection; import java.util.HashMap; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 12:24 - * To change this template use File | Settings | File Templates. + * An HBase table can be split across multiple regions + * + * Regional - is where we get the information + * 'Hey this table exists in a Region at Location (10.139.8.10) and another one at (10.139.8.11)' + * + * Granular on the other hand is when we go deep at a specific region + * + * Note: An HBase table can exist in multiple regions / region server as well */ public class HBaseInputFormatRegional extends HBaseInputFormatBase { + private HBaseInputFormatGranular granular = new HBaseInputFormatGranular(); private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class); - @Override public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException { granular.configure(job); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java index 3c62f82..401dea0 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java @@ -14,7 +14,7 @@ import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Progressable; /** - * Convert Map/Reduce output and write it to an HBase table + * For writing Map/Reduce output into an HBase table */ public class HBaseOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable { @@ -36,7 +36,6 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable { ); } - @Override @SuppressWarnings("unchecked") public RecordWriter getRecordWriter(FileSystem ignored, @@ -61,7 +60,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable { @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) - throws FileAlreadyExistsException, InvalidJobConfException, IOException { + throws IOException { String tableName = job.get(OUTPUT_TABLE); if(tableName == null) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 7b62c88..a88581b 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -41,12 +41,25 @@ import cascading.tuple.TupleEntry; import cascading.util.Util; /** -* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction -* with the {@HBaseRawTap} to allow for the reading and writing of data -* to and from a HBase cluster. -* -* @see HBaseRawTap -*/ + * It provides the wiring between Fields and Columns and Column families + * In effect to write to cf:column + * + * data:name data:surname address: street + * name1 surname1 address1 + * + * We will initialize the HBaseSource with + * ("data","data","data") + * ("name","surname","address") + * Data: + * ("name1","surname1","address1") + * ... + * + * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction + * with the {@HBaseRawTap} to allow for the reading and writing of data + * to and from a HBase cluster. + * + * @see HBaseRawTap + */ @SuppressWarnings({ "rawtypes", "deprecation" }) public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { /** @@ -67,13 +80,8 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto /** * Constructor HBaseScheme creates a new HBaseScheme instance. - * - * @param keyFields - * of type Fields * @param familyName * of type String - * @param valueFields - * of type Fields */ public HBaseRawScheme(String familyName) { this(new String[] { familyName }); @@ -235,6 +243,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto @Override public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { + DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, ValueCopier.class); if (null != familyNames) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 5dcd57d..efe548d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java index cc64cb4..fa2aa7e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java @@ -8,17 +8,14 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.RecordReader; +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; import java.util.TreeSet; import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 15:42 - * To change this template use File | Settings | File Templates. + * Reading from HBase records logic & configuration */ public abstract class HBaseRecordReaderBase implements RecordReader<ImmutableBytesWritable, Result> { @@ -88,7 +85,6 @@ public abstract class HBaseRecordReaderBase implements } /** - * * @param endRow * the last row in the split */ diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java index eace82c..b34ed3f 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java @@ -20,6 +20,10 @@ import java.util.List; import java.util.Map; import java.util.Vector; +/** + * It builds the scanner in method init() and ALL the logic is in method next() + * At the reader level - our Job is parallelised in multiple Map phases ;-) + */ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class); @@ -28,9 +32,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { private ResultScanner scanner; private long timestamp = -1; private int rowcount = 0; + private final int scanCaching = 1000; - @Override - public String toString() { + @Override + public String toString() { StringBuffer sbuf = new StringBuffer(); sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]", @@ -39,12 +44,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase { sourceMode, useSalt, versions)); return sbuf.toString(); - } - - private final int scanCaching = 1000; - + } - /** + /** * Restart from survivable exceptions by creating a new scanner. * * @param firstRow diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java index 5d2b613..cffedb2 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java @@ -1,34 +1,15 @@ package parallelai.spyglass.hbase; -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.StringUtils; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; public class HBaseRecordReaderRegional extends HBaseRecordReaderBase { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java index 5bdf8cd..b20fd89 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java @@ -1,7 +1,6 @@ package parallelai.spyglass.hbase; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; @@ -13,6 +12,21 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +/** + * Use this Salter if you want maximum efficiency and speed when writing new data into HBase + * + * The 'Region Hot Spot' problem occurs when we are writing in an HBase table data, + * where the key is in sequential order. + * + * http://www.appfirst.com/blog/best-practices-for-managing-hbase-in-a-high-write-environment/ + * + * By using this salter we can prefix the keys with a character followed by an _ + * i.e. + * + * SCALDING -> G_SCALDING + * + * and thus distribute better the write load in the HBase cluster + */ public class HBaseSalter { private static final Log LOG = LogFactory.getLog(HBaseSalter.class); @@ -151,12 +165,10 @@ public class HBaseSalter { char[] prefixArray = prefixList.toCharArray(); return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], stopKey); -// return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1)); } public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, stopPrefix); -// return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1)); } private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java index ba83a63..e037050 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; -//import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java index 3b72a1d..cdb6a77 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java @@ -12,11 +12,7 @@ import java.io.Serializable; import java.util.TreeSet; /** - * Created with IntelliJ IDEA. - * User: chand_000 - * Date: 29/08/13 - * Time: 16:18 - * To change this template use File | Settings | File Templates. + * Split table logic */ public abstract class HBaseTableSplitBase implements InputSplit, Comparable<HBaseTableSplitBase>, Serializable { diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java index a266411..76edeba 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java @@ -1,19 +1,15 @@ package parallelai.spyglass.hbase; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +/** + * Split table logic + */ public class HBaseTableSplitGranular extends HBaseTableSplitBase { private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java index ad5f78b..fa43226 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java @@ -3,17 +3,13 @@ package parallelai.spyglass.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Vector; -import org.apache.commons.lang.SerializationUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; public class HBaseTableSplitRegional extends HBaseTableSplitBase { |