Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java')
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java  572
1 file changed, 286 insertions(+), 286 deletions(-)
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 32730d6..7dba40d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -1,286 +1,286 @@
-/*
- * Copyright (c) 2009 Concurrent, Inc.
- *
- * This work has been released into the public domain
- * by the copyright holder. This applies worldwide.
- *
- * In case this is not legally possible:
- * The copyright holder grants any entity the right
- * to use this work for any purpose, without any
- * conditions, unless such conditions are required by law.
- */
-
-package parallelai.spyglass.hbase;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
-import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
-
-import cascading.flow.FlowProcess;
-import cascading.scheme.Scheme;
-import cascading.scheme.SinkCall;
-import cascading.scheme.SourceCall;
-import cascading.tap.Tap;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import cascading.util.Util;
-
-/**
- * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
- * with the {@link HBaseRawTap} to allow reading and writing of data
- * to and from an HBase cluster.
- *
- * @see HBaseRawTap
- */
-@SuppressWarnings({ "rawtypes", "deprecation" })
-public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
- /**
- *
- */
- private static final long serialVersionUID = 6248976486883281356L;
-
- /** Field LOG */
- private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);
-
- public final Fields RowKeyField = new Fields("rowkey");
- public final Fields RowField = new Fields("row");
-
- /** String familyNames */
- private String[] familyNames;
-
- private boolean writeNulls = true;
-
- /**
- * Constructor HBaseRawScheme creates a new HBaseRawScheme instance.
- *
- * @param familyName
- * of type String
- */
- public HBaseRawScheme(String familyName) {
- this(new String[] { familyName });
- }
-
- public HBaseRawScheme(String[] familyNames) {
- this.familyNames = familyNames;
- setSourceFields();
- }
-
- public HBaseRawScheme(String familyName, boolean writeNulls) {
- this(new String[] { familyName }, writeNulls);
- }
-
- public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
- this.familyNames = familyNames;
- this.writeNulls = writeNulls;
- setSourceFields();
- }
-
- private void setSourceFields() {
- Fields sourceFields = Fields.join(RowKeyField, RowField);
- setSourceFields(sourceFields);
- }
-
- /**
- * Method getFamilyNames returns the distinct familyNames of this
- * HBaseRawScheme object.
- *
- * @return the familyNames (type String[]) of this HBaseRawScheme object.
- */
- public String[] getFamilyNames() {
- HashSet<String> familyNameSet = new HashSet<String>();
- if (familyNames != null) {
- for (String familyName : familyNames) {
- familyNameSet.add(familyName);
- }
- }
- return familyNameSet.toArray(new String[0]);
- }
-
- @Override
- public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
- Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };
-
- sourceCall.setContext(pair);
- }
-
- @Override
- public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
- sourceCall.setContext(null);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
- throws IOException {
- Tuple result = new Tuple();
-
- Object key = sourceCall.getContext()[0];
- Object value = sourceCall.getContext()[1];
- boolean hasNext = sourceCall.getInput().next(key, value);
- if (!hasNext) {
- return false;
- }
-
- // Skip nulls
- if (key == null || value == null) {
- return true;
- }
-
- ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
- Result row = (Result) value;
- result.add(keyWritable);
- result.add(row);
- sourceCall.getIncomingEntry().setTuple(result);
- return true;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
- TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
- OutputCollector outputCollector = sinkCall.getOutput();
- Tuple key = tupleEntry.selectTuple(RowKeyField);
- Object okey = key.getObject(0);
- ImmutableBytesWritable keyBytes = getBytes(okey);
- Put put = new Put(keyBytes.get());
- Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
- if (null != outFields) {
- TupleEntry values = tupleEntry.selectEntry(outFields);
- for (int n = 0; n < values.getFields().size(); n++) {
- Object o = values.get(n);
- ImmutableBytesWritable valueBytes = getBytes(o);
- Comparable field = outFields.get(n);
- ColumnName cn = parseColumn((String) field);
- if (null == cn.family) {
- if (n >= familyNames.length)
- cn.family = familyNames[familyNames.length - 1];
- else
- cn.family = familyNames[n];
- }
- if (null != o || writeNulls)
- put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
- }
- }
- outputCollector.collect(null, put);
- }
-
- private ImmutableBytesWritable getBytes(Object obj) {
- if (null == obj)
- return new ImmutableBytesWritable(new byte[0]);
- if (obj instanceof ImmutableBytesWritable)
- return (ImmutableBytesWritable) obj;
- else if (obj instanceof String)
- return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
- else if (obj instanceof Long)
- return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
- else if (obj instanceof Integer)
- return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
- else if (obj instanceof Short)
- return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
- else if (obj instanceof Boolean)
- return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
- else if (obj instanceof Double)
- return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
- else
- throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
- + obj.getClass().getName());
- }
-
- private ColumnName parseColumn(String column) {
- ColumnName ret = new ColumnName();
- int pos = column.indexOf(":");
- if (pos > 0) {
- ret.name = column.substring(pos + 1);
- ret.family = column.substring(0, pos);
- } else {
- ret.name = column;
- }
- return ret;
- }
-
- private class ColumnName {
- String family;
- String name;
-
- ColumnName() {
- }
- }
-
- @Override
- public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
- conf.setOutputFormat(TableOutputFormat.class);
- conf.setOutputKeyClass(ImmutableBytesWritable.class);
- conf.setOutputValueClass(Put.class);
- }
-
- @Override
- public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
- JobConf conf) {
- DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
- ValueCopier.class);
- if (null != familyNames) {
- String columns = Util.join(this.familyNames, " ");
- LOG.debug("sourcing from column families: {}", columns);
- conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
- }
- }
-
- @Override
- public boolean equals(Object object) {
- if (this == object) {
- return true;
- }
- if (object == null || getClass() != object.getClass()) {
- return false;
- }
- if (!super.equals(object)) {
- return false;
- }
-
- HBaseRawScheme that = (HBaseRawScheme) object;
-
- if (!Arrays.equals(familyNames, that.familyNames)) {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
- return result;
- }
-
- public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {
-
- public ValueCopier() {
- }
-
- public void copyValue(Result oldValue, Result newValue) {
- if (null != oldValue && null != newValue) {
- oldValue.copyFrom(newValue);
- }
- }
-
- }
-}
+///*
+//* Copyright (c) 2009 Concurrent, Inc.
+//*
+//* This work has been released into the public domain
+//* by the copyright holder. This applies worldwide.
+//*
+//* In case this is not legally possible:
+//* The copyright holder grants any entity the right
+//* to use this work for any purpose, without any
+//* conditions, unless such conditions are required by law.
+//*/
+//
+//package parallelai.spyglass.hbase;
+//
+//import java.io.IOException;
+//import java.util.Arrays;
+//import java.util.HashSet;
+//
+//import org.apache.hadoop.hbase.client.Put;
+//import org.apache.hadoop.hbase.client.Result;
+//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+//import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+//import org.apache.hadoop.hbase.util.Bytes;
+//import org.apache.hadoop.mapred.JobConf;
+//import org.apache.hadoop.mapred.OutputCollector;
+//import org.apache.hadoop.mapred.RecordReader;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
+//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
+//
+//import cascading.flow.FlowProcess;
+//import cascading.scheme.Scheme;
+//import cascading.scheme.SinkCall;
+//import cascading.scheme.SourceCall;
+//import cascading.tap.Tap;
+//import cascading.tuple.Fields;
+//import cascading.tuple.Tuple;
+//import cascading.tuple.TupleEntry;
+//import cascading.util.Util;
+//
+///**
+//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
+//* with the {@link HBaseRawTap} to allow reading and writing of data
+//* to and from an HBase cluster.
+//*
+//* @see HBaseRawTap
+//*/
+//@SuppressWarnings({ "rawtypes", "deprecation" })
+//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+// /**
+// *
+// */
+// private static final long serialVersionUID = 6248976486883281356L;
+//
+// /** Field LOG */
+// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);
+//
+// public final Fields RowKeyField = new Fields("rowkey");
+// public final Fields RowField = new Fields("row");
+//
+// /** String familyNames */
+// private String[] familyNames;
+//
+// private boolean writeNulls = true;
+//
+// /**
+// * Constructor HBaseRawScheme creates a new HBaseRawScheme instance.
+// *
+// * @param familyName
+// * of type String
+// */
+// public HBaseRawScheme(String familyName) {
+// this(new String[] { familyName });
+// }
+//
+// public HBaseRawScheme(String[] familyNames) {
+// this.familyNames = familyNames;
+// setSourceFields();
+// }
+//
+// public HBaseRawScheme(String familyName, boolean writeNulls) {
+// this(new String[] { familyName }, writeNulls);
+// }
+//
+// public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
+// this.familyNames = familyNames;
+// this.writeNulls = writeNulls;
+// setSourceFields();
+// }
+//
+// private void setSourceFields() {
+// Fields sourceFields = Fields.join(RowKeyField, RowField);
+// setSourceFields(sourceFields);
+// }
+//
+// /**
+// * Method getFamilyNames returns the distinct familyNames of this
+// * HBaseRawScheme object.
+// *
+// * @return the familyNames (type String[]) of this HBaseRawScheme object.
+// */
+// public String[] getFamilyNames() {
+// HashSet<String> familyNameSet = new HashSet<String>();
+// if (familyNames != null) {
+// for (String familyName : familyNames) {
+// familyNameSet.add(familyName);
+// }
+// }
+// return familyNameSet.toArray(new String[0]);
+// }
+//
+// @Override
+// public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+// Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };
+//
+// sourceCall.setContext(pair);
+// }
+//
+// @Override
+// public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+// sourceCall.setContext(null);
+// }
+//
+// @SuppressWarnings("unchecked")
+// @Override
+// public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
+// throws IOException {
+// Tuple result = new Tuple();
+//
+// Object key = sourceCall.getContext()[0];
+// Object value = sourceCall.getContext()[1];
+// boolean hasNext = sourceCall.getInput().next(key, value);
+// if (!hasNext) {
+// return false;
+// }
+//
+// // Skip nulls
+// if (key == null || value == null) {
+// return true;
+// }
+//
+// ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
+// Result row = (Result) value;
+// result.add(keyWritable);
+// result.add(row);
+// sourceCall.getIncomingEntry().setTuple(result);
+// return true;
+// }
+//
+// @SuppressWarnings("unchecked")
+// @Override
+// public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
+// TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
+// OutputCollector outputCollector = sinkCall.getOutput();
+// Tuple key = tupleEntry.selectTuple(RowKeyField);
+// Object okey = key.getObject(0);
+// ImmutableBytesWritable keyBytes = getBytes(okey);
+// Put put = new Put(keyBytes.get());
+// Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
+// if (null != outFields) {
+// TupleEntry values = tupleEntry.selectEntry(outFields);
+// for (int n = 0; n < values.getFields().size(); n++) {
+// Object o = values.get(n);
+// ImmutableBytesWritable valueBytes = getBytes(o);
+// Comparable field = outFields.get(n);
+// ColumnName cn = parseColumn((String) field);
+// if (null == cn.family) {
+// if (n >= familyNames.length)
+// cn.family = familyNames[familyNames.length - 1];
+// else
+// cn.family = familyNames[n];
+// }
+// if (null != o || writeNulls)
+// put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
+// }
+// }
+// outputCollector.collect(null, put);
+// }
+//
+// private ImmutableBytesWritable getBytes(Object obj) {
+// if (null == obj)
+// return new ImmutableBytesWritable(new byte[0]);
+// if (obj instanceof ImmutableBytesWritable)
+// return (ImmutableBytesWritable) obj;
+// else if (obj instanceof String)
+// return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
+// else if (obj instanceof Long)
+// return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
+// else if (obj instanceof Integer)
+// return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
+// else if (obj instanceof Short)
+// return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
+// else if (obj instanceof Boolean)
+// return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
+// else if (obj instanceof Double)
+// return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
+// else
+// throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
+// + obj.getClass().getName());
+// }
+//
+// private ColumnName parseColumn(String column) {
+// ColumnName ret = new ColumnName();
+// int pos = column.indexOf(":");
+// if (pos > 0) {
+// ret.name = column.substring(pos + 1);
+// ret.family = column.substring(0, pos);
+// } else {
+// ret.name = column;
+// }
+// return ret;
+// }
+//
+// private class ColumnName {
+// String family;
+// String name;
+//
+// ColumnName() {
+// }
+// }
+//
+// @Override
+// public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+// conf.setOutputFormat(TableOutputFormat.class);
+// conf.setOutputKeyClass(ImmutableBytesWritable.class);
+// conf.setOutputValueClass(Put.class);
+// }
+//
+// @Override
+// public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
+// JobConf conf) {
+// DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
+// ValueCopier.class);
+// if (null != familyNames) {
+// String columns = Util.join(this.familyNames, " ");
+// LOG.debug("sourcing from column families: {}", columns);
+// conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
+// }
+// }
+//
+// @Override
+// public boolean equals(Object object) {
+// if (this == object) {
+// return true;
+// }
+// if (object == null || getClass() != object.getClass()) {
+// return false;
+// }
+// if (!super.equals(object)) {
+// return false;
+// }
+//
+// HBaseRawScheme that = (HBaseRawScheme) object;
+//
+// if (!Arrays.equals(familyNames, that.familyNames)) {
+// return false;
+// }
+// return true;
+// }
+//
+// @Override
+// public int hashCode() {
+// int result = super.hashCode();
+// result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
+// return result;
+// }
+//
+// public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {
+//
+// public ValueCopier() {
+// }
+//
+// public void copyValue(Result oldValue, Result newValue) {
+// if (null != oldValue && null != newValue) {
+// oldValue.copyFrom(newValue);
+// }
+// }
+//
+// }
+//}
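
For reference, a minimal sketch of how this scheme was typically wired up before being commented out: sinking tuples into HBase through an HBaseRawTap. Only the field conventions come from the code above ("rowkey" names the row key; any other field named "family:qualifier" selects its column, and a bare field name falls back to the scheme's family list). The input path, table name, quorum string, and the HBaseRawTap constructor shape (quorum, table, scheme) are assumptions for illustration, not taken from this diff.

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import parallelai.spyglass.hbase.HBaseRawScheme;
import parallelai.spyglass.hbase.HBaseRawTap;

public class HBaseRawSchemeUsage {
  public static void main(String[] args) {
    // Tab-separated input of (rowkey, value). The field name "cf:greeting"
    // follows the scheme's "family:qualifier" convention, so each value is
    // written to column family "cf", qualifier "greeting".
    Fields fields = new Fields("rowkey", "cf:greeting");
    Tap source = new Hfs(new TextDelimited(fields, "\t"), "input/rows.tsv");

    // writeNulls = false: tuples with a null value are skipped instead of
    // being stored as empty cells.
    HBaseRawScheme scheme = new HBaseRawScheme("cf", false);

    // Hypothetical constructor arguments (quorum, table name, scheme); the
    // actual HBaseRawTap signature is defined elsewhere in the project.
    Tap sink = new HBaseRawTap("localhost", "greetings", scheme);

    Pipe pipe = new Pipe("load");
    Flow flow = new HadoopFlowConnector().connect(source, sink, pipe);
    flow.complete();
  }
}

On the source side the same scheme emits two fields, "rowkey" (an ImmutableBytesWritable) and "row" (the raw HBase Result), which downstream functions must unpack themselves.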