/*
 * Copyright (c) 2009 Concurrent, Inc.
 *
 * This work has been released into the public domain
 * by the copyright holder. This applies worldwide.
 *
 * In case this is not legally possible:
 * The copyright holder grants any entity the right
 * to use this work for any purpose, without any
 * conditions, unless such conditions are required by law.
 */
package parallelai.spyglass.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;

/**
 * It provides the wiring between Fields and Columns and Column families
 * In effect to write to cf:column
 *
 * data:name data:surname address:street
 * name1 surname1 address1
 *
 * We will initialize the HBaseSource with
 * ("data","data","data")
 * ("name","surname","address")
 * Data:
 * ("name1","surname1","address1")
 * ...
 *
 * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
 * with the {@HBaseRawTap} to allow for the reading and writing of data
 * to and from a HBase cluster.
* * @see HBaseRawTap */ @SuppressWarnings({ "rawtypes", "deprecation" }) public class HBaseRawScheme extends Scheme { /** * */ private static final long serialVersionUID = 6248976486883281356L; /** Field LOG */ private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); public final Fields RowKeyField = new Fields("rowkey"); public final Fields RowField = new Fields("row"); /** String familyNames */ private String[] familyNames; private boolean writeNulls = true; /** * Constructor HBaseScheme creates a new HBaseScheme instance. * @param familyName * of type String */ public HBaseRawScheme(String familyName) { this(new String[] { familyName }); } public HBaseRawScheme(String[] familyNames) { this.familyNames = familyNames; setSourceFields(); } public HBaseRawScheme(String familyName, boolean writeNulls) { this(new String[] { familyName }, writeNulls); } public HBaseRawScheme(String[] familyNames, boolean writeNulls) { this.familyNames = familyNames; this.writeNulls = writeNulls; setSourceFields(); } private void setSourceFields() { Fields sourceFields = Fields.join(RowKeyField, RowField); setSourceFields(sourceFields); } /** * Method getFamilyNames returns the set of familyNames of this HBaseScheme * object. * * @return the familyNames (type String[]) of this HBaseScheme object. 
*/ public String[] getFamilyNames() { HashSet familyNameSet = new HashSet(); if (familyNames != null) { for (String familyName : familyNames) { familyNameSet.add(familyName); } } return familyNameSet.toArray(new String[0]); } @Override public void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) { Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; sourceCall.setContext(pair); } @Override public void sourceCleanup(FlowProcess flowProcess, SourceCall sourceCall) { sourceCall.setContext(null); } @SuppressWarnings("unchecked") @Override public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { Tuple result = new Tuple(); Object key = sourceCall.getContext()[0]; Object value = sourceCall.getContext()[1]; boolean hasNext = sourceCall.getInput().next(key, value); if (!hasNext) { return false; } // Skip nulls if (key == null || value == null) { return true; } ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; Result row = (Result) value; result.add(keyWritable); result.add(row); sourceCall.getIncomingEntry().setTuple(result); return true; } @SuppressWarnings("unchecked") @Override public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); OutputCollector outputCollector = sinkCall.getOutput(); Tuple key = tupleEntry.selectTuple(RowKeyField); Object okey = key.getObject(0); ImmutableBytesWritable keyBytes = getBytes(okey); Put put = new Put(keyBytes.get()); Fields outFields = tupleEntry.getFields().subtract(RowKeyField); if (null != outFields) { TupleEntry values = tupleEntry.selectEntry(outFields); for (int n = 0; n < values.getFields().size(); n++) { Object o = values.get(n); ImmutableBytesWritable valueBytes = getBytes(o); Comparable field = outFields.get(n); ColumnName cn = parseColumn((String) field); if (null == cn.family) { if (n >= familyNames.length) cn.family = 
familyNames[familyNames.length - 1]; else cn.family = familyNames[n]; } if (null != o || writeNulls) put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); } } outputCollector.collect(null, put); } private ImmutableBytesWritable getBytes(Object obj) { if (null == obj) return new ImmutableBytesWritable(new byte[0]); if (obj instanceof ImmutableBytesWritable) return (ImmutableBytesWritable) obj; else if (obj instanceof String) return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); else if (obj instanceof Long) return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); else if (obj instanceof Integer) return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); else if (obj instanceof Short) return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); else if (obj instanceof Boolean) return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); else if (obj instanceof Double) return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); else throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" + obj.getClass().getName()); } private ColumnName parseColumn(String column) { ColumnName ret = new ColumnName(); int pos = column.indexOf(":"); if (pos > 0) { ret.name = column.substring(pos + 1); ret.family = column.substring(0, pos); } else { ret.name = column; } return ret; } private class ColumnName { String family; String name; ColumnName() { } } @Override public void sinkConfInit(FlowProcess process, Tap tap, JobConf conf) { conf.setOutputFormat(TableOutputFormat.class); conf.setOutputKeyClass(ImmutableBytesWritable.class); conf.setOutputValueClass(Put.class); } @Override public void sourceConfInit(FlowProcess process, Tap tap, JobConf conf) { DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, ValueCopier.class); if (null != familyNames) { String columns = Util.join(this.familyNames, " "); LOG.debug("sourcing from column families: 
{}", columns); conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); } } @Override public boolean equals(Object object) { if (this == object) { return true; } if (object == null || getClass() != object.getClass()) { return false; } if (!super.equals(object)) { return false; } HBaseRawScheme that = (HBaseRawScheme) object; if (!Arrays.equals(familyNames, that.familyNames)) { return false; } return true; } @Override public int hashCode() { int result = super.hashCode(); result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); return result; } public static class ValueCopier implements DeprecatedInputFormatValueCopier { public ValueCopier() { } public void copyValue(Result oldValue, Result newValue) { if (null != oldValue && null != newValue) { oldValue.copyFrom(newValue); } } } }