diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java | 622 |
1 files changed, 311 insertions, 311 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 0421b6e..780d3fc 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -1,311 +1,311 @@ -/* - * Copyright (c) 2009 Concurrent, Inc. - * - * This work has been released into the public domain - * by the copyright holder. This applies worldwide. - * - * In case this is not legally possible: - * The copyright holder grants any entity the right - * to use this work for any purpose, without any - * conditions, unless such conditions are required by law. - */ - -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.util.UUID; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import cascading.flow.FlowProcess; -import cascading.tap.SinkMode; -import cascading.tap.Tap; -import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -import cascading.tuple.TupleEntryCollector; -import cascading.tuple.TupleEntryIterator; - -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; - -/** - * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with - * the {@HBaseRawScheme} to allow for the reading and writing - * of data to and from a HBase cluster. - */ -@SuppressWarnings({ "deprecation", "rawtypes" }) -public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { - /** - * - */ - private static final long serialVersionUID = 8019189493428493323L; - - /** Field LOG */ - private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); - - private final String id = UUID.randomUUID().toString(); - - /** Field SCHEME */ - public static final String SCHEME = "hbase"; - - /** Field hBaseAdmin */ - private transient HBaseAdmin hBaseAdmin; - - /** Field hostName */ - private String quorumNames; - /** Field tableName */ - private String tableName; - private String base64Scan; - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - */ - public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { - super(HBaseFullScheme, SinkMode.UPDATE); - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - * @param sinkMode - * of type SinkMode - */ - public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { - super(HBaseFullScheme, sinkMode); - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - */ - public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { - super(HBaseFullScheme, SinkMode.UPDATE); - this.quorumNames = quorumNames; - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - * @param sinkMode - * of type SinkMode - */ - public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { - super(HBaseFullScheme, sinkMode); - this.quorumNames = quorumNames; - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param quorumNames HBase quorum - * @param tableName The name of the HBase table to read - * @param HBaseFullScheme - * @param base64Scan An optional base64 encoded scan object - * @param sinkMode If REPLACE the output table will be deleted before writing to - */ - public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { - super(HBaseFullScheme, sinkMode); - this.quorumNames = quorumNames; - this.tableName = tableName; - this.base64Scan = base64Scan; - } - - /** - * Method getTableName returns the tableName of this HBaseTap object. - * - * @return the tableName (type String) of this HBaseTap object. - */ - public String getTableName() { - return tableName; - } - - public Path getPath() { - return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); - } - - protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { - if (hBaseAdmin == null) { - Configuration hbaseConf = HBaseConfiguration.create(conf); - hBaseAdmin = new HBaseAdmin(hbaseConf); - } - - return hBaseAdmin; - } - - @Override - public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { - if (quorumNames != null) { - conf.set("hbase.zookeeper.quorum", quorumNames); - } - - LOG.debug("sinking to table: {}", tableName); - - if (isReplace() && conf.get("mapred.task.partition") == null) { - try { - deleteResource(conf); - - } catch (IOException e) { - throw new RuntimeException("could not delete resource: " + e); - } - } - - else if (isUpdate() || isReplace()) { - try { - createResource(conf); - } catch (IOException e) { - throw new RuntimeException(tableName + " does not exist !", e); - } - - } - - conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); - super.sinkConfInit(process, conf); - } - - @Override - public String getIdentifier() { - return id; - } - - @Override - public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) - throws IOException { - return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); - } - - @Override - public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) - throws IOException { - HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); - hBaseCollector.prepare(); - return hBaseCollector; - } - - @Override - public boolean createResource(JobConf jobConf) throws IOException { - HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); - - if (hBaseAdmin.tableExists(tableName)) { - return true; - } - - LOG.info("creating hbase table: {}", tableName); - - HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); - - String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); - - for (String familyName : familyNames) { - tableDescriptor.addFamily(new HColumnDescriptor(familyName)); - } - - hBaseAdmin.createTable(tableDescriptor); - - return true; - } - - @Override - public boolean deleteResource(JobConf jobConf) throws IOException { - if (getHBaseAdmin(jobConf).tableExists(tableName)) { - if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) - getHBaseAdmin(jobConf).disableTable(tableName); - getHBaseAdmin(jobConf).deleteTable(tableName); - } - return true; - } - - @Override - public boolean resourceExists(JobConf jobConf) throws IOException { - return getHBaseAdmin(jobConf).tableExists(tableName); - } - - @Override - public long getModifiedTime(JobConf jobConf) throws IOException { - return System.currentTimeMillis(); // currently unable to find last mod - // time - // on a table - } - - @Override - public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { - // a hack for MultiInputFormat to see that there is a child format - FileInputFormat.setInputPaths(conf, getPath()); - - if (quorumNames != null) { - conf.set("hbase.zookeeper.quorum", quorumNames); - } - - LOG.debug("sourcing from table: {}", tableName); - conf.set(TableInputFormat.INPUT_TABLE, tableName); - if (null != base64Scan) - conf.set(TableInputFormat.SCAN, base64Scan); - - super.sourceConfInit(process, conf); - } - - @Override - public boolean equals(Object object) { - if (this == object) { - return true; - } - if (object == null || getClass() != object.getClass()) { - return false; - } - if (!super.equals(object)) { - return false; - } - - HBaseRawTap hBaseTap = (HBaseRawTap) object; - - if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { - return false; - } - - if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); - return result; - } -} +///* +//* Copyright (c) 2009 Concurrent, Inc. +//* +//* This work has been released into the public domain +//* by the copyright holder. This applies worldwide. +//* +//* In case this is not legally possible: +//* The copyright holder grants any entity the right +//* to use this work for any purpose, without any +//* conditions, unless such conditions are required by law. +//*/ +// +//package parallelai.spyglass.hbase; +// +//import java.io.IOException; +//import java.util.UUID; +// +//import org.apache.hadoop.conf.Configuration; +//import org.apache.hadoop.fs.Path; +//import org.apache.hadoop.hbase.HBaseConfiguration; +//import org.apache.hadoop.hbase.HColumnDescriptor; +//import org.apache.hadoop.hbase.HTableDescriptor; +//import org.apache.hadoop.hbase.MasterNotRunningException; +//import org.apache.hadoop.hbase.ZooKeeperConnectionException; +//import org.apache.hadoop.hbase.client.HBaseAdmin; +//import org.apache.hadoop.hbase.client.Scan; +//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +//import org.apache.hadoop.mapred.FileInputFormat; +//import org.apache.hadoop.mapred.JobConf; +//import org.apache.hadoop.mapred.OutputCollector; +//import org.apache.hadoop.mapred.RecordReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import cascading.flow.FlowProcess; +//import cascading.tap.SinkMode; +//import cascading.tap.Tap; +//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +//import cascading.tuple.TupleEntryCollector; +//import cascading.tuple.TupleEntryIterator; +// +//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +// +///** +//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with +//* the {@HBaseRawScheme} to allow for the reading and writing +//* of data to and from a HBase cluster. +//*/ +//@SuppressWarnings({ "deprecation", "rawtypes" }) +//public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { +// /** +// * +// */ +// private static final long serialVersionUID = 8019189493428493323L; +// +// /** Field LOG */ +// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); +// +// private final String id = UUID.randomUUID().toString(); +// +// /** Field SCHEME */ +// public static final String SCHEME = "hbase"; +// +// /** Field hBaseAdmin */ +// private transient HBaseAdmin hBaseAdmin; +// +// /** Field hostName */ +// private String quorumNames; +// /** Field tableName */ +// private String tableName; +// private String base64Scan; +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// */ +// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { +// super(HBaseFullScheme, SinkMode.UPDATE); +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// * @param sinkMode +// * of type SinkMode +// */ +// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +// super(HBaseFullScheme, sinkMode); +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// */ +// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { +// super(HBaseFullScheme, SinkMode.UPDATE); +// this.quorumNames = quorumNames; +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// * @param sinkMode +// * of type SinkMode +// */ +// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +// super(HBaseFullScheme, sinkMode); +// this.quorumNames = quorumNames; +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param quorumNames HBase quorum +// * @param tableName The name of the HBase table to read +// * @param HBaseFullScheme +// * @param base64Scan An optional base64 encoded scan object +// * @param sinkMode If REPLACE the output table will be deleted before writing to +// */ +// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { +// super(HBaseFullScheme, sinkMode); +// this.quorumNames = quorumNames; +// this.tableName = tableName; +// this.base64Scan = base64Scan; +// } +// +// /** +// * Method getTableName returns the tableName of this HBaseTap object. +// * +// * @return the tableName (type String) of this HBaseTap object. +// */ +// public String getTableName() { +// return tableName; +// } +// +// public Path getPath() { +// return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); +// } +// +// protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { +// if (hBaseAdmin == null) { +// Configuration hbaseConf = HBaseConfiguration.create(conf); +// hBaseAdmin = new HBaseAdmin(hbaseConf); +// } +// +// return hBaseAdmin; +// } +// +// @Override +// public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { +// if (quorumNames != null) { +// conf.set("hbase.zookeeper.quorum", quorumNames); +// } +// +// LOG.debug("sinking to table: {}", tableName); +// +// if (isReplace() && conf.get("mapred.task.partition") == null) { +// try { +// deleteResource(conf); +// +// } catch (IOException e) { +// throw new RuntimeException("could not delete resource: " + e); +// } +// } +// +// else if (isUpdate() || isReplace()) { +// try { +// createResource(conf); +// } catch (IOException e) { +// throw new RuntimeException(tableName + " does not exist !", e); +// } +// +// } +// +// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +// super.sinkConfInit(process, conf); +// } +// +// @Override +// public String getIdentifier() { +// return id; +// } +// +// @Override +// public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) +// throws IOException { +// return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); +// } +// +// @Override +// public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) +// throws IOException { +// HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); +// hBaseCollector.prepare(); +// return hBaseCollector; +// } +// +// @Override +// public boolean createResource(JobConf jobConf) throws IOException { +// HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); +// +// if (hBaseAdmin.tableExists(tableName)) { +// return true; +// } +// +// LOG.info("creating hbase table: {}", tableName); +// +// HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); +// +// String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); +// +// for (String familyName : familyNames) { +// tableDescriptor.addFamily(new HColumnDescriptor(familyName)); +// } +// +// hBaseAdmin.createTable(tableDescriptor); +// +// return true; +// } +// +// @Override +// public boolean deleteResource(JobConf jobConf) throws IOException { +// if (getHBaseAdmin(jobConf).tableExists(tableName)) { +// if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) +// getHBaseAdmin(jobConf).disableTable(tableName); +// getHBaseAdmin(jobConf).deleteTable(tableName); +// } +// return true; +// } +// +// @Override +// public boolean resourceExists(JobConf jobConf) throws IOException { +// return getHBaseAdmin(jobConf).tableExists(tableName); +// } +// +// @Override +// public long getModifiedTime(JobConf jobConf) throws IOException { +// return System.currentTimeMillis(); // currently unable to find last mod +// // time +// // on a table +// } +// +// @Override +// public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { +// // a hack for MultiInputFormat to see that there is a child format +// FileInputFormat.setInputPaths(conf, getPath()); +// +// if (quorumNames != null) { +// conf.set("hbase.zookeeper.quorum", quorumNames); +// } +// +// LOG.debug("sourcing from table: {}", tableName); +// conf.set(TableInputFormat.INPUT_TABLE, tableName); +// if (null != base64Scan) +// conf.set(TableInputFormat.SCAN, base64Scan); +// +// super.sourceConfInit(process, conf); +// } +// +// @Override +// public boolean equals(Object object) { +// if (this == object) { +// return true; +// } +// if (object == null || getClass() != object.getClass()) { +// return false; +// } +// if (!super.equals(object)) { +// return false; +// } +// +// HBaseRawTap hBaseTap = (HBaseRawTap) object; +// +// if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { +// return false; +// } +// +// if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { +// return false; +// } +// +// return true; +// } +// +// @Override +// public int hashCode() { +// int result = super.hashCode(); +// result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); +// return result; +// } +//} |