Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseScheme.java')
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseScheme.java  348
1 file changed, 348 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
new file mode 100644
index 0000000..e5acc30
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -0,0 +1,348 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.hbase;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import cascading.util.Util;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+/**
+ * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with an {@link HBaseTap} to
+ * read data from and write data to an HBase cluster.
+ *
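+ * <p>A minimal usage sketch; the table, family, and field names are illustrative only:</p>
+ * <pre>
+ * Fields keyField = new Fields("rowKey");
+ * Fields valueFields = new Fields("col1", "col2");
+ * HBaseScheme scheme = new HBaseScheme(keyField, "cf", valueFields);
+ * </pre>
+ *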
+ * @see HBaseTap
+ */
+public class HBaseScheme
+ extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseScheme.class);
+
+ /** Field keyField */
+ private Fields keyField;
+
+ /** Long timestamp */
+ private long timeStamp;
+
+ /** String familyNames */
+ private String[] familyNames;
+ /** Field valueFields */
+ private Fields[] valueFields;
+
+ /** String columns */
+ private transient String[] columns;
+ /** Field fields */
+ private transient byte[][] fields;
+
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields of type Fields
+ * @param familyName of type String
+ * @param valueFields of type Fields
+ */
+ public HBaseScheme(Fields keyFields, String familyName, Fields valueFields) {
+ this(keyFields, new String[]{familyName}, Fields.fields(valueFields));
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields of type Fields
+ * @param familyNames of type String[], one family per entry in valueFields
+ * @param valueFields of type Fields[]
+ */
+ public HBaseScheme(Fields keyFields, String[] familyNames, Fields[] valueFields) {
+ this.keyField = keyFields;
+ this.familyNames = familyNames;
+ this.valueFields = valueFields;
+ this.timeStamp = System.currentTimeMillis();
+
+ setSourceSink(this.keyField, this.valueFields);
+
+ validate();
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields of type Fields
+ * @param timeStamp of type long
+ * @param familyNames of type String[], one family per entry in valueFields
+ * @param valueFields of type Fields[]
+ */
+ public HBaseScheme(Fields keyFields, long timeStamp, String[] familyNames, Fields[] valueFields) {
+ this.keyField = keyFields;
+ this.timeStamp = timeStamp;
+ this.familyNames = familyNames;
+ this.valueFields = valueFields;
+
+ setSourceSink(this.keyField, this.valueFields);
+
+ validate();
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names.
+ *
+ * @param keyField of type Fields
+ * @param valueFields of type Fields
+ */
+ public HBaseScheme(Fields keyField, Fields valueFields) {
+ this(keyField, Fields.fields(valueFields));
+ }
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names.
+ *
+ * @param keyField of type Fields
+ * @param valueFields of type Fields[]
+ */
+ public HBaseScheme(Fields keyField, Fields[] valueFields) {
+ this.keyField = keyField;
+ this.valueFields = valueFields;
+ this.timeStamp = System.currentTimeMillis();
+
+ validate();
+
+ setSourceSink(this.keyField, this.valueFields);
+ }
+
+ private void validate() {
+ if (keyField.size() != 1) {
+ throw new IllegalArgumentException("may only have one key field, found: " + keyField.print());
+ }
+ }
+
+ private void setSourceSink(Fields keyFields, Fields[] columnFields) {
+ Fields allFields = keyFields;
+
+ if (columnFields.length != 0) {
+ allFields = Fields.join(keyFields, Fields.join(columnFields)); // prepend the key field to the column fields
+ }
+
+ setSourceFields(allFields);
+ setSinkFields(allFields);
+ }
+
+ /**
+ * Method getFamilyNames returns the set of familyNames of this HBaseScheme object.
+ *
+ * @return the familyNames (type String[]) of this HBaseScheme object.
+ */
+ public String[] getFamilyNames() {
+ HashSet<String> familyNameSet = new HashSet<String>();
+
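+ // familyNames is null when a fully-qualified-column constructor was used,
+ // so recover each family from its "family:qualifier" column name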
+ if (familyNames == null) {
+ for (String columnName : columns(null, this.valueFields)) {
+ int pos = columnName.indexOf(":");
+ familyNameSet.add(hbaseColumn(pos > 0 ? columnName.substring(0, pos) : columnName));
+ }
+ } else {
+ for (String familyName : familyNames) {
+ familyNameSet.add(familyName);
+ }
+ }
+ return familyNameSet.toArray(new String[0]);
+ }
+
+ @Override
+ public void sourcePrepare(FlowProcess<JobConf> flowProcess,
+ SourceCall<Object[], RecordReader> sourceCall) {
+ Object[] pair =
+ new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()};
+
+ sourceCall.setContext(pair);
+ }
+
+ @Override
+ public void sourceCleanup(FlowProcess<JobConf> flowProcess,
+ SourceCall<Object[], RecordReader> sourceCall) {
+ sourceCall.setContext(null);
+ }
+
+ @Override
+ public boolean source(FlowProcess<JobConf> flowProcess,
+ SourceCall<Object[], RecordReader> sourceCall) throws IOException {
+ Tuple result = new Tuple();
+
+ Object key = sourceCall.getContext()[0];
+ Object value = sourceCall.getContext()[1];
+ boolean hasNext = sourceCall.getInput().next(key, value);
+ if (!hasNext) { return false; }
+
+ // Skip nulls
+ if (key == null || value == null) { return true; }
+
+ ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
+ Result row = (Result) value;
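+ // the row key becomes the first tuple entry; one entry per requested column follows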
+ result.add(keyWritable);
+
+ for (int i = 0; i < this.familyNames.length; i++) {
+ String familyName = this.familyNames[i];
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+ Fields fields = this.valueFields[i];
+ for (int k = 0; k < fields.size(); k++) {
+ String fieldName = (String) fields.get(k);
+ byte[] fieldNameBytes = Bytes.toBytes(fieldName);
+ byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes);
+ if (cellValue == null) {
+ cellValue = new byte[0];
+ }
+ result.add(new ImmutableBytesWritable(cellValue));
+ }
+ }
+
+ sourceCall.getIncomingEntry().setTuple(result);
+
+ return true;
+ }
+
+ @Override
+ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
+ throws IOException {
+ TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
+ OutputCollector outputCollector = sinkCall.getOutput();
+ Tuple key = tupleEntry.selectTuple(keyField);
+ ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0);
+ Put put = new Put(keyBytes.get(), this.timeStamp);
+
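+ // every value field becomes one cell in the Put; all cells share the scheme's timestamp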
+ for (int i = 0; i < valueFields.length; i++) {
+ Fields fieldSelector = valueFields[i];
+ TupleEntry values = tupleEntry.selectEntry(fieldSelector);
+
+ for (int j = 0; j < values.getFields().size(); j++) {
+ Fields fields = values.getFields();
+ Tuple tuple = values.getTuple();
+
+ ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j);
+ put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
+ }
+ }
+
+ outputCollector.collect(null, put);
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<JobConf> process,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+ conf.setOutputFormat(TableOutputFormat.class);
+
+ conf.setOutputKeyClass(ImmutableBytesWritable.class);
+ conf.setOutputValueClass(Put.class);
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<JobConf> process,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+ conf.setInputFormat(HBaseInputFormat.class);
+
+ String columns = getColumns();
+ LOG.debug("sourcing from columns: {}", columns);
+ conf.set(HBaseInputFormat.COLUMN_LIST, columns);
+ }
+
+ private String getColumns() {
+ return Util.join(columns(this.familyNames, this.valueFields), " ");
+ }
+
+ private String[] columns(String[] familyNames, Fields[] fieldsArray) {
+ if (columns != null) { return columns; }
+
+ int size = 0;
+
+ for (Fields fields : fieldsArray) { size += fields.size(); }
+
+ columns = new String[size];
+
+ int count = 0;
+
+ for (int i = 0; i < fieldsArray.length; i++) {
+ Fields fields = fieldsArray[i];
+
+ for (int j = 0; j < fields.size(); j++) {
+ if (familyNames == null) {
+ columns[count++] = hbaseColumn((String) fields.get(j));
+ } else {
+ columns[count++] = hbaseColumn(familyNames[i]) + (String) fields.get(j);
+ }
+ }
+ }
+
+ return columns;
+ }
+
+ private String hbaseColumn(String column) {
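+ // a bare family name gets a trailing ':' so it forms a valid "family:" column spec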
+ if (column.indexOf(":") < 0) { return column + ":"; }
+
+ return column;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) { return true; }
+ if (object == null || getClass() != object.getClass()) { return false; }
+ if (!super.equals(object)) { return false; }
+
+ HBaseScheme that = (HBaseScheme) object;
+
+ if (!Arrays.equals(familyNames, that.familyNames)) { return false; }
+ if (keyField != null ? !keyField.equals(that.keyField) : that.keyField != null) {
+ return false;
+ }
+ if (!Arrays.equals(valueFields, that.valueFields)) { return false; }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (keyField != null ? keyField.hashCode() : 0);
+ result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
+ result = 31 * result + (valueFields != null ? Arrays.hashCode(valueFields) : 0);
+ return result;
+ }
+}
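
For orientation, below is a minimal sketch of wiring this scheme into a Cascading flow. It assumes the companion HBaseTap exposes a (tableName, scheme) constructor, as in the cascading.hbase project this class derives from; the table, family, field names, and output path are illustrative, not part of this diff.

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import parallelai.spyglass.hbase.HBaseScheme;
import parallelai.spyglass.hbase.HBaseTap;

public class HBaseReadExample {
  public static void main(String[] args) {
    // source: row key plus two columns from family "cf" of table "testTable"
    HBaseScheme scheme = new HBaseScheme(new Fields("rowKey"), "cf", new Fields("col1", "col2"));
    Tap source = new HBaseTap("testTable", scheme); // hypothetical constructor, see note above

    // sink: tab-separated text on HDFS
    Tap sink = new Hfs(new TextDelimited(Fields.ALL, "\t"), "output/testTable");

    // copy every HBase row straight through to the text sink
    Pipe pipe = new Pipe("copy");
    Flow flow = new HadoopFlowConnector().connect(source, sink, pipe);
    flow.complete();
  }
}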