From b72c234dd35c3eb807e8050385adf697dcf97fad Mon Sep 17 00:00:00 2001 From: rotem Date: Sun, 16 Jun 2013 12:09:42 +0300 Subject: Adding HBase raw taps and source, bumping needed dependency versions, bumping scala version --- pom.xml | 49 ++-- .../parallelai/spyglass/hbase/HBaseRawScheme.java | 286 +++++++++++++++++++ .../parallelai/spyglass/hbase/HBaseRawTap.java | 311 +++++++++++++++++++++ .../parallelai/spyglass/hbase/HBaseRawSource.scala | 62 ++++ 4 files changed, 689 insertions(+), 19 deletions(-) create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java create mode 100644 src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java create mode 100644 src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala diff --git a/pom.xml b/pom.xml index d254977..077c22c 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ Cascading and Scalding wrapper for HBase with advanced features parallelai parallelai.spyglass - 2.0.3 + 2.0.4-SNAPSHOT jar @@ -29,25 +29,26 @@ UTF-8 - 0.0.4-cdh4.2.0 - 1.3.0-cdh4.2.0 - 2.0.0-cdh4.2.0 - 0.94.2-cdh4.2.0 - 0.10.0-cdh4.2.0 - 0.7-cdh4.2.0 - 2.0.0-mr1-cdh4.2.0 - 3.3.0-cdh4.2.0 - 2.0.0-cdh4.2.0.oozie-3.3.0-cdh4.2.0 - 3.3.0-cdh4.2.0 - 0.10.0-cdh4.2.0 - 1.4.2-cdh4.2.0 - 0.8.0-cdh4.2.0 - 3.4.5-cdh4.2.0 + 0.0.4-cdh4.3.0 + 1.3.0-cdh4.3.0 + 2.0.0-cdh4.3.0 + 0.94.6-cdh4.3.0 + 0.10.0-cdh4.3.0 + 0.7-cdh4.3.0 + 2.0.0-mr1-cdh4.3.0 + 3.3.2-cdh4.3.0 + 2.0.0-cdh4.2.0.oozie-3.3.2-cdh4.3.0 + 3.3.2-cdh4.3.0 + 0.11.0-cdh4.3.0 + 1.4.3-cdh4.3.0 + 0.8.2-cdh4.3.0 + 3.4.5-cdh4.3.0 - 2.9.2 - 0.8.3 - 2.1.0 + 2.10.2 + 2.10 + 0.8.5 + 2.1.6 0.1.1 1.7.1 @@ -162,7 +163,7 @@ com.twitter - scalding_${scala.version} + scalding-core_${scalding.scala.version} ${scalding.version} @@ -189,6 +190,16 @@ ${typesafe.config.version} + + com.twitter.elephantbird + elephant-bird-core + 4.1 + + + com.twitter.elephantbird + elephant-bird-hadoop-compat + 4.1 + diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java new file mode 100644 index 0000000..32730d6 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -0,0 +1,286 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; + +/** + * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction + * with the {@HBaseRawTap} to allow for the reading and writing of data + * to and from a HBase cluster. + * + * @see HBaseRawTap + */ +@SuppressWarnings({ "rawtypes", "deprecation" }) +public class HBaseRawScheme extends Scheme { + /** + * + */ + private static final long serialVersionUID = 6248976486883281356L; + + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); + + public final Fields RowKeyField = new Fields("rowkey"); + public final Fields RowField = new Fields("row"); + + /** String familyNames */ + private String[] familyNames; + + private boolean writeNulls = true; + + /** + * Constructor HBaseScheme creates a new HBaseScheme instance. + * + * @param keyFields + * of type Fields + * @param familyName + * of type String + * @param valueFields + * of type Fields + */ + public HBaseRawScheme(String familyName) { + this(new String[] { familyName }); + } + + public HBaseRawScheme(String[] familyNames) { + this.familyNames = familyNames; + setSourceFields(); + } + + public HBaseRawScheme(String familyName, boolean writeNulls) { + this(new String[] { familyName }, writeNulls); + } + + public HBaseRawScheme(String[] familyNames, boolean writeNulls) { + this.familyNames = familyNames; + this.writeNulls = writeNulls; + setSourceFields(); + } + + private void setSourceFields() { + Fields sourceFields = Fields.join(RowKeyField, RowField); + setSourceFields(sourceFields); + } + + /** + * Method getFamilyNames returns the set of familyNames of this HBaseScheme + * object. + * + * @return the familyNames (type String[]) of this HBaseScheme object. + */ + public String[] getFamilyNames() { + HashSet familyNameSet = new HashSet(); + if (familyNames != null) { + for (String familyName : familyNames) { + familyNameSet.add(familyName); + } + } + return familyNameSet.toArray(new String[0]); + } + + @Override + public void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) { + Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; + + sourceCall.setContext(pair); + } + + @Override + public void sourceCleanup(FlowProcess flowProcess, SourceCall sourceCall) { + sourceCall.setContext(null); + } + + @SuppressWarnings("unchecked") + @Override + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) + throws IOException { + Tuple result = new Tuple(); + + Object key = sourceCall.getContext()[0]; + Object value = sourceCall.getContext()[1]; + boolean hasNext = sourceCall.getInput().next(key, value); + if (!hasNext) { + return false; + } + + // Skip nulls + if (key == null || value == null) { + return true; + } + + ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; + Result row = (Result) value; + result.add(keyWritable); + result.add(row); + sourceCall.getIncomingEntry().setTuple(result); + return true; + } + + @SuppressWarnings("unchecked") + @Override + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { + TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); + OutputCollector outputCollector = sinkCall.getOutput(); + Tuple key = tupleEntry.selectTuple(RowKeyField); + Object okey = key.getObject(0); + ImmutableBytesWritable keyBytes = getBytes(okey); + Put put = new Put(keyBytes.get()); + Fields outFields = tupleEntry.getFields().subtract(RowKeyField); + if (null != outFields) { + TupleEntry values = tupleEntry.selectEntry(outFields); + for (int n = 0; n < values.getFields().size(); n++) { + Object o = values.get(n); + ImmutableBytesWritable valueBytes = getBytes(o); + Comparable field = outFields.get(n); + ColumnName cn = parseColumn((String) field); + if (null == cn.family) { + if (n >= familyNames.length) + cn.family = familyNames[familyNames.length - 1]; + else + cn.family = familyNames[n]; + } + if (null != o || writeNulls) + put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); + } + } + outputCollector.collect(null, put); + } + + private ImmutableBytesWritable getBytes(Object obj) { + if (null == obj) + return new ImmutableBytesWritable(new byte[0]); + if (obj instanceof ImmutableBytesWritable) + return (ImmutableBytesWritable) obj; + else if (obj instanceof String) + return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); + else if (obj instanceof Long) + return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); + else if (obj instanceof Integer) + return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); + else if (obj instanceof Short) + return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); + else if (obj instanceof Boolean) + return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); + else if (obj instanceof Double) + return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); + else + throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" + + obj.getClass().getName()); + } + + private ColumnName parseColumn(String column) { + ColumnName ret = new ColumnName(); + int pos = column.indexOf(":"); + if (pos > 0) { + ret.name = column.substring(pos + 1); + ret.family = column.substring(0, pos); + } else { + ret.name = column; + } + return ret; + } + + private class ColumnName { + String family; + String name; + + ColumnName() { + } + } + + @Override + public void sinkConfInit(FlowProcess process, Tap tap, JobConf conf) { + conf.setOutputFormat(TableOutputFormat.class); + conf.setOutputKeyClass(ImmutableBytesWritable.class); + conf.setOutputValueClass(Put.class); + } + + @Override + public void sourceConfInit(FlowProcess process, Tap tap, + JobConf conf) { + DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, + ValueCopier.class); + if (null != familyNames) { + String columns = Util.join(this.familyNames, " "); + LOG.debug("sourcing from column families: {}", columns); + conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); + } + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } + + HBaseRawScheme that = (HBaseRawScheme) object; + + if (!Arrays.equals(familyNames, that.familyNames)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); + return result; + } + + public static class ValueCopier implements DeprecatedInputFormatValueCopier { + + public ValueCopier() { + } + + public void copyValue(Result oldValue, Result newValue) { + if (null != oldValue && null != newValue) { + oldValue.copyFrom(newValue); + } + } + + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java new file mode 100644 index 0000000..738fd51 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -0,0 +1,311 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; + +/** + * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with + * the {@HBaseRawScheme} to allow for the reading and writing + * of data to and from a HBase cluster. + */ +@SuppressWarnings({ "deprecation", "rawtypes" }) +public class HBaseRawTap extends Tap { + /** + * + */ + private static final long serialVersionUID = 8019189493428493323L; + + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); + + private final String id = UUID.randomUUID().toString(); + + /** Field SCHEME */ + public static final String SCHEME = "hbase"; + + /** Field hBaseAdmin */ + private transient HBaseAdmin hBaseAdmin; + + /** Field hostName */ + private String quorumNames; + /** Field tableName */ + private String tableName; + private String base64Scan; + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param quorumNames + * @param tableName + * @param HBaseFullScheme + * @param base64Scan + * @param sinkMode + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.quorumNames = quorumNames; + this.tableName = tableName; + this.base64Scan = base64Scan; + } + + /** + * Method getTableName returns the tableName of this HBaseTap object. + * + * @return the tableName (type String) of this HBaseTap object. + */ + public String getTableName() { + return tableName; + } + + public Path getPath() { + return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); + } + + protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { + if (hBaseAdmin == null) { + Configuration hbaseConf = HBaseConfiguration.create(conf); + hBaseAdmin = new HBaseAdmin(hbaseConf); + } + + return hBaseAdmin; + } + + @Override + public void sinkConfInit(FlowProcess process, JobConf conf) { + if (quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sinking to table: {}", tableName); + + if (isReplace() && conf.get("mapred.task.partition") == null) { + try { + deleteResource(conf); + + } catch (IOException e) { + throw new RuntimeException("could not delete resource: " + e); + } + } + + else if (isUpdate() || isReplace()) { + try { + createResource(conf); + } catch (IOException e) { + throw new RuntimeException(tableName + " does not exist !", e); + } + + } + + conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); + super.sinkConfInit(process, conf); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) + throws IOException { + return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); + } + + @Override + public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) + throws IOException { + HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); + hBaseCollector.prepare(); + return hBaseCollector; + } + + @Override + public boolean createResource(JobConf jobConf) throws IOException { + HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + + if (hBaseAdmin.tableExists(tableName)) { + return true; + } + + LOG.info("creating hbase table: {}", tableName); + + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + + String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); + + for (String familyName : familyNames) { + tableDescriptor.addFamily(new HColumnDescriptor(familyName)); + } + + hBaseAdmin.createTable(tableDescriptor); + + return true; + } + + @Override + public boolean deleteResource(JobConf jobConf) throws IOException { + if (getHBaseAdmin(jobConf).tableExists(tableName)) { + if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) + getHBaseAdmin(jobConf).disableTable(tableName); + getHBaseAdmin(jobConf).deleteTable(tableName); + } + return true; + } + + @Override + public boolean resourceExists(JobConf jobConf) throws IOException { + return getHBaseAdmin(jobConf).tableExists(tableName); + } + + @Override + public long getModifiedTime(JobConf jobConf) throws IOException { + return System.currentTimeMillis(); // currently unable to find last mod + // time + // on a table + } + + @Override + public void sourceConfInit(FlowProcess process, JobConf conf) { + // a hack for MultiInputFormat to see that there is a child format + FileInputFormat.setInputPaths(conf, getPath()); + + if (quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sourcing from table: {}", tableName); + conf.set(TableInputFormat.INPUT_TABLE, tableName); + if (null != base64Scan) + conf.set(TableInputFormat.SCAN, base64Scan); + + super.sourceConfInit(process, conf); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } + + HBaseRawTap hBaseTap = (HBaseRawTap) object; + + if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { + return false; + } + + if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); + return result; + } +} diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala new file mode 100644 index 0000000..1fc6f7d --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -0,0 +1,62 @@ +package parallelai.spyglass.hbase + +import cascading.pipe.Pipe +import cascading.pipe.assembly.Coerce +import cascading.scheme.Scheme +import cascading.tap.{ Tap, SinkMode } +import cascading.tuple.Fields +import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.JavaConversions._ +import scala.collection.mutable.WrappedArray +import com.twitter.scalding._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.util.Base64 +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream + +object HBaseRawSource { + def convertScanToString(scan: Scan) = { + val out = new ByteArrayOutputStream(); + val dos = new DataOutputStream(out); + scan.write(dos); + Base64.encodeBytes(out.toByteArray()); + } +} +class HBaseRawSource( + tableName: String, + quorumNames: String = "localhost", + familyNames: Array[String], + writeNulls: Boolean = true, + base64Scan:String = null, + sinkMode: SinkMode = null) extends Source { + + override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseRawScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.KEEP + case _ => sinkMode + }).asInstanceOf[Tap[_,_,_]] + } + case Write => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.UPDATE + case _ => sinkMode + }).asInstanceOf[Tap[_,_,_]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } +} -- cgit v1.2.3