aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorGracia Fernandez <Gracia.FernandezLopez@bskyb.com>2013-07-10 10:26:41 +0100
committerGracia Fernandez <Gracia.FernandezLopez@bskyb.com>2013-07-10 10:26:41 +0100
commitdf2fa37f337bbbb219449aadaf57bcacd2350ada (patch)
tree12133be82c0cc80af58dc06eda43fc671725c9ee /src
parent20a18b4388f0cd06bec0b43d083150f6e1bb2c5e (diff)
downloadSpyGlass-df2fa37f337bbbb219449aadaf57bcacd2350ada.tar.gz
SpyGlass-df2fa37f337bbbb219449aadaf57bcacd2350ada.zip
Versions reverted back to the old ones: Scala 2.9.3 (cdh4.2.0)
Diffstat (limited to 'src')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java572
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java622
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala166
3 files changed, 680 insertions, 680 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 32730d6..7dba40d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -1,286 +1,286 @@
-/*
- * Copyright (c) 2009 Concurrent, Inc.
- *
- * This work has been released into the public domain
- * by the copyright holder. This applies worldwide.
- *
- * In case this is not legally possible:
- * The copyright holder grants any entity the right
- * to use this work for any purpose, without any
- * conditions, unless such conditions are required by law.
- */
-
-package parallelai.spyglass.hbase;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
-import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
-
-import cascading.flow.FlowProcess;
-import cascading.scheme.Scheme;
-import cascading.scheme.SinkCall;
-import cascading.scheme.SourceCall;
-import cascading.tap.Tap;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import cascading.util.Util;
-
-/**
- * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
- * with the {@HBaseRawTap} to allow for the reading and writing of data
- * to and from a HBase cluster.
- *
- * @see HBaseRawTap
- */
-@SuppressWarnings({ "rawtypes", "deprecation" })
-public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
- /**
- *
- */
- private static final long serialVersionUID = 6248976486883281356L;
-
- /** Field LOG */
- private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);
-
- public final Fields RowKeyField = new Fields("rowkey");
- public final Fields RowField = new Fields("row");
-
- /** String familyNames */
- private String[] familyNames;
-
- private boolean writeNulls = true;
-
- /**
- * Constructor HBaseScheme creates a new HBaseScheme instance.
- *
- * @param keyFields
- * of type Fields
- * @param familyName
- * of type String
- * @param valueFields
- * of type Fields
- */
- public HBaseRawScheme(String familyName) {
- this(new String[] { familyName });
- }
-
- public HBaseRawScheme(String[] familyNames) {
- this.familyNames = familyNames;
- setSourceFields();
- }
-
- public HBaseRawScheme(String familyName, boolean writeNulls) {
- this(new String[] { familyName }, writeNulls);
- }
-
- public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
- this.familyNames = familyNames;
- this.writeNulls = writeNulls;
- setSourceFields();
- }
-
- private void setSourceFields() {
- Fields sourceFields = Fields.join(RowKeyField, RowField);
- setSourceFields(sourceFields);
- }
-
- /**
- * Method getFamilyNames returns the set of familyNames of this HBaseScheme
- * object.
- *
- * @return the familyNames (type String[]) of this HBaseScheme object.
- */
- public String[] getFamilyNames() {
- HashSet<String> familyNameSet = new HashSet<String>();
- if (familyNames != null) {
- for (String familyName : familyNames) {
- familyNameSet.add(familyName);
- }
- }
- return familyNameSet.toArray(new String[0]);
- }
-
- @Override
- public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
- Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };
-
- sourceCall.setContext(pair);
- }
-
- @Override
- public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
- sourceCall.setContext(null);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
- throws IOException {
- Tuple result = new Tuple();
-
- Object key = sourceCall.getContext()[0];
- Object value = sourceCall.getContext()[1];
- boolean hasNext = sourceCall.getInput().next(key, value);
- if (!hasNext) {
- return false;
- }
-
- // Skip nulls
- if (key == null || value == null) {
- return true;
- }
-
- ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
- Result row = (Result) value;
- result.add(keyWritable);
- result.add(row);
- sourceCall.getIncomingEntry().setTuple(result);
- return true;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
- TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
- OutputCollector outputCollector = sinkCall.getOutput();
- Tuple key = tupleEntry.selectTuple(RowKeyField);
- Object okey = key.getObject(0);
- ImmutableBytesWritable keyBytes = getBytes(okey);
- Put put = new Put(keyBytes.get());
- Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
- if (null != outFields) {
- TupleEntry values = tupleEntry.selectEntry(outFields);
- for (int n = 0; n < values.getFields().size(); n++) {
- Object o = values.get(n);
- ImmutableBytesWritable valueBytes = getBytes(o);
- Comparable field = outFields.get(n);
- ColumnName cn = parseColumn((String) field);
- if (null == cn.family) {
- if (n >= familyNames.length)
- cn.family = familyNames[familyNames.length - 1];
- else
- cn.family = familyNames[n];
- }
- if (null != o || writeNulls)
- put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
- }
- }
- outputCollector.collect(null, put);
- }
-
- private ImmutableBytesWritable getBytes(Object obj) {
- if (null == obj)
- return new ImmutableBytesWritable(new byte[0]);
- if (obj instanceof ImmutableBytesWritable)
- return (ImmutableBytesWritable) obj;
- else if (obj instanceof String)
- return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
- else if (obj instanceof Long)
- return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
- else if (obj instanceof Integer)
- return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
- else if (obj instanceof Short)
- return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
- else if (obj instanceof Boolean)
- return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
- else if (obj instanceof Double)
- return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
- else
- throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
- + obj.getClass().getName());
- }
-
- private ColumnName parseColumn(String column) {
- ColumnName ret = new ColumnName();
- int pos = column.indexOf(":");
- if (pos > 0) {
- ret.name = column.substring(pos + 1);
- ret.family = column.substring(0, pos);
- } else {
- ret.name = column;
- }
- return ret;
- }
-
- private class ColumnName {
- String family;
- String name;
-
- ColumnName() {
- }
- }
-
- @Override
- public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
- conf.setOutputFormat(TableOutputFormat.class);
- conf.setOutputKeyClass(ImmutableBytesWritable.class);
- conf.setOutputValueClass(Put.class);
- }
-
- @Override
- public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
- JobConf conf) {
- DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
- ValueCopier.class);
- if (null != familyNames) {
- String columns = Util.join(this.familyNames, " ");
- LOG.debug("sourcing from column families: {}", columns);
- conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
- }
- }
-
- @Override
- public boolean equals(Object object) {
- if (this == object) {
- return true;
- }
- if (object == null || getClass() != object.getClass()) {
- return false;
- }
- if (!super.equals(object)) {
- return false;
- }
-
- HBaseRawScheme that = (HBaseRawScheme) object;
-
- if (!Arrays.equals(familyNames, that.familyNames)) {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
- return result;
- }
-
- public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {
-
- public ValueCopier() {
- }
-
- public void copyValue(Result oldValue, Result newValue) {
- if (null != oldValue && null != newValue) {
- oldValue.copyFrom(newValue);
- }
- }
-
- }
-}
+///*
+//* Copyright (c) 2009 Concurrent, Inc.
+//*
+//* This work has been released into the public domain
+//* by the copyright holder. This applies worldwide.
+//*
+//* In case this is not legally possible:
+//* The copyright holder grants any entity the right
+//* to use this work for any purpose, without any
+//* conditions, unless such conditions are required by law.
+//*/
+//
+//package parallelai.spyglass.hbase;
+//
+//import java.io.IOException;
+//import java.util.Arrays;
+//import java.util.HashSet;
+//
+//import org.apache.hadoop.hbase.client.Put;
+//import org.apache.hadoop.hbase.client.Result;
+//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+//import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+//import org.apache.hadoop.hbase.util.Bytes;
+//import org.apache.hadoop.mapred.JobConf;
+//import org.apache.hadoop.mapred.OutputCollector;
+//import org.apache.hadoop.mapred.RecordReader;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
+//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
+//
+//import cascading.flow.FlowProcess;
+//import cascading.scheme.Scheme;
+//import cascading.scheme.SinkCall;
+//import cascading.scheme.SourceCall;
+//import cascading.tap.Tap;
+//import cascading.tuple.Fields;
+//import cascading.tuple.Tuple;
+//import cascading.tuple.TupleEntry;
+//import cascading.util.Util;
+//
+///**
+//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
+//* with the {@HBaseRawTap} to allow for the reading and writing of data
+//* to and from a HBase cluster.
+//*
+//* @see HBaseRawTap
+//*/
+//@SuppressWarnings({ "rawtypes", "deprecation" })
+//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+// /**
+// *
+// */
+// private static final long serialVersionUID = 6248976486883281356L;
+//
+// /** Field LOG */
+// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);
+//
+// public final Fields RowKeyField = new Fields("rowkey");
+// public final Fields RowField = new Fields("row");
+//
+// /** String familyNames */
+// private String[] familyNames;
+//
+// private boolean writeNulls = true;
+//
+// /**
+// * Constructor HBaseScheme creates a new HBaseScheme instance.
+// *
+// * @param keyFields
+// * of type Fields
+// * @param familyName
+// * of type String
+// * @param valueFields
+// * of type Fields
+// */
+// public HBaseRawScheme(String familyName) {
+// this(new String[] { familyName });
+// }
+//
+// public HBaseRawScheme(String[] familyNames) {
+// this.familyNames = familyNames;
+// setSourceFields();
+// }
+//
+// public HBaseRawScheme(String familyName, boolean writeNulls) {
+// this(new String[] { familyName }, writeNulls);
+// }
+//
+// public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
+// this.familyNames = familyNames;
+// this.writeNulls = writeNulls;
+// setSourceFields();
+// }
+//
+// private void setSourceFields() {
+// Fields sourceFields = Fields.join(RowKeyField, RowField);
+// setSourceFields(sourceFields);
+// }
+//
+// /**
+// * Method getFamilyNames returns the set of familyNames of this HBaseScheme
+// * object.
+// *
+// * @return the familyNames (type String[]) of this HBaseScheme object.
+// */
+// public String[] getFamilyNames() {
+// HashSet<String> familyNameSet = new HashSet<String>();
+// if (familyNames != null) {
+// for (String familyName : familyNames) {
+// familyNameSet.add(familyName);
+// }
+// }
+// return familyNameSet.toArray(new String[0]);
+// }
+//
+// @Override
+// public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+// Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };
+//
+// sourceCall.setContext(pair);
+// }
+//
+// @Override
+// public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+// sourceCall.setContext(null);
+// }
+//
+// @SuppressWarnings("unchecked")
+// @Override
+// public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
+// throws IOException {
+// Tuple result = new Tuple();
+//
+// Object key = sourceCall.getContext()[0];
+// Object value = sourceCall.getContext()[1];
+// boolean hasNext = sourceCall.getInput().next(key, value);
+// if (!hasNext) {
+// return false;
+// }
+//
+// // Skip nulls
+// if (key == null || value == null) {
+// return true;
+// }
+//
+// ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
+// Result row = (Result) value;
+// result.add(keyWritable);
+// result.add(row);
+// sourceCall.getIncomingEntry().setTuple(result);
+// return true;
+// }
+//
+// @SuppressWarnings("unchecked")
+// @Override
+// public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
+// TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
+// OutputCollector outputCollector = sinkCall.getOutput();
+// Tuple key = tupleEntry.selectTuple(RowKeyField);
+// Object okey = key.getObject(0);
+// ImmutableBytesWritable keyBytes = getBytes(okey);
+// Put put = new Put(keyBytes.get());
+// Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
+// if (null != outFields) {
+// TupleEntry values = tupleEntry.selectEntry(outFields);
+// for (int n = 0; n < values.getFields().size(); n++) {
+// Object o = values.get(n);
+// ImmutableBytesWritable valueBytes = getBytes(o);
+// Comparable field = outFields.get(n);
+// ColumnName cn = parseColumn((String) field);
+// if (null == cn.family) {
+// if (n >= familyNames.length)
+// cn.family = familyNames[familyNames.length - 1];
+// else
+// cn.family = familyNames[n];
+// }
+// if (null != o || writeNulls)
+// put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
+// }
+// }
+// outputCollector.collect(null, put);
+// }
+//
+// private ImmutableBytesWritable getBytes(Object obj) {
+// if (null == obj)
+// return new ImmutableBytesWritable(new byte[0]);
+// if (obj instanceof ImmutableBytesWritable)
+// return (ImmutableBytesWritable) obj;
+// else if (obj instanceof String)
+// return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
+// else if (obj instanceof Long)
+// return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
+// else if (obj instanceof Integer)
+// return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
+// else if (obj instanceof Short)
+// return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
+// else if (obj instanceof Boolean)
+// return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
+// else if (obj instanceof Double)
+// return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
+// else
+// throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
+// + obj.getClass().getName());
+// }
+//
+// private ColumnName parseColumn(String column) {
+// ColumnName ret = new ColumnName();
+// int pos = column.indexOf(":");
+// if (pos > 0) {
+// ret.name = column.substring(pos + 1);
+// ret.family = column.substring(0, pos);
+// } else {
+// ret.name = column;
+// }
+// return ret;
+// }
+//
+// private class ColumnName {
+// String family;
+// String name;
+//
+// ColumnName() {
+// }
+// }
+//
+// @Override
+// public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+// conf.setOutputFormat(TableOutputFormat.class);
+// conf.setOutputKeyClass(ImmutableBytesWritable.class);
+// conf.setOutputValueClass(Put.class);
+// }
+//
+// @Override
+// public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
+// JobConf conf) {
+// DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
+// ValueCopier.class);
+// if (null != familyNames) {
+// String columns = Util.join(this.familyNames, " ");
+// LOG.debug("sourcing from column families: {}", columns);
+// conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
+// }
+// }
+//
+// @Override
+// public boolean equals(Object object) {
+// if (this == object) {
+// return true;
+// }
+// if (object == null || getClass() != object.getClass()) {
+// return false;
+// }
+// if (!super.equals(object)) {
+// return false;
+// }
+//
+// HBaseRawScheme that = (HBaseRawScheme) object;
+//
+// if (!Arrays.equals(familyNames, that.familyNames)) {
+// return false;
+// }
+// return true;
+// }
+//
+// @Override
+// public int hashCode() {
+// int result = super.hashCode();
+// result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
+// return result;
+// }
+//
+// public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {
+//
+// public ValueCopier() {
+// }
+//
+// public void copyValue(Result oldValue, Result newValue) {
+// if (null != oldValue && null != newValue) {
+// oldValue.copyFrom(newValue);
+// }
+// }
+//
+// }
+//}
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
index 0421b6e..780d3fc 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -1,311 +1,311 @@
-/*
- * Copyright (c) 2009 Concurrent, Inc.
- *
- * This work has been released into the public domain
- * by the copyright holder. This applies worldwide.
- *
- * In case this is not legally possible:
- * The copyright holder grants any entity the right
- * to use this work for any purpose, without any
- * conditions, unless such conditions are required by law.
- */
-
-package parallelai.spyglass.hbase;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import cascading.flow.FlowProcess;
-import cascading.tap.SinkMode;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
-import cascading.tuple.TupleEntryCollector;
-import cascading.tuple.TupleEntryIterator;
-
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-
-/**
- * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with
- * the {@HBaseRawScheme} to allow for the reading and writing
- * of data to and from a HBase cluster.
- */
-@SuppressWarnings({ "deprecation", "rawtypes" })
-public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {
- /**
- *
- */
- private static final long serialVersionUID = 8019189493428493323L;
-
- /** Field LOG */
- private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class);
-
- private final String id = UUID.randomUUID().toString();
-
- /** Field SCHEME */
- public static final String SCHEME = "hbase";
-
- /** Field hBaseAdmin */
- private transient HBaseAdmin hBaseAdmin;
-
- /** Field hostName */
- private String quorumNames;
- /** Field tableName */
- private String tableName;
- private String base64Scan;
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName
- * of type String
- * @param HBaseFullScheme
- * of type HBaseFullScheme
- */
- public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) {
- super(HBaseFullScheme, SinkMode.UPDATE);
- this.tableName = tableName;
- }
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName
- * of type String
- * @param HBaseFullScheme
- * of type HBaseFullScheme
- * @param sinkMode
- * of type SinkMode
- */
- public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
- super(HBaseFullScheme, sinkMode);
- this.tableName = tableName;
- }
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName
- * of type String
- * @param HBaseFullScheme
- * of type HBaseFullScheme
- */
- public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) {
- super(HBaseFullScheme, SinkMode.UPDATE);
- this.quorumNames = quorumNames;
- this.tableName = tableName;
- }
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName
- * of type String
- * @param HBaseFullScheme
- * of type HBaseFullScheme
- * @param sinkMode
- * of type SinkMode
- */
- public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
- super(HBaseFullScheme, sinkMode);
- this.quorumNames = quorumNames;
- this.tableName = tableName;
- }
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param quorumNames HBase quorum
- * @param tableName The name of the HBase table to read
- * @param HBaseFullScheme
- * @param base64Scan An optional base64 encoded scan object
- * @param sinkMode If REPLACE the output table will be deleted before writing to
- */
- public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {
- super(HBaseFullScheme, sinkMode);
- this.quorumNames = quorumNames;
- this.tableName = tableName;
- this.base64Scan = base64Scan;
- }
-
- /**
- * Method getTableName returns the tableName of this HBaseTap object.
- *
- * @return the tableName (type String) of this HBaseTap object.
- */
- public String getTableName() {
- return tableName;
- }
-
- public Path getPath() {
- return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
- }
-
- protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
- if (hBaseAdmin == null) {
- Configuration hbaseConf = HBaseConfiguration.create(conf);
- hBaseAdmin = new HBaseAdmin(hbaseConf);
- }
-
- return hBaseAdmin;
- }
-
- @Override
- public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
- if (quorumNames != null) {
- conf.set("hbase.zookeeper.quorum", quorumNames);
- }
-
- LOG.debug("sinking to table: {}", tableName);
-
- if (isReplace() && conf.get("mapred.task.partition") == null) {
- try {
- deleteResource(conf);
-
- } catch (IOException e) {
- throw new RuntimeException("could not delete resource: " + e);
- }
- }
-
- else if (isUpdate() || isReplace()) {
- try {
- createResource(conf);
- } catch (IOException e) {
- throw new RuntimeException(tableName + " does not exist !", e);
- }
-
- }
-
- conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
- super.sinkConfInit(process, conf);
- }
-
- @Override
- public String getIdentifier() {
- return id;
- }
-
- @Override
- public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader)
- throws IOException {
- return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
- }
-
- @Override
- public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector)
- throws IOException {
- HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this);
- hBaseCollector.prepare();
- return hBaseCollector;
- }
-
- @Override
- public boolean createResource(JobConf jobConf) throws IOException {
- HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
-
- if (hBaseAdmin.tableExists(tableName)) {
- return true;
- }
-
- LOG.info("creating hbase table: {}", tableName);
-
- HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
-
- String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames();
-
- for (String familyName : familyNames) {
- tableDescriptor.addFamily(new HColumnDescriptor(familyName));
- }
-
- hBaseAdmin.createTable(tableDescriptor);
-
- return true;
- }
-
- @Override
- public boolean deleteResource(JobConf jobConf) throws IOException {
- if (getHBaseAdmin(jobConf).tableExists(tableName)) {
- if (getHBaseAdmin(jobConf).isTableEnabled(tableName))
- getHBaseAdmin(jobConf).disableTable(tableName);
- getHBaseAdmin(jobConf).deleteTable(tableName);
- }
- return true;
- }
-
- @Override
- public boolean resourceExists(JobConf jobConf) throws IOException {
- return getHBaseAdmin(jobConf).tableExists(tableName);
- }
-
- @Override
- public long getModifiedTime(JobConf jobConf) throws IOException {
- return System.currentTimeMillis(); // currently unable to find last mod
- // time
- // on a table
- }
-
- @Override
- public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
- // a hack for MultiInputFormat to see that there is a child format
- FileInputFormat.setInputPaths(conf, getPath());
-
- if (quorumNames != null) {
- conf.set("hbase.zookeeper.quorum", quorumNames);
- }
-
- LOG.debug("sourcing from table: {}", tableName);
- conf.set(TableInputFormat.INPUT_TABLE, tableName);
- if (null != base64Scan)
- conf.set(TableInputFormat.SCAN, base64Scan);
-
- super.sourceConfInit(process, conf);
- }
-
- @Override
- public boolean equals(Object object) {
- if (this == object) {
- return true;
- }
- if (object == null || getClass() != object.getClass()) {
- return false;
- }
- if (!super.equals(object)) {
- return false;
- }
-
- HBaseRawTap hBaseTap = (HBaseRawTap) object;
-
- if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) {
- return false;
- }
-
- if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0);
- return result;
- }
-}
+///*
+//* Copyright (c) 2009 Concurrent, Inc.
+//*
+//* This work has been released into the public domain
+//* by the copyright holder. This applies worldwide.
+//*
+//* In case this is not legally possible:
+//* The copyright holder grants any entity the right
+//* to use this work for any purpose, without any
+//* conditions, unless such conditions are required by law.
+//*/
+//
+//package parallelai.spyglass.hbase;
+//
+//import java.io.IOException;
+//import java.util.UUID;
+//
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.hbase.HBaseConfiguration;
+//import org.apache.hadoop.hbase.HColumnDescriptor;
+//import org.apache.hadoop.hbase.HTableDescriptor;
+//import org.apache.hadoop.hbase.MasterNotRunningException;
+//import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+//import org.apache.hadoop.hbase.client.HBaseAdmin;
+//import org.apache.hadoop.hbase.client.Scan;
+//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+//import org.apache.hadoop.mapred.FileInputFormat;
+//import org.apache.hadoop.mapred.JobConf;
+//import org.apache.hadoop.mapred.OutputCollector;
+//import org.apache.hadoop.mapred.RecordReader;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import cascading.flow.FlowProcess;
+//import cascading.tap.SinkMode;
+//import cascading.tap.Tap;
+//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
+//import cascading.tuple.TupleEntryCollector;
+//import cascading.tuple.TupleEntryIterator;
+//
+//import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+//
+///**
+//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with
+//* the {@HBaseRawScheme} to allow for the reading and writing
+//* of data to and from a HBase cluster.
+//*/
+//@SuppressWarnings({ "deprecation", "rawtypes" })
+//public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {
+// /**
+// *
+// */
+// private static final long serialVersionUID = 8019189493428493323L;
+//
+// /** Field LOG */
+// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class);
+//
+// private final String id = UUID.randomUUID().toString();
+//
+// /** Field SCHEME */
+// public static final String SCHEME = "hbase";
+//
+// /** Field hBaseAdmin */
+// private transient HBaseAdmin hBaseAdmin;
+//
+// /** Field hostName */
+// private String quorumNames;
+// /** Field tableName */
+// private String tableName;
+// private String base64Scan;
+//
+// /**
+// * Constructor HBaseTap creates a new HBaseTap instance.
+// *
+// * @param tableName
+// * of type String
+// * @param HBaseFullScheme
+// * of type HBaseFullScheme
+// */
+// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) {
+// super(HBaseFullScheme, SinkMode.UPDATE);
+// this.tableName = tableName;
+// }
+//
+// /**
+// * Constructor HBaseTap creates a new HBaseTap instance.
+// *
+// * @param tableName
+// * of type String
+// * @param HBaseFullScheme
+// * of type HBaseFullScheme
+// * @param sinkMode
+// * of type SinkMode
+// */
+// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
+// super(HBaseFullScheme, sinkMode);
+// this.tableName = tableName;
+// }
+//
+// /**
+// * Constructor HBaseTap creates a new HBaseTap instance.
+// *
+// * @param tableName
+// * of type String
+// * @param HBaseFullScheme
+// * of type HBaseFullScheme
+// */
+// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) {
+// super(HBaseFullScheme, SinkMode.UPDATE);
+// this.quorumNames = quorumNames;
+// this.tableName = tableName;
+// }
+//
+// /**
+// * Constructor HBaseTap creates a new HBaseTap instance.
+// *
+// * @param tableName
+// * of type String
+// * @param HBaseFullScheme
+// * of type HBaseFullScheme
+// * @param sinkMode
+// * of type SinkMode
+// */
+// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
+// super(HBaseFullScheme, sinkMode);
+// this.quorumNames = quorumNames;
+// this.tableName = tableName;
+// }
+//
+// /**
+// * Constructor HBaseTap creates a new HBaseTap instance.
+// *
+// * @param quorumNames HBase quorum
+// * @param tableName The name of the HBase table to read
+// * @param HBaseFullScheme
+// * @param base64Scan An optional base64 encoded scan object
+// * @param sinkMode If REPLACE the output table will be deleted before writing to
+// */
+// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {
+// super(HBaseFullScheme, sinkMode);
+// this.quorumNames = quorumNames;
+// this.tableName = tableName;
+// this.base64Scan = base64Scan;
+// }
+//
+// /**
+// * Method getTableName returns the tableName of this HBaseTap object.
+// *
+// * @return the tableName (type String) of this HBaseTap object.
+// */
+// public String getTableName() {
+// return tableName;
+// }
+//
+// public Path getPath() {
+// return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
+// }
+//
+// protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+// if (hBaseAdmin == null) {
+// Configuration hbaseConf = HBaseConfiguration.create(conf);
+// hBaseAdmin = new HBaseAdmin(hbaseConf);
+// }
+//
+// return hBaseAdmin;
+// }
+//
+// @Override
+// public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
+// if (quorumNames != null) {
+// conf.set("hbase.zookeeper.quorum", quorumNames);
+// }
+//
+// LOG.debug("sinking to table: {}", tableName);
+//
+// if (isReplace() && conf.get("mapred.task.partition") == null) {
+// try {
+// deleteResource(conf);
+//
+// } catch (IOException e) {
+// throw new RuntimeException("could not delete resource: " + e);
+// }
+// }
+//
+// else if (isUpdate() || isReplace()) {
+// try {
+// createResource(conf);
+// } catch (IOException e) {
+// throw new RuntimeException(tableName + " does not exist !", e);
+// }
+//
+// }
+//
+// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+// super.sinkConfInit(process, conf);
+// }
+//
+// @Override
+// public String getIdentifier() {
+// return id;
+// }
+//
+// @Override
+// public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader)
+// throws IOException {
+// return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
+// }
+//
+// @Override
+// public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector)
+// throws IOException {
+// HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this);
+// hBaseCollector.prepare();
+// return hBaseCollector;
+// }
+//
+// @Override
+// public boolean createResource(JobConf jobConf) throws IOException {
+// HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+//
+// if (hBaseAdmin.tableExists(tableName)) {
+// return true;
+// }
+//
+// LOG.info("creating hbase table: {}", tableName);
+//
+// HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+//
+// String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames();
+//
+// for (String familyName : familyNames) {
+// tableDescriptor.addFamily(new HColumnDescriptor(familyName));
+// }
+//
+// hBaseAdmin.createTable(tableDescriptor);
+//
+// return true;
+// }
+//
+// @Override
+// public boolean deleteResource(JobConf jobConf) throws IOException {
+// if (getHBaseAdmin(jobConf).tableExists(tableName)) {
+// if (getHBaseAdmin(jobConf).isTableEnabled(tableName))
+// getHBaseAdmin(jobConf).disableTable(tableName);
+// getHBaseAdmin(jobConf).deleteTable(tableName);
+// }
+// return true;
+// }
+//
+// @Override
+// public boolean resourceExists(JobConf jobConf) throws IOException {
+// return getHBaseAdmin(jobConf).tableExists(tableName);
+// }
+//
+// @Override
+// public long getModifiedTime(JobConf jobConf) throws IOException {
+// return System.currentTimeMillis(); // currently unable to find last mod
+// // time
+// // on a table
+// }
+//
+// @Override
+// public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
+// // a hack for MultiInputFormat to see that there is a child format
+// FileInputFormat.setInputPaths(conf, getPath());
+//
+// if (quorumNames != null) {
+// conf.set("hbase.zookeeper.quorum", quorumNames);
+// }
+//
+// LOG.debug("sourcing from table: {}", tableName);
+// conf.set(TableInputFormat.INPUT_TABLE, tableName);
+// if (null != base64Scan)
+// conf.set(TableInputFormat.SCAN, base64Scan);
+//
+// super.sourceConfInit(process, conf);
+// }
+//
+// @Override
+// public boolean equals(Object object) {
+// if (this == object) {
+// return true;
+// }
+// if (object == null || getClass() != object.getClass()) {
+// return false;
+// }
+// if (!super.equals(object)) {
+// return false;
+// }
+//
+// HBaseRawTap hBaseTap = (HBaseRawTap) object;
+//
+// if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) {
+// return false;
+// }
+//
+// if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) {
+// return false;
+// }
+//
+// return true;
+// }
+//
+// @Override
+// public int hashCode() {
+// int result = super.hashCode();
+// result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0);
+// return result;
+// }
+//}
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
index bbc205b..6216695 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
@@ -1,83 +1,83 @@
-package parallelai.spyglass.hbase
-
-import cascading.pipe.Pipe
-import cascading.pipe.assembly.Coerce
-import cascading.scheme.Scheme
-import cascading.tap.{ Tap, SinkMode }
-import cascading.tuple.Fields
-import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
-import org.apache.hadoop.hbase.util.Bytes
-import scala.collection.JavaConversions._
-import scala.collection.mutable.WrappedArray
-import com.twitter.scalding._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
-import org.apache.hadoop.hbase.util.Base64
-import java.io.ByteArrayOutputStream
-import java.io.DataOutputStream
-
-object HBaseRawSource {
- /**
- * Converts a scan object to a base64 string that can be passed to HBaseRawSource
- * @param scan
- * @return base64 string representation
- */
- def convertScanToString(scan: Scan) = {
- val out = new ByteArrayOutputStream();
- val dos = new DataOutputStream(out);
- scan.write(dos);
- Base64.encodeBytes(out.toByteArray());
- }
-}
-
-
-/**
- * @author Rotem Hermon
- *
- * HBaseRawSource is a scalding source that passes the original row (Result) object to the
- * mapper for customized processing.
- *
- * @param tableName The name of the HBase table to read
- * @param quorumNames HBase quorum
- * @param familyNames Column families to get (source, if null will get all) or update to (sink)
- * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written
- * @param base64Scan An optional base64 encoded scan object
- * @param sinkMode If REPLACE the output table will be deleted before writing to
- *
- */
-class HBaseRawSource(
- tableName: String,
- quorumNames: String = "localhost",
- familyNames: Array[String],
- writeNulls: Boolean = true,
- base64Scan: String = null,
- sinkMode: SinkMode = null) extends Source {
-
- override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
- .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
-
- override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
- val hBaseScheme = hdfsScheme match {
- case hbase: HBaseRawScheme => hbase
- case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
- }
- mode match {
- case hdfsMode @ Hdfs(_, _) => readOrWrite match {
- case Read => {
- new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
- case null => SinkMode.KEEP
- case _ => sinkMode
- }).asInstanceOf[Tap[_, _, _]]
- }
- case Write => {
- new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
- case null => SinkMode.UPDATE
- case _ => sinkMode
- }).asInstanceOf[Tap[_, _, _]]
- }
- }
- case _ => super.createTap(readOrWrite)(mode)
- }
- }
-}
+//package parallelai.spyglass.hbase
+//
+//import cascading.pipe.Pipe
+//import cascading.pipe.assembly.Coerce
+//import cascading.scheme.Scheme
+//import cascading.tap.{ Tap, SinkMode }
+//import cascading.tuple.Fields
+//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
+//import org.apache.hadoop.hbase.util.Bytes
+//import scala.collection.JavaConversions._
+//import scala.collection.mutable.WrappedArray
+//import com.twitter.scalding._
+//import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+//import org.apache.hadoop.hbase.client.Scan
+//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+//import org.apache.hadoop.hbase.util.Base64
+//import java.io.ByteArrayOutputStream
+//import java.io.DataOutputStream
+//
+//object HBaseRawSource {
+// /**
+// * Converts a scan object to a base64 string that can be passed to HBaseRawSource
+// * @param scan
+// * @return base64 string representation
+// */
+// def convertScanToString(scan: Scan) = {
+// val out = new ByteArrayOutputStream();
+// val dos = new DataOutputStream(out);
+// scan.write(dos);
+// Base64.encodeBytes(out.toByteArray());
+// }
+//}
+//
+//
+///**
+// * @author Rotem Hermon
+// *
+// * HBaseRawSource is a scalding source that passes the original row (Result) object to the
+// * mapper for customized processing.
+// *
+// * @param tableName The name of the HBase table to read
+// * @param quorumNames HBase quorum
+// * @param familyNames Column families to get (source, if null will get all) or update to (sink)
+// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written
+// * @param base64Scan An optional base64 encoded scan object
+// * @param sinkMode If REPLACE the output table will be deleted before writing to
+// *
+// */
+//class HBaseRawSource(
+// tableName: String,
+// quorumNames: String = "localhost",
+// familyNames: Array[String],
+// writeNulls: Boolean = true,
+// base64Scan: String = null,
+// sinkMode: SinkMode = null) extends Source {
+//
+// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
+// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+//
+// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+// val hBaseScheme = hdfsScheme match {
+// case hbase: HBaseRawScheme => hbase
+// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
+// }
+// mode match {
+// case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+// case Read => {
+// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+// case null => SinkMode.KEEP
+// case _ => sinkMode
+// }).asInstanceOf[Tap[_, _, _]]
+// }
+// case Write => {
+// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+// case null => SinkMode.UPDATE
+// case _ => sinkMode
+// }).asInstanceOf[Tap[_, _, _]]
+// }
+// }
+// case _ => super.createTap(readOrWrite)(mode)
+// }
+// }
+//}