///* //* Copyright (c) 2009 Concurrent, Inc. //* //* This work has been released into the public domain //* by the copyright holder. This applies worldwide. //* //* In case this is not legally possible: //* The copyright holder grants any entity the right //* to use this work for any purpose, without any //* conditions, unless such conditions are required by law. //*/ // //package parallelai.spyglass.hbase; // //import java.io.IOException; //import java.util.UUID; // //import org.apache.hadoop.conf.Configuration; //import org.apache.hadoop.fs.Path; //import org.apache.hadoop.hbase.HBaseConfiguration; //import org.apache.hadoop.hbase.HColumnDescriptor; //import org.apache.hadoop.hbase.HTableDescriptor; //import org.apache.hadoop.hbase.MasterNotRunningException; //import org.apache.hadoop.hbase.ZooKeeperConnectionException; //import org.apache.hadoop.hbase.client.HBaseAdmin; //import org.apache.hadoop.hbase.client.Scan; //import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; //import org.apache.hadoop.mapred.FileInputFormat; //import org.apache.hadoop.mapred.JobConf; //import org.apache.hadoop.mapred.OutputCollector; //import org.apache.hadoop.mapred.RecordReader; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; // //import cascading.flow.FlowProcess; //import cascading.tap.SinkMode; //import cascading.tap.Tap; //import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; //import cascading.tuple.TupleEntryCollector; //import cascading.tuple.TupleEntryIterator; // //import org.apache.hadoop.hbase.mapreduce.TableInputFormat; // ///** //* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with //* the {@HBaseRawScheme} to allow for the reading and writing //* of data to and from a HBase cluster. //*/ //@SuppressWarnings({ "deprecation", "rawtypes" }) //public class HBaseRawTap extends Tap { // /** // * // */ // private static final long serialVersionUID = 8019189493428493323L; // // /** Field LOG */ // private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); // // private final String id = UUID.randomUUID().toString(); // // /** Field SCHEME */ // public static final String SCHEME = "hbase"; // // /** Field hBaseAdmin */ // private transient HBaseAdmin hBaseAdmin; // // /** Field hostName */ // private String quorumNames; // /** Field tableName */ // private String tableName; // private String base64Scan; // // /** // * Constructor HBaseTap creates a new HBaseTap instance. // * // * @param tableName // * of type String // * @param HBaseFullScheme // * of type HBaseFullScheme // */ // public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { // super(HBaseFullScheme, SinkMode.UPDATE); // this.tableName = tableName; // } // // /** // * Constructor HBaseTap creates a new HBaseTap instance. // * // * @param tableName // * of type String // * @param HBaseFullScheme // * of type HBaseFullScheme // * @param sinkMode // * of type SinkMode // */ // public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { // super(HBaseFullScheme, sinkMode); // this.tableName = tableName; // } // // /** // * Constructor HBaseTap creates a new HBaseTap instance. // * // * @param tableName // * of type String // * @param HBaseFullScheme // * of type HBaseFullScheme // */ // public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { // super(HBaseFullScheme, SinkMode.UPDATE); // this.quorumNames = quorumNames; // this.tableName = tableName; // } // // /** // * Constructor HBaseTap creates a new HBaseTap instance. // * // * @param tableName // * of type String // * @param HBaseFullScheme // * of type HBaseFullScheme // * @param sinkMode // * of type SinkMode // */ // public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { // super(HBaseFullScheme, sinkMode); // this.quorumNames = quorumNames; // this.tableName = tableName; // } // // /** // * Constructor HBaseTap creates a new HBaseTap instance. // * // * @param quorumNames HBase quorum // * @param tableName The name of the HBase table to read // * @param HBaseFullScheme // * @param base64Scan An optional base64 encoded scan object // * @param sinkMode If REPLACE the output table will be deleted before writing to // */ // public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { // super(HBaseFullScheme, sinkMode); // this.quorumNames = quorumNames; // this.tableName = tableName; // this.base64Scan = base64Scan; // } // // /** // * Method getTableName returns the tableName of this HBaseTap object. // * // * @return the tableName (type String) of this HBaseTap object. // */ // public String getTableName() { // return tableName; // } // // public Path getPath() { // return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); // } // // protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { // if (hBaseAdmin == null) { // Configuration hbaseConf = HBaseConfiguration.create(conf); // hBaseAdmin = new HBaseAdmin(hbaseConf); // } // // return hBaseAdmin; // } // // @Override // public void sinkConfInit(FlowProcess process, JobConf conf) { // if (quorumNames != null) { // conf.set("hbase.zookeeper.quorum", quorumNames); // } // // LOG.debug("sinking to table: {}", tableName); // // if (isReplace() && conf.get("mapred.task.partition") == null) { // try { // deleteResource(conf); // // } catch (IOException e) { // throw new RuntimeException("could not delete resource: " + e); // } // } // // else if (isUpdate() || isReplace()) { // try { // createResource(conf); // } catch (IOException e) { // throw new RuntimeException(tableName + " does not exist !", e); // } // // } // // conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); // super.sinkConfInit(process, conf); // } // // @Override // public String getIdentifier() { // return id; // } // // @Override // public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) // throws IOException { // return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); // } // // @Override // public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) // throws IOException { // HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); // hBaseCollector.prepare(); // return hBaseCollector; // } // // @Override // public boolean createResource(JobConf jobConf) throws IOException { // HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); // // if (hBaseAdmin.tableExists(tableName)) { // return true; // } // // LOG.info("creating hbase table: {}", tableName); // // HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); // // String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); // // for (String familyName : familyNames) { // tableDescriptor.addFamily(new HColumnDescriptor(familyName)); // } // // hBaseAdmin.createTable(tableDescriptor); // // return true; // } // // @Override // public boolean deleteResource(JobConf jobConf) throws IOException { // if (getHBaseAdmin(jobConf).tableExists(tableName)) { // if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) // getHBaseAdmin(jobConf).disableTable(tableName); // getHBaseAdmin(jobConf).deleteTable(tableName); // } // return true; // } // // @Override // public boolean resourceExists(JobConf jobConf) throws IOException { // return getHBaseAdmin(jobConf).tableExists(tableName); // } // // @Override // public long getModifiedTime(JobConf jobConf) throws IOException { // return System.currentTimeMillis(); // currently unable to find last mod // // time // // on a table // } // // @Override // public void sourceConfInit(FlowProcess process, JobConf conf) { // // a hack for MultiInputFormat to see that there is a child format // FileInputFormat.setInputPaths(conf, getPath()); // // if (quorumNames != null) { // conf.set("hbase.zookeeper.quorum", quorumNames); // } // // LOG.debug("sourcing from table: {}", tableName); // conf.set(TableInputFormat.INPUT_TABLE, tableName); // if (null != base64Scan) // conf.set(TableInputFormat.SCAN, base64Scan); // // super.sourceConfInit(process, conf); // } // // @Override // public boolean equals(Object object) { // if (this == object) { // return true; // } // if (object == null || getClass() != object.getClass()) { // return false; // } // if (!super.equals(object)) { // return false; // } // // HBaseRawTap hBaseTap = (HBaseRawTap) object; // // if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { // return false; // } // // if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { // return false; // } // // return true; // } // // @Override // public int hashCode() { // int result = super.hashCode(); // result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); // return result; // } //}