From b9d987c0d9946f8f778fbec2856305c0f20fd3f8 Mon Sep 17 00:00:00 2001 From: Chandan Rajah Date: Wed, 7 Aug 2013 12:24:34 +0100 Subject: Updated CDH version to 4.3.0 and uncommented Raw Tap and Scheme --- .../parallelai/spyglass/hbase/HBaseRawTap.java | 622 ++++++++++----------- 1 file changed, 311 insertions(+), 311 deletions(-) (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 780d3fc..5dcd57d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -1,311 +1,311 @@ -///* -//* Copyright (c) 2009 Concurrent, Inc. -//* -//* This work has been released into the public domain -//* by the copyright holder. This applies worldwide. -//* -//* In case this is not legally possible: -//* The copyright holder grants any entity the right -//* to use this work for any purpose, without any -//* conditions, unless such conditions are required by law. -//*/ -// -//package parallelai.spyglass.hbase; -// -//import java.io.IOException; -//import java.util.UUID; -// -//import org.apache.hadoop.conf.Configuration; -//import org.apache.hadoop.fs.Path; -//import org.apache.hadoop.hbase.HBaseConfiguration; -//import org.apache.hadoop.hbase.HColumnDescriptor; -//import org.apache.hadoop.hbase.HTableDescriptor; -//import org.apache.hadoop.hbase.MasterNotRunningException; -//import org.apache.hadoop.hbase.ZooKeeperConnectionException; -//import org.apache.hadoop.hbase.client.HBaseAdmin; -//import org.apache.hadoop.hbase.client.Scan; -//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -//import org.apache.hadoop.mapred.FileInputFormat; -//import org.apache.hadoop.mapred.JobConf; -//import org.apache.hadoop.mapred.OutputCollector; -//import org.apache.hadoop.mapred.RecordReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import cascading.flow.FlowProcess; -//import cascading.tap.SinkMode; -//import cascading.tap.Tap; -//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -//import cascading.tuple.TupleEntryCollector; -//import cascading.tuple.TupleEntryIterator; -// -//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -// -///** -//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with -//* the {@HBaseRawScheme} to allow for the reading and writing -//* of data to and from a HBase cluster. -//*/ -//@SuppressWarnings({ "deprecation", "rawtypes" }) -//public class HBaseRawTap extends Tap { -// /** -// * -// */ -// private static final long serialVersionUID = 8019189493428493323L; -// -// /** Field LOG */ -// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); -// -// private final String id = UUID.randomUUID().toString(); -// -// /** Field SCHEME */ -// public static final String SCHEME = "hbase"; -// -// /** Field hBaseAdmin */ -// private transient HBaseAdmin hBaseAdmin; -// -// /** Field hostName */ -// private String quorumNames; -// /** Field tableName */ -// private String tableName; -// private String base64Scan; -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// */ -// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { -// super(HBaseFullScheme, SinkMode.UPDATE); -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// * @param sinkMode -// * of type SinkMode -// */ -// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -// super(HBaseFullScheme, sinkMode); -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// */ -// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { -// super(HBaseFullScheme, SinkMode.UPDATE); -// this.quorumNames = quorumNames; -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// * @param sinkMode -// * of type SinkMode -// */ -// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -// super(HBaseFullScheme, sinkMode); -// this.quorumNames = quorumNames; -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param quorumNames HBase quorum -// * @param tableName The name of the HBase table to read -// * @param HBaseFullScheme -// * @param base64Scan An optional base64 encoded scan object -// * @param sinkMode If REPLACE the output table will be deleted before writing to -// */ -// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { -// super(HBaseFullScheme, sinkMode); -// this.quorumNames = quorumNames; -// this.tableName = tableName; -// this.base64Scan = base64Scan; -// } -// -// /** -// * Method getTableName returns the tableName of this HBaseTap object. -// * -// * @return the tableName (type String) of this HBaseTap object. -// */ -// public String getTableName() { -// return tableName; -// } -// -// public Path getPath() { -// return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); -// } -// -// protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { -// if (hBaseAdmin == null) { -// Configuration hbaseConf = HBaseConfiguration.create(conf); -// hBaseAdmin = new HBaseAdmin(hbaseConf); -// } -// -// return hBaseAdmin; -// } -// -// @Override -// public void sinkConfInit(FlowProcess process, JobConf conf) { -// if (quorumNames != null) { -// conf.set("hbase.zookeeper.quorum", quorumNames); -// } -// -// LOG.debug("sinking to table: {}", tableName); -// -// if (isReplace() && conf.get("mapred.task.partition") == null) { -// try { -// deleteResource(conf); -// -// } catch (IOException e) { -// throw new RuntimeException("could not delete resource: " + e); -// } -// } -// -// else if (isUpdate() || isReplace()) { -// try { -// createResource(conf); -// } catch (IOException e) { -// throw new RuntimeException(tableName + " does not exist !", e); -// } -// -// } -// -// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); -// super.sinkConfInit(process, conf); -// } -// -// @Override -// public String getIdentifier() { -// return id; -// } -// -// @Override -// public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) -// throws IOException { -// return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); -// } -// -// @Override -// public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) -// throws IOException { -// HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); -// hBaseCollector.prepare(); -// return hBaseCollector; -// } -// -// @Override -// public boolean createResource(JobConf jobConf) throws IOException { -// HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); -// -// if (hBaseAdmin.tableExists(tableName)) { -// return true; -// } -// -// LOG.info("creating hbase table: {}", tableName); -// -// HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); -// -// String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); -// -// for (String familyName : familyNames) { -// tableDescriptor.addFamily(new HColumnDescriptor(familyName)); -// } -// -// hBaseAdmin.createTable(tableDescriptor); -// -// return true; -// } -// -// @Override -// public boolean deleteResource(JobConf jobConf) throws IOException { -// if (getHBaseAdmin(jobConf).tableExists(tableName)) { -// if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) -// getHBaseAdmin(jobConf).disableTable(tableName); -// getHBaseAdmin(jobConf).deleteTable(tableName); -// } -// return true; -// } -// -// @Override -// public boolean resourceExists(JobConf jobConf) throws IOException { -// return getHBaseAdmin(jobConf).tableExists(tableName); -// } -// -// @Override -// public long getModifiedTime(JobConf jobConf) throws IOException { -// return System.currentTimeMillis(); // currently unable to find last mod -// // time -// // on a table -// } -// -// @Override -// public void sourceConfInit(FlowProcess process, JobConf conf) { -// // a hack for MultiInputFormat to see that there is a child format -// FileInputFormat.setInputPaths(conf, getPath()); -// -// if (quorumNames != null) { -// conf.set("hbase.zookeeper.quorum", quorumNames); -// } -// -// LOG.debug("sourcing from table: {}", tableName); -// conf.set(TableInputFormat.INPUT_TABLE, tableName); -// if (null != base64Scan) -// conf.set(TableInputFormat.SCAN, base64Scan); -// -// super.sourceConfInit(process, conf); -// } -// -// @Override -// public boolean equals(Object object) { -// if (this == object) { -// return true; -// } -// if (object == null || getClass() != object.getClass()) { -// return false; -// } -// if (!super.equals(object)) { -// return false; -// } -// -// HBaseRawTap hBaseTap = (HBaseRawTap) object; -// -// if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { -// return false; -// } -// -// if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { -// return false; -// } -// -// return true; -// } -// -// @Override -// public int hashCode() { -// int result = super.hashCode(); -// result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); -// return result; -// } -//} +/* +* Copyright (c) 2009 Concurrent, Inc. +* +* This work has been released into the public domain +* by the copyright holder. This applies worldwide. +* +* In case this is not legally possible: +* The copyright holder grants any entity the right +* to use this work for any purpose, without any +* conditions, unless such conditions are required by law. +*/ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; + +/** +* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with +* the {@HBaseRawScheme} to allow for the reading and writing +* of data to and from a HBase cluster. +*/ +@SuppressWarnings({ "deprecation", "rawtypes" }) +public class HBaseRawTap extends Tap { + /** + * + */ + private static final long serialVersionUID = 8019189493428493323L; + + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); + + private final String id = UUID.randomUUID().toString(); + + /** Field SCHEME */ + public static final String SCHEME = "hbase"; + + /** Field hBaseAdmin */ + private transient HBaseAdmin hBaseAdmin; + + /** Field hostName */ + private String quorumNames; + /** Field tableName */ + private String tableName; + private String base64Scan; + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param quorumNames HBase quorum + * @param tableName The name of the HBase table to read + * @param HBaseFullScheme + * @param base64Scan An optional base64 encoded scan object + * @param sinkMode If REPLACE the output table will be deleted before writing to + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.quorumNames = quorumNames; + this.tableName = tableName; + this.base64Scan = base64Scan; + } + + /** + * Method getTableName returns the tableName of this HBaseTap object. + * + * @return the tableName (type String) of this HBaseTap object. + */ + public String getTableName() { + return tableName; + } + + public Path getPath() { + return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); + } + + protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { + if (hBaseAdmin == null) { + Configuration hbaseConf = HBaseConfiguration.create(conf); + hBaseAdmin = new HBaseAdmin(hbaseConf); + } + + return hBaseAdmin; + } + + @Override + public void sinkConfInit(FlowProcess process, JobConf conf) { + if (quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sinking to table: {}", tableName); + + if (isReplace() && conf.get("mapred.task.partition") == null) { + try { + deleteResource(conf); + + } catch (IOException e) { + throw new RuntimeException("could not delete resource: " + e); + } + } + + else if (isUpdate() || isReplace()) { + try { + createResource(conf); + } catch (IOException e) { + throw new RuntimeException(tableName + " does not exist !", e); + } + + } + + conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); + super.sinkConfInit(process, conf); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) + throws IOException { + return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); + } + + @Override + public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) + throws IOException { + HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); + hBaseCollector.prepare(); + return hBaseCollector; + } + + @Override + public boolean createResource(JobConf jobConf) throws IOException { + HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + + if (hBaseAdmin.tableExists(tableName)) { + return true; + } + + LOG.info("creating hbase table: {}", tableName); + + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + + String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); + + for (String familyName : familyNames) { + tableDescriptor.addFamily(new HColumnDescriptor(familyName)); + } + + hBaseAdmin.createTable(tableDescriptor); + + return true; + } + + @Override + public boolean deleteResource(JobConf jobConf) throws IOException { + if (getHBaseAdmin(jobConf).tableExists(tableName)) { + if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) + getHBaseAdmin(jobConf).disableTable(tableName); + getHBaseAdmin(jobConf).deleteTable(tableName); + } + return true; + } + + @Override + public boolean resourceExists(JobConf jobConf) throws IOException { + return getHBaseAdmin(jobConf).tableExists(tableName); + } + + @Override + public long getModifiedTime(JobConf jobConf) throws IOException { + return System.currentTimeMillis(); // currently unable to find last mod + // time + // on a table + } + + @Override + public void sourceConfInit(FlowProcess process, JobConf conf) { + // a hack for MultiInputFormat to see that there is a child format + FileInputFormat.setInputPaths(conf, getPath()); + + if (quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sourcing from table: {}", tableName); + conf.set(TableInputFormat.INPUT_TABLE, tableName); + if (null != base64Scan) + conf.set(TableInputFormat.SCAN, base64Scan); + + super.sourceConfInit(process, conf); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } + + HBaseRawTap hBaseTap = (HBaseRawTap) object; + + if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { + return false; + } + + if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); + return result; + } +} -- cgit v1.2.3