| author | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-05-22 22:04:44 +0100 |
|---|---|---|
| committer | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-05-22 22:04:44 +0100 |
| commit | 08e00dfce43244ec1edb0e2363a977fea985b454 (patch) | |
| tree | 3ef8c08587edd2826c272c1277389bb7427aad66 /src/main/java/parallelai/spyglass/hbase | |
| parent | ba87c17eed4e6f2d1e1b1d4644193485aae0bb4e (diff) | |
| download | SpyGlass-08e00dfce43244ec1edb0e2363a977fea985b454.tar.gz, SpyGlass-08e00dfce43244ec1edb0e2363a977fea985b454.zip | |
Releasing 4.3 Spyglass for Scalding (0.9.1) and Scala (2.10)
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase')
16 files changed, 87 insertions, 103 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
index 77df84e..77996c6 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConfigUtils.java
@@ -6,11 +6,7 @@ import org.apache.commons.logging.LogFactory;
 import java.io.IOException;
 
 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 17:25
- * To change this template use File | Settings | File Templates.
+ * Utility class that sets the parameters of the record reader of a table split
  */
 public class HBaseConfigUtils {
     static final Log LOG = LogFactory.getLog(HBaseConfigUtils.class);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
index 5b5e9c3..7453d3e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java
@@ -2,6 +2,9 @@ package parallelai.spyglass.hbase;
 
 import org.apache.hadoop.conf.Configuration;
 
+/**
+ * Static enums and strings
+ */
 public class HBaseConstants {
 
     public enum SourceMode {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
index 2f3047b..6fa7fce 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
@@ -18,11 +18,11 @@ import java.util.TreeSet;
 import java.util.UUID;
 
 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 12:43
- * To change this template use File | Settings | File Templates.
+ * For reading from HBase this class provides all the wiring.
+ *
+ * The idea is that before a Tap is created the parameters populate the JobConf.
+ * In this class we pick up the parameters from the JobConf and set up our logic,
+ * i.e. which HBase columns to read, what the start/stop keys are, etc.
  */
 public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable {
@@ -33,11 +33,11 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
     protected HTable table;
     protected Filter rowFilter;
 
-    public static final String COLUMN_LIST = "hbase.tablecolumns";
-
     /**
-     * Use this jobconf param to specify the input table
+     * Use the following two jobconf params to specify the table columns and the input table.
+     * Later on we can pick them up with job.get(COLUMN_LIST)
      */
+    public static final String COLUMN_LIST = "hbase.tablecolumns";
     protected static final String INPUT_TABLE = "hbase.inputtable";
 
     protected String startKey = null;
@@ -49,8 +49,6 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
     protected boolean useSalt = false;
     protected String prefixList = HBaseSalter.DEFAULT_PREFIX_LIST;
-
-
 
     @Override
     public void configure(JobConf job) {
         String tableName = getTableName(job);
@@ -65,7 +63,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
         try {
             setHTable(new HTable(HBaseConfiguration.create(job), tableName));
         } catch (Exception e) {
-            LOG.error("************* Table could not be created");
+            LOG.error("************* HBase table " + tableName + " is not accessible");
             LOG.error(StringUtils.stringifyException(e));
         }
 
@@ -166,7 +164,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
         return job.get(INPUT_TABLE);
     }
 
-    protected void setParms(HBaseRecordReaderBase trr) {
-    }
+//    protected void setParms(HBaseRecordReaderBase trr) {
+//    }
 
 }
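As context for the wiring described above, here is a minimal sketch of how a client might populate the JobConf before the Tap is created. Only `COLUMN_LIST` ("hbase.tablecolumns") and "hbase.inputtable" are confirmed by this diff; the table name and the space-separated column format are assumptions.

```java
import org.apache.hadoop.mapred.JobConf;
import parallelai.spyglass.hbase.HBaseInputFormatBase;

public class JobConfWiringSketch {
    public static void main(String[] args) {
        JobConf job = new JobConf();
        // "hbase.inputtable" is INPUT_TABLE in HBaseInputFormatBase (protected,
        // so spelled out here); "customers" is a made-up table name.
        job.set("hbase.inputtable", "customers");
        // COLUMN_LIST is public; the space-separated format is an assumption.
        job.set(HBaseInputFormatBase.COLUMN_LIST, "data:name data:surname");
        // HBaseInputFormatBase.configure(job) later reads these back with
        // job.get(...) to decide which table and columns to scan.
    }
}
```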
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 64effc9..332bbd7 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -3,52 +3,51 @@ package parallelai.spyglass.hbase;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.UUID;
 
 import javax.naming.NamingException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;
 
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
 
+/**
+ * See HBaseInputFormatRegional first (!)
+ *
+ * Now that we know which splits we are interested in reading from, we proceed
+ * with iterating over the region servers & splits and, depending on our read strategy,
+ * i.e. SCAN_RANGE, GET_LIST, SCAN_ALL, we initiate an <class>HBaseTableSplitGranular</class> per
+ * region and split with all the correct parameters.
+ *
+ * So all the different <u>strategies</u> are implemented here at a high level
+ *
+ */
 public class HBaseInputFormatGranular extends HBaseInputFormatBase {
 
 	private final Log LOG = LogFactory.getLog(HBaseInputFormatGranular.class);
 
-	// private String tableName = "";
-
 	private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>();
 
 	private String nameServer = null;
 
-	// private Scan scan = null;
-
 	@SuppressWarnings("deprecation")
 	@Override
 	public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IOException {
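To make the SCAN_RANGE strategy concrete, the following self-contained sketch shows the region/range overlap decision it implies: a region yields a granular split only if its key range overlaps the requested scan range. This is assumed logic for illustration, not SpyGlass code; key comparison is simplified to Strings and an empty stop key stands for "unbounded".

```java
// Assumed SCAN_RANGE decision logic, not actual SpyGlass code.
public class ScanRangeOverlapSketch {
    static boolean overlaps(String regionStart, String regionStop,
                            String scanStart, String scanStop) {
        boolean startsBeforeRegionEnds = regionStop.isEmpty()     // "" = unbounded
                || scanStart.compareTo(regionStop) < 0;
        boolean endsAfterRegionStarts = scanStop.compareTo(regionStart) > 0;
        return startsBeforeRegionEnds && endsAfterRegionStarts;
    }

    public static void main(String[] args) {
        // three regions of a table, split at keys "g" and "p"
        String[][] regions = { {"", "g"}, {"g", "p"}, {"p", ""} };
        for (String[] r : regions) {
            // scan range [h, k) only touches the middle region
            System.out.printf("region [%s,%s) -> split needed: %b%n",
                    r[0], r[1], overlaps(r[0], r[1], "h", "k"));
        }
    }
}
```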
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
index 8185b22..5c98443 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatRegional.java
@@ -11,17 +11,20 @@ import java.util.Collection;
 import java.util.HashMap;
 
 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 12:24
- * To change this template use File | Settings | File Templates.
+ * An HBase table can be split across multiple regions
+ *
+ * Regional is where we get the information
+ *   'Hey, this table exists in a Region at Location (10.139.8.10) and another one at (10.139.8.11)'
+ *
+ * Granular, on the other hand, is when we go deep into a specific region
+ *
+ * Note: An HBase table can exist in multiple regions / region servers as well
  */
 public class HBaseInputFormatRegional extends HBaseInputFormatBase {
+
     private HBaseInputFormatGranular granular = new HBaseInputFormatGranular();
     private final Log LOG = LogFactory.getLog(HBaseInputFormatRegional.class);
 
-
     @Override
     public HBaseTableSplitRegional[] getSplits(JobConf job, int numSplits) throws IOException {
         granular.configure(job);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
index 3c62f82..401dea0 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseOutputFormat.java
@@ -14,7 +14,7 @@ import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
 
 /**
- * Convert Map/Reduce output and write it to an HBase table
+ * For writing Map/Reduce output into an HBase table
  */
 public class HBaseOutputFormat extends
 FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
@@ -36,7 +36,6 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
       );
   }
 
-
   @Override
   @SuppressWarnings("unchecked")
   public RecordWriter getRecordWriter(FileSystem ignored,
@@ -61,7 +60,7 @@ FileOutputFormat<ImmutableBytesWritable, Put> implements JobConfigurable {
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job)
-  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+  throws IOException {
     String tableName = job.get(OUTPUT_TABLE);
 
     if(tableName == null) {
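For reference, the (key, value) pair that this output format's RecordWriter consumes looks like the following. This is a minimal sketch using the pre-1.0 HBase client API that matches the mapred classes in this diff; the row key, family, and qualifier are made up.

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class PutPairSketch {
    public static void main(String[] args) {
        byte[] rowKey = Bytes.toBytes("row-001");               // made-up key
        ImmutableBytesWritable key = new ImmutableBytesWritable(rowKey);
        Put put = new Put(rowKey);
        // old-style Put.add(family, qualifier, value), as in 0.94-era HBase
        put.add(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes("name1"));
        // a reducer would then emit: output.collect(key, put);
        System.out.println(key + " -> " + put);
    }
}
```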
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 7b62c88..a88581b 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -41,12 +41,25 @@ import cascading.tuple.TupleEntry;
 import cascading.util.Util;
 
 /**
-* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
-* with the {@HBaseRawTap} to allow for the reading and writing of data
-* to and from a HBase cluster.
-*
-* @see HBaseRawTap
-*/
+ * It provides the wiring between Fields and Columns and Column families.
+ * In effect, to write to cf:column
+ *
+ * data:name  data:surname  data:address
+ *  name1       surname1       address1
+ *
+ * we will initialize the HBaseSource with
+ *   ("data","data","data")
+ *   ("name","surname","address")
+ *   Data:
+ *   ("name1","surname1","address1")
+ *   ...
+ *
+ * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
+ * with the {@HBaseRawTap} to allow for the reading and writing of data
+ * to and from a HBase cluster.
+ *
+ * @see HBaseRawTap
+ */
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
 	/**
@@ -67,13 +80,8 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
 
 	/**
 	 * Constructor HBaseScheme creates a new HBaseScheme instance.
-	 *
-	 * @param keyFields
-	 *            of type Fields
 	 * @param familyName
 	 *            of type String
-	 * @param valueFields
-	 *            of type Fields
 	 */
 	public HBaseRawScheme(String familyName) {
 		this(new String[] { familyName });
@@ -235,6 +243,7 @@ public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollecto
 	@Override
 	public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
 			JobConf conf) {
+
 		DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
 				ValueCopier.class);
 		if (null != familyNames) {
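A minimal sketch of constructing the scheme for a single column family, per the `HBaseRawScheme(String familyName)` constructor shown above. The family name is made up, and `getSourceFields()` comes from the Cascading `Scheme` base class.

```java
import parallelai.spyglass.hbase.HBaseRawScheme;

public class RawSchemeSketch {
    public static void main(String[] args) {
        // one column family "data"; the raw scheme surfaces whole HBase rows
        HBaseRawScheme scheme = new HBaseRawScheme("data");
        System.out.println("source fields: " + scheme.getSourceFields());
    }
}
```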
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
index 5dcd57d..efe548d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
index cc64cb4..fa2aa7e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderBase.java
@@ -8,17 +8,14 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.RecordReader;
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
 
 import java.util.TreeSet;
 
 import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
 
 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 15:42
- * To change this template use File | Settings | File Templates.
+ * Logic & configuration for reading records from HBase
  */
 public abstract class HBaseRecordReaderBase implements
         RecordReader<ImmutableBytesWritable, Result> {
@@ -88,7 +85,6 @@ public abstract class HBaseRecordReaderBase implements
     }
 
     /**
-     *
      * @param endRow
      *          the last row in the split
      */
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index eace82c..b34ed3f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -20,6 +20,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
+/**
+ * It builds the scanner in method init() and ALL the logic is in method next()
+ * At the reader level, our job is parallelised into multiple Map phases ;-)
+ */
 public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
 
   static final Log LOG = LogFactory.getLog(HBaseRecordReaderGranular.class);
@@ -28,9 +32,10 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
   private ResultScanner scanner;
   private long timestamp = -1;
   private int rowcount = 0;
+  private final int scanCaching = 1000;
 
-    @Override
-    public String toString() {
+  @Override
+  public String toString() {
         StringBuffer sbuf = new StringBuffer();
         sbuf.append("".format("HBaseRecordReaderRegional : startRow [%s] endRow [%s] lastRow [%s] nextKey [%s] endRowInc [%s] rowCount [%s]",
@@ -39,12 +44,9 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
                 sourceMode, useSalt, versions));
         return sbuf.toString();
-    }
-
-    private final int scanCaching = 1000;
-
+  }
 
-    /**
+  /**
    * Restart from survivable exceptions by creating a new scanner.
    *
    * @param firstRow
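The init()/next() split described in the comment can be pictured as follows: an assumed sketch, not the actual SpyGlass reader. It compiles against the 0.94-era HBase client API used in this diff but needs a live table to run.

```java
import java.io.IOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

// Assumed shape of the init()/next() contract, for illustration only.
public class ScannerReaderSketch {
    private ResultScanner scanner;

    void init(HTable table, byte[] startRow, byte[] stopRow) throws IOException {
        Scan scan = new Scan(startRow, stopRow);
        scan.setCaching(1000);              // mirrors scanCaching = 1000 above
        scanner = table.getScanner(scan);   // the scanner is built exactly once
    }

    boolean next(ImmutableBytesWritable key, Result value) throws IOException {
        Result row = scanner.next();        // null when the split is exhausted
        if (row == null) return false;
        key.set(row.getRow());
        // `value` would be filled from `row` here; treat this as pseudocode
        // for the copy step, which varies across HBase versions.
        return true;
    }
}
```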
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
index 5d2b613..cffedb2 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderRegional.java
@@ -1,34 +1,15 @@
 package parallelai.spyglass.hbase;
 
-import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeSet;
 import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.StringUtils;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
 
 public class HBaseRecordReaderRegional extends HBaseRecordReaderBase {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
index 5bdf8cd..b20fd89 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseSalter.java
@@ -1,7 +1,6 @@
 package parallelai.spyglass.hbase;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -13,6 +12,21 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
+/**
+ * Use this Salter if you want maximum efficiency and speed when writing new data into HBase.
+ *
+ * The 'Region Hot Spot' problem occurs when we are writing data into an HBase table
+ * where the keys are in sequential order.
+ *
+ * http://www.appfirst.com/blog/best-practices-for-managing-hbase-in-a-high-write-environment/
+ *
+ * By using this salter we can prefix the keys with a character followed by an _
+ * i.e.
+ *
+ * SCALDING -> G_SCALDING
+ *
+ * and thus distribute the write load better across the HBase cluster
+ */
 public class HBaseSalter {
   private static final Log LOG = LogFactory.getLog(HBaseSalter.class);
@@ -151,12 +165,10 @@ public class HBaseSalter {
 	  char[] prefixArray = prefixList.toCharArray();
 
       return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], stopKey);
-//	  return getAllKeysWithStartStop(originalKey, prefixList, (byte)prefixArray[0], (byte)(stopKey - 1));
   }
 
   public static byte[][] getAllKeysInRange(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
       return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, stopPrefix);
-//	  return getAllKeysWithStartStop(originalKey, prefixList, startPrefix, (byte)(stopPrefix - 1));
   }
 
   private static byte[][] getAllKeysWithStartStop(byte[] originalKey, String prefixList, byte startPrefix, byte stopPrefix) {
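To illustrate the salting idea (SCALDING -> G_SCALDING), a self-contained sketch follows. The hash-based prefix-selection rule and the prefix list value are assumptions; HBaseSalter's actual DEFAULT_PREFIX_LIST and selection logic are not shown in this diff.

```java
public class SaltingSketch {
    // Hypothetical prefix list; the real DEFAULT_PREFIX_LIST value is not shown here.
    static final String PREFIXES = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // Assumed selection rule: derive the prefix from the key so that
    // sequential keys land in different, but deterministic, buckets.
    static String salt(String key) {
        char prefix = PREFIXES.charAt(Math.floorMod(key.hashCode(), PREFIXES.length()));
        return prefix + "_" + key;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user-00001", "user-00002", "user-00003"}) {
            System.out.println(key + " -> " + salt(key)); // e.g. user-00001 -> Q_user-00001
        }
    }
}
```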
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index ba83a63..e037050 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
-//import org.mortbay.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
index 3b72a1d..cdb6a77 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitBase.java
@@ -12,11 +12,7 @@ import java.io.Serializable;
 import java.util.TreeSet;
 
 /**
- * Created with IntelliJ IDEA.
- * User: chand_000
- * Date: 29/08/13
- * Time: 16:18
- * To change this template use File | Settings | File Templates.
+ * Split table logic
 */
 public abstract class HBaseTableSplitBase implements InputSplit,
         Comparable<HBaseTableSplitBase>, Serializable {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
index a266411..76edeba 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitGranular.java
@@ -1,19 +1,15 @@
 package parallelai.spyglass.hbase;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
 
+/**
+ * Split table logic
+ */
 public class HBaseTableSplitGranular extends HBaseTableSplitBase {
 
 	private final Log LOG = LogFactory.getLog(HBaseTableSplitGranular.class);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
index ad5f78b..fa43226 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplitRegional.java
@@ -3,17 +3,13 @@ package parallelai.spyglass.hbase;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Vector;
 
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
 
 public class HBaseTableSplitRegional extends HBaseTableSplitBase {
