diff options
author | Gracia Fernandez <Gracia.FernandezLopez@bskyb.com> | 2013-07-10 10:26:41 +0100 |
---|---|---|
committer | Gracia Fernandez <Gracia.FernandezLopez@bskyb.com> | 2013-07-10 10:26:41 +0100 |
commit | df2fa37f337bbbb219449aadaf57bcacd2350ada (patch) | |
tree | 12133be82c0cc80af58dc06eda43fc671725c9ee /src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java | |
parent | 20a18b4388f0cd06bec0b43d083150f6e1bb2c5e (diff) | |
download | SpyGlass-df2fa37f337bbbb219449aadaf57bcacd2350ada.tar.gz SpyGlass-df2fa37f337bbbb219449aadaf57bcacd2350ada.zip |
Versions reverted back to the old ones: Scala 2.9.3 (cdh4.2.0)
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java | 572 |
1 files changed, 286 insertions, 286 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 32730d6..7dba40d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -1,286 +1,286 @@ -/* - * Copyright (c) 2009 Concurrent, Inc. - * - * This work has been released into the public domain - * by the copyright holder. This applies worldwide. - * - * In case this is not legally possible: - * The copyright holder grants any entity the right - * to use this work for any purpose, without any - * conditions, unless such conditions are required by law. - */ - -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; - -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableOutputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; -import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; - -import cascading.flow.FlowProcess; -import cascading.scheme.Scheme; -import cascading.scheme.SinkCall; -import cascading.scheme.SourceCall; -import cascading.tap.Tap; -import cascading.tuple.Fields; -import cascading.tuple.Tuple; -import cascading.tuple.TupleEntry; -import cascading.util.Util; - -/** - * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction - * with the {@HBaseRawTap} to allow for the reading and writing of data - * to and from a HBase cluster. - * - * @see HBaseRawTap - */ -@SuppressWarnings({ "rawtypes", "deprecation" }) -public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { - /** - * - */ - private static final long serialVersionUID = 6248976486883281356L; - - /** Field LOG */ - private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); - - public final Fields RowKeyField = new Fields("rowkey"); - public final Fields RowField = new Fields("row"); - - /** String familyNames */ - private String[] familyNames; - - private boolean writeNulls = true; - - /** - * Constructor HBaseScheme creates a new HBaseScheme instance. - * - * @param keyFields - * of type Fields - * @param familyName - * of type String - * @param valueFields - * of type Fields - */ - public HBaseRawScheme(String familyName) { - this(new String[] { familyName }); - } - - public HBaseRawScheme(String[] familyNames) { - this.familyNames = familyNames; - setSourceFields(); - } - - public HBaseRawScheme(String familyName, boolean writeNulls) { - this(new String[] { familyName }, writeNulls); - } - - public HBaseRawScheme(String[] familyNames, boolean writeNulls) { - this.familyNames = familyNames; - this.writeNulls = writeNulls; - setSourceFields(); - } - - private void setSourceFields() { - Fields sourceFields = Fields.join(RowKeyField, RowField); - setSourceFields(sourceFields); - } - - /** - * Method getFamilyNames returns the set of familyNames of this HBaseScheme - * object. - * - * @return the familyNames (type String[]) of this HBaseScheme object. - */ - public String[] getFamilyNames() { - HashSet<String> familyNameSet = new HashSet<String>(); - if (familyNames != null) { - for (String familyName : familyNames) { - familyNameSet.add(familyName); - } - } - return familyNameSet.toArray(new String[0]); - } - - @Override - public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { - Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; - - sourceCall.setContext(pair); - } - - @Override - public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { - sourceCall.setContext(null); - } - - @SuppressWarnings("unchecked") - @Override - public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) - throws IOException { - Tuple result = new Tuple(); - - Object key = sourceCall.getContext()[0]; - Object value = sourceCall.getContext()[1]; - boolean hasNext = sourceCall.getInput().next(key, value); - if (!hasNext) { - return false; - } - - // Skip nulls - if (key == null || value == null) { - return true; - } - - ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; - Result row = (Result) value; - result.add(keyWritable); - result.add(row); - sourceCall.getIncomingEntry().setTuple(result); - return true; - } - - @SuppressWarnings("unchecked") - @Override - public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { - TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); - OutputCollector outputCollector = sinkCall.getOutput(); - Tuple key = tupleEntry.selectTuple(RowKeyField); - Object okey = key.getObject(0); - ImmutableBytesWritable keyBytes = getBytes(okey); - Put put = new Put(keyBytes.get()); - Fields outFields = tupleEntry.getFields().subtract(RowKeyField); - if (null != outFields) { - TupleEntry values = tupleEntry.selectEntry(outFields); - for (int n = 0; n < values.getFields().size(); n++) { - Object o = values.get(n); - ImmutableBytesWritable valueBytes = getBytes(o); - Comparable field = outFields.get(n); - ColumnName cn = parseColumn((String) field); - if (null == cn.family) { - if (n >= familyNames.length) - cn.family = familyNames[familyNames.length - 1]; - else - cn.family = familyNames[n]; - } - if (null != o || writeNulls) - put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); - } - } - outputCollector.collect(null, put); - } - - private ImmutableBytesWritable getBytes(Object obj) { - if (null == obj) - return new ImmutableBytesWritable(new byte[0]); - if (obj instanceof ImmutableBytesWritable) - return (ImmutableBytesWritable) obj; - else if (obj instanceof String) - return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); - else if (obj instanceof Long) - return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); - else if (obj instanceof Integer) - return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); - else if (obj instanceof Short) - return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); - else if (obj instanceof Boolean) - return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); - else if (obj instanceof Double) - return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); - else - throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" - + obj.getClass().getName()); - } - - private ColumnName parseColumn(String column) { - ColumnName ret = new ColumnName(); - int pos = column.indexOf(":"); - if (pos > 0) { - ret.name = column.substring(pos + 1); - ret.family = column.substring(0, pos); - } else { - ret.name = column; - } - return ret; - } - - private class ColumnName { - String family; - String name; - - ColumnName() { - } - } - - @Override - public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { - conf.setOutputFormat(TableOutputFormat.class); - conf.setOutputKeyClass(ImmutableBytesWritable.class); - conf.setOutputValueClass(Put.class); - } - - @Override - public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, - JobConf conf) { - DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, - ValueCopier.class); - if (null != familyNames) { - String columns = Util.join(this.familyNames, " "); - LOG.debug("sourcing from column families: {}", columns); - conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); - } - } - - @Override - public boolean equals(Object object) { - if (this == object) { - return true; - } - if (object == null || getClass() != object.getClass()) { - return false; - } - if (!super.equals(object)) { - return false; - } - - HBaseRawScheme that = (HBaseRawScheme) object; - - if (!Arrays.equals(familyNames, that.familyNames)) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); - return result; - } - - public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { - - public ValueCopier() { - } - - public void copyValue(Result oldValue, Result newValue) { - if (null != oldValue && null != newValue) { - oldValue.copyFrom(newValue); - } - } - - } -} +///* +//* Copyright (c) 2009 Concurrent, Inc. +//* +//* This work has been released into the public domain +//* by the copyright holder. This applies worldwide. +//* +//* In case this is not legally possible: +//* The copyright holder grants any entity the right +//* to use this work for any purpose, without any +//* conditions, unless such conditions are required by law. +//*/ +// +//package parallelai.spyglass.hbase; +// +//import java.io.IOException; +//import java.util.Arrays; +//import java.util.HashSet; +// +//import org.apache.hadoop.hbase.client.Put; +//import org.apache.hadoop.hbase.client.Result; +//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +//import org.apache.hadoop.hbase.mapred.TableOutputFormat; +//import org.apache.hadoop.hbase.util.Bytes; +//import org.apache.hadoop.mapred.JobConf; +//import org.apache.hadoop.mapred.OutputCollector; +//import org.apache.hadoop.mapred.RecordReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; +// +//import cascading.flow.FlowProcess; +//import cascading.scheme.Scheme; +//import cascading.scheme.SinkCall; +//import cascading.scheme.SourceCall; +//import cascading.tap.Tap; +//import cascading.tuple.Fields; +//import cascading.tuple.Tuple; +//import cascading.tuple.TupleEntry; +//import cascading.util.Util; +// +///** +//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction +//* with the {@HBaseRawTap} to allow for the reading and writing of data +//* to and from a HBase cluster. +//* +//* @see HBaseRawTap +//*/ +//@SuppressWarnings({ "rawtypes", "deprecation" }) +//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +// /** +// * +// */ +// private static final long serialVersionUID = 6248976486883281356L; +// +// /** Field LOG */ +// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); +// +// public final Fields RowKeyField = new Fields("rowkey"); +// public final Fields RowField = new Fields("row"); +// +// /** String familyNames */ +// private String[] familyNames; +// +// private boolean writeNulls = true; +// +// /** +// * Constructor HBaseScheme creates a new HBaseScheme instance. +// * +// * @param keyFields +// * of type Fields +// * @param familyName +// * of type String +// * @param valueFields +// * of type Fields +// */ +// public HBaseRawScheme(String familyName) { +// this(new String[] { familyName }); +// } +// +// public HBaseRawScheme(String[] familyNames) { +// this.familyNames = familyNames; +// setSourceFields(); +// } +// +// public HBaseRawScheme(String familyName, boolean writeNulls) { +// this(new String[] { familyName }, writeNulls); +// } +// +// public HBaseRawScheme(String[] familyNames, boolean writeNulls) { +// this.familyNames = familyNames; +// this.writeNulls = writeNulls; +// setSourceFields(); +// } +// +// private void setSourceFields() { +// Fields sourceFields = Fields.join(RowKeyField, RowField); +// setSourceFields(sourceFields); +// } +// +// /** +// * Method getFamilyNames returns the set of familyNames of this HBaseScheme +// * object. +// * +// * @return the familyNames (type String[]) of this HBaseScheme object. +// */ +// public String[] getFamilyNames() { +// HashSet<String> familyNameSet = new HashSet<String>(); +// if (familyNames != null) { +// for (String familyName : familyNames) { +// familyNameSet.add(familyName); +// } +// } +// return familyNameSet.toArray(new String[0]); +// } +// +// @Override +// public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +// Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; +// +// sourceCall.setContext(pair); +// } +// +// @Override +// public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { +// sourceCall.setContext(null); +// } +// +// @SuppressWarnings("unchecked") +// @Override +// public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) +// throws IOException { +// Tuple result = new Tuple(); +// +// Object key = sourceCall.getContext()[0]; +// Object value = sourceCall.getContext()[1]; +// boolean hasNext = sourceCall.getInput().next(key, value); +// if (!hasNext) { +// return false; +// } +// +// // Skip nulls +// if (key == null || value == null) { +// return true; +// } +// +// ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +// Result row = (Result) value; +// result.add(keyWritable); +// result.add(row); +// sourceCall.getIncomingEntry().setTuple(result); +// return true; +// } +// +// @SuppressWarnings("unchecked") +// @Override +// public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { +// TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +// OutputCollector outputCollector = sinkCall.getOutput(); +// Tuple key = tupleEntry.selectTuple(RowKeyField); +// Object okey = key.getObject(0); +// ImmutableBytesWritable keyBytes = getBytes(okey); +// Put put = new Put(keyBytes.get()); +// Fields outFields = tupleEntry.getFields().subtract(RowKeyField); +// if (null != outFields) { +// TupleEntry values = tupleEntry.selectEntry(outFields); +// for (int n = 0; n < values.getFields().size(); n++) { +// Object o = values.get(n); +// ImmutableBytesWritable valueBytes = getBytes(o); +// Comparable field = outFields.get(n); +// ColumnName cn = parseColumn((String) field); +// if (null == cn.family) { +// if (n >= familyNames.length) +// cn.family = familyNames[familyNames.length - 1]; +// else +// cn.family = familyNames[n]; +// } +// if (null != o || writeNulls) +// put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); +// } +// } +// outputCollector.collect(null, put); +// } +// +// private ImmutableBytesWritable getBytes(Object obj) { +// if (null == obj) +// return new ImmutableBytesWritable(new byte[0]); +// if (obj instanceof ImmutableBytesWritable) +// return (ImmutableBytesWritable) obj; +// else if (obj instanceof String) +// return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); +// else if (obj instanceof Long) +// return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); +// else if (obj instanceof Integer) +// return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); +// else if (obj instanceof Short) +// return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); +// else if (obj instanceof Boolean) +// return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); +// else if (obj instanceof Double) +// return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); +// else +// throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" +// + obj.getClass().getName()); +// } +// +// private ColumnName parseColumn(String column) { +// ColumnName ret = new ColumnName(); +// int pos = column.indexOf(":"); +// if (pos > 0) { +// ret.name = column.substring(pos + 1); +// ret.family = column.substring(0, pos); +// } else { +// ret.name = column; +// } +// return ret; +// } +// +// private class ColumnName { +// String family; +// String name; +// +// ColumnName() { +// } +// } +// +// @Override +// public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +// conf.setOutputFormat(TableOutputFormat.class); +// conf.setOutputKeyClass(ImmutableBytesWritable.class); +// conf.setOutputValueClass(Put.class); +// } +// +// @Override +// public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, +// JobConf conf) { +// DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, +// ValueCopier.class); +// if (null != familyNames) { +// String columns = Util.join(this.familyNames, " "); +// LOG.debug("sourcing from column families: {}", columns); +// conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); +// } +// } +// +// @Override +// public boolean equals(Object object) { +// if (this == object) { +// return true; +// } +// if (object == null || getClass() != object.getClass()) { +// return false; +// } +// if (!super.equals(object)) { +// return false; +// } +// +// HBaseRawScheme that = (HBaseRawScheme) object; +// +// if (!Arrays.equals(familyNames, that.familyNames)) { +// return false; +// } +// return true; +// } +// +// @Override +// public int hashCode() { +// int result = super.hashCode(); +// result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +// return result; +// } +// +// public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { +// +// public ValueCopier() { +// } +// +// public void copyValue(Result oldValue, Result newValue) { +// if (null != oldValue && null != newValue) { +// oldValue.copyFrom(newValue); +// } +// } +// +// } +//} |