-rw-r--r--  pom.xml                                                          49
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java     286
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java        311
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala    62
4 files changed, 689 insertions(+), 19 deletions(-)
diff --git a/pom.xml b/pom.xml
index d254977..077c22c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
<name>Cascading and Scalding wrapper for HBase with advanced features</name>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.0.3</version>
+ <version>2.0.4-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
@@ -29,25 +29,26 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Cloudera's Distribution Including Apache Hadoop version 4.2.0 -->
- <datafu.version>0.0.4-cdh4.2.0</datafu.version>
- <flume.version>1.3.0-cdh4.2.0</flume.version>
- <hadoop.version>2.0.0-cdh4.2.0</hadoop.version>
- <hbase.version>0.94.2-cdh4.2.0</hbase.version>
- <hive.version>0.10.0-cdh4.2.0</hive.version>
- <mahout.version>0.7-cdh4.2.0</mahout.version>
- <mapreduce.version>2.0.0-mr1-cdh4.2.0</mapreduce.version>
- <oozie.version>3.3.0-cdh4.2.0</oozie.version>
- <oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.0-cdh4.2.0</oozie-hadoop.version>
- <oozie-sharelib.version>3.3.0-cdh4.2.0</oozie-sharelib.version>
- <pig.version>0.10.0-cdh4.2.0</pig.version>
- <sqoop.version>1.4.2-cdh4.2.0</sqoop.version>
- <whirr.version>0.8.0-cdh4.2.0</whirr.version>
- <zookeeper.version>3.4.5-cdh4.2.0</zookeeper.version>
+ <datafu.version>0.0.4-cdh4.3.0</datafu.version>
+ <flume.version>1.3.0-cdh4.3.0</flume.version>
+ <hadoop.version>2.0.0-cdh4.3.0</hadoop.version>
+ <hbase.version>0.94.6-cdh4.3.0</hbase.version>
+ <hive.version>0.10.0-cdh4.3.0</hive.version>
+ <mahout.version>0.7-cdh4.3.0</mahout.version>
+ <mapreduce.version>2.0.0-mr1-cdh4.3.0</mapreduce.version>
+ <oozie.version>3.3.2-cdh4.3.0</oozie.version>
+ <oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.2-cdh4.3.0</oozie-hadoop.version>
+ <oozie-sharelib.version>3.3.2-cdh4.3.0</oozie-sharelib.version>
+ <pig.version>0.11.0-cdh4.3.0</pig.version>
+ <sqoop.version>1.4.3-cdh4.3.0</sqoop.version>
+ <whirr.version>0.8.2-cdh4.3.0</whirr.version>
+ <zookeeper.version>3.4.5-cdh4.3.0</zookeeper.version>
<!-- Scala/Scalding/Cascading properties -->
- <scala.version>2.9.2</scala.version>
- <scalding.version>0.8.3</scalding.version>
- <cascading.version>2.1.0</cascading.version>
+ <scala.version>2.10.2</scala.version>
+ <scalding.scala.version>2.10</scalding.scala.version>
+ <scalding.version>0.8.5</scalding.version>
+ <cascading.version>2.1.6</cascading.version>
<scalding-commons.version>0.1.1</scalding-commons.version>
<scalatest.version>1.7.1</scalatest.version>
@@ -162,7 +163,7 @@
</dependency>
<dependency>
<groupId>com.twitter</groupId>
- <artifactId>scalding_${scala.version}</artifactId>
+ <artifactId>scalding-core_${scalding.scala.version}</artifactId>
<version>${scalding.version}</version>
<exclusions>
<exclusion> <!-- Declare exclusion, in order to use custom build -->
@@ -189,6 +190,16 @@
<version>${typesafe.config.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.twitter.elephantbird</groupId>
+ <artifactId>elephant-bird-core</artifactId>
+ <version>4.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter.elephantbird</groupId>
+ <artifactId>elephant-bird-hadoop-compat</artifactId>
+ <version>4.1</version>
+ </dependency>
</dependencies>
<!-- From https://wiki.scala-lang.org/display/SIW/ScalaEclipseMaven -->
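For projects that consume SpyGlass from sbt rather than Maven, the upgraded dependencies above translate roughly as follows. This is a sketch only: the versions are copied from the POM, and the %% operator supplies the _2.10 suffix that the scalding.scala.version property spells out explicitly in the Maven coordinates.

    // sbt equivalent of the new/changed dependencies (sketch; versions taken from the POM above)
    scalaVersion := "2.10.2"

    libraryDependencies ++= Seq(
      "com.twitter"              %% "scalding-core"               % "0.8.5", // resolves to scalding-core_2.10
      "com.twitter.elephantbird"  % "elephant-bird-core"          % "4.1",
      "com.twitter.elephantbird"  % "elephant-bird-hadoop-compat" % "4.1"
    )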
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
new file mode 100644
index 0000000..32730d6
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
+import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import cascading.util.Util;
+
+/**
+ * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
+ * with the {@link HBaseRawTap} to allow reading and writing of data
+ * to and from an HBase cluster.
+ *
+ * @see HBaseRawTap
+ */
+@SuppressWarnings({ "rawtypes", "deprecation" })
+public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6248976486883281356L;
+
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);
+
+ public final Fields RowKeyField = new Fields("rowkey");
+ public final Fields RowField = new Fields("row");
+
+ /** String familyNames */
+ private String[] familyNames;
+
+ private boolean writeNulls = true;
+
+ /**
+ * Constructor HBaseRawScheme creates a new HBaseRawScheme instance.
+ *
+ * @param familyName
+ * of type String
+ */
+ public HBaseRawScheme(String familyName) {
+ this(new String[] { familyName });
+ }
+
+ public HBaseRawScheme(String[] familyNames) {
+ this.familyNames = familyNames;
+ setSourceFields();
+ }
+
+ public HBaseRawScheme(String familyName, boolean writeNulls) {
+ this(new String[] { familyName }, writeNulls);
+ }
+
+ public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
+ this.familyNames = familyNames;
+ this.writeNulls = writeNulls;
+ setSourceFields();
+ }
+
+ private void setSourceFields() {
+ Fields sourceFields = Fields.join(RowKeyField, RowField);
+ setSourceFields(sourceFields);
+ }
+
+ /**
+ * Method getFamilyNames returns the distinct family names of this
+ * HBaseRawScheme object.
+ *
+ * @return the familyNames (type String[]) of this HBaseRawScheme object.
+ */
+ public String[] getFamilyNames() {
+ HashSet<String> familyNameSet = new HashSet<String>();
+ if (familyNames != null) {
+ for (String familyName : familyNames) {
+ familyNameSet.add(familyName);
+ }
+ }
+ return familyNameSet.toArray(new String[0]);
+ }
+
+ @Override
+ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+ Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };
+
+ sourceCall.setContext(pair);
+ }
+
+ @Override
+ public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+ sourceCall.setContext(null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
+ throws IOException {
+ Tuple result = new Tuple();
+
+ Object key = sourceCall.getContext()[0];
+ Object value = sourceCall.getContext()[1];
+ boolean hasNext = sourceCall.getInput().next(key, value);
+ if (!hasNext) {
+ return false;
+ }
+
+ // Skip nulls
+ if (key == null || value == null) {
+ return true;
+ }
+
+ ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
+ Result row = (Result) value;
+ result.add(keyWritable);
+ result.add(row);
+ sourceCall.getIncomingEntry().setTuple(result);
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
+ TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
+ OutputCollector outputCollector = sinkCall.getOutput();
+ Tuple key = tupleEntry.selectTuple(RowKeyField);
+ Object okey = key.getObject(0);
+ ImmutableBytesWritable keyBytes = getBytes(okey);
+ Put put = new Put(keyBytes.get());
+ Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
+ if (null != outFields) {
+ TupleEntry values = tupleEntry.selectEntry(outFields);
+ for (int n = 0; n < values.getFields().size(); n++) {
+ Object o = values.get(n);
+ ImmutableBytesWritable valueBytes = getBytes(o);
+ Comparable field = outFields.get(n);
+ ColumnName cn = parseColumn((String) field);
+ if (null == cn.family) {
+ if (n >= familyNames.length)
+ cn.family = familyNames[familyNames.length - 1];
+ else
+ cn.family = familyNames[n];
+ }
+ if (null != o || writeNulls)
+ put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
+ }
+ }
+ outputCollector.collect(null, put);
+ }
+
+ private ImmutableBytesWritable getBytes(Object obj) {
+ if (null == obj)
+ return new ImmutableBytesWritable(new byte[0]);
+ if (obj instanceof ImmutableBytesWritable)
+ return (ImmutableBytesWritable) obj;
+ else if (obj instanceof String)
+ return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
+ else if (obj instanceof Long)
+ return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
+ else if (obj instanceof Integer)
+ return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
+ else if (obj instanceof Short)
+ return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
+ else if (obj instanceof Boolean)
+ return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
+ else if (obj instanceof Double)
+ return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
+ else
+ throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
+ + obj.getClass().getName());
+ }
+
+ private ColumnName parseColumn(String column) {
+ ColumnName ret = new ColumnName();
+ int pos = column.indexOf(":");
+ if (pos > 0) {
+ ret.name = column.substring(pos + 1);
+ ret.family = column.substring(0, pos);
+ } else {
+ ret.name = column;
+ }
+ return ret;
+ }
+
+ private class ColumnName {
+ String family;
+ String name;
+
+ ColumnName() {
+ }
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+ conf.setOutputFormat(TableOutputFormat.class);
+ conf.setOutputKeyClass(ImmutableBytesWritable.class);
+ conf.setOutputValueClass(Put.class);
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
+ JobConf conf) {
+ DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
+ ValueCopier.class);
+ if (null != familyNames) {
+ String columns = Util.join(this.familyNames, " ");
+ LOG.debug("sourcing from column families: {}", columns);
+ conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
+ }
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ if (!super.equals(object)) {
+ return false;
+ }
+
+ HBaseRawScheme that = (HBaseRawScheme) object;
+
+ if (!Arrays.equals(familyNames, that.familyNames)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
+ return result;
+ }
+
+ public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {
+
+ public ValueCopier() {
+ }
+
+ public void copyValue(Result oldValue, Result newValue) {
+ if (null != oldValue && null != newValue) {
+ oldValue.copyFrom(newValue);
+ }
+ }
+
+ }
+}
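On the sink side, HBaseRawScheme derives the target column from each non-"rowkey" field name: a name of the form family:qualifier is split by parseColumn(), while a bare name falls back to the familyNames array given to the scheme. A minimal write-side sketch, not part of this commit and using purely illustrative table, family, field and argument names (it also relies on HBaseRawSource, added later in this commit):

    import com.twitter.scalding._
    import cascading.tuple.Fields
    import parallelai.spyglass.hbase.HBaseRawSource

    class HBaseRawWriteExample(args: Args) extends Job(args) {
      // "cf:colA" is written to family "cf", qualifier "colA";
      // the bare "colB" falls back to the scheme's familyNames ("cf" here).
      Tsv(args("input"), new Fields("rowkey", "cf:colA", "colB"))
        .read
        .write(new HBaseRawSource("example_table", args("quorum"), Array("cf")))
    }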
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
new file mode 100644
index 0000000..738fd51
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -0,0 +1,311 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.SinkMode;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
+import cascading.tuple.TupleEntryCollector;
+import cascading.tuple.TupleEntryIterator;
+
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+
+/**
+ * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with
+ * the {@link HBaseRawScheme} to allow reading and writing
+ * of data to and from an HBase cluster.
+ */
+@SuppressWarnings({ "deprecation", "rawtypes" })
+public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 8019189493428493323L;
+
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class);
+
+ private final String id = UUID.randomUUID().toString();
+
+ /** Field SCHEME */
+ public static final String SCHEME = "hbase";
+
+ /** Field hBaseAdmin */
+ private transient HBaseAdmin hBaseAdmin;
+
+ /** Field hostName */
+ private String quorumNames;
+ /** Field tableName */
+ private String tableName;
+ private String base64Scan;
+
+ /**
+ * Constructor HBaseRawTap creates a new HBaseRawTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseRawScheme
+ */
+ public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) {
+ super(HBaseFullScheme, SinkMode.UPDATE);
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseRawTap creates a new HBaseRawTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseRawScheme
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseRawTap creates a new HBaseRawTap instance.
+ *
+ * @param quorumNames
+ * of type String
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseRawScheme
+ */
+ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) {
+ super(HBaseFullScheme, SinkMode.UPDATE);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseRawTap creates a new HBaseRawTap instance.
+ *
+ * @param quorumNames
+ * of type String
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseRawScheme
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseRawTap creates a new HBaseRawTap instance.
+ *
+ * @param quorumNames
+ * of type String
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseRawScheme
+ * @param base64Scan
+ * base64-encoded Scan that restricts the rows and columns read
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ this.base64Scan = base64Scan;
+ }
+
+ /**
+ * Method getTableName returns the tableName of this HBaseRawTap object.
+ *
+ * @return the tableName (type String) of this HBaseRawTap object.
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ public Path getPath() {
+ return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
+ }
+
+ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+ if (hBaseAdmin == null) {
+ Configuration hbaseConf = HBaseConfiguration.create(conf);
+ hBaseAdmin = new HBaseAdmin(hbaseConf);
+ }
+
+ return hBaseAdmin;
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
+ if (quorumNames != null) {
+ conf.set("hbase.zookeeper.quorum", quorumNames);
+ }
+
+ LOG.debug("sinking to table: {}", tableName);
+
+ if (isReplace() && conf.get("mapred.task.partition") == null) {
+ try {
+ deleteResource(conf);
+
+ } catch (IOException e) {
+ throw new RuntimeException("could not delete resource: " + e);
+ }
+ }
+
+ else if (isUpdate() || isReplace()) {
+ try {
+ createResource(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(tableName + " does not exist!", e);
+ }
+
+ }
+
+ conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+ super.sinkConfInit(process, conf);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader)
+ throws IOException {
+ return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
+ }
+
+ @Override
+ public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector)
+ throws IOException {
+ HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this);
+ hBaseCollector.prepare();
+ return hBaseCollector;
+ }
+
+ @Override
+ public boolean createResource(JobConf jobConf) throws IOException {
+ HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+
+ if (hBaseAdmin.tableExists(tableName)) {
+ return true;
+ }
+
+ LOG.info("creating hbase table: {}", tableName);
+
+ HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+
+ String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames();
+
+ for (String familyName : familyNames) {
+ tableDescriptor.addFamily(new HColumnDescriptor(familyName));
+ }
+
+ hBaseAdmin.createTable(tableDescriptor);
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteResource(JobConf jobConf) throws IOException {
+ if (getHBaseAdmin(jobConf).tableExists(tableName)) {
+ if (getHBaseAdmin(jobConf).isTableEnabled(tableName))
+ getHBaseAdmin(jobConf).disableTable(tableName);
+ getHBaseAdmin(jobConf).deleteTable(tableName);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean resourceExists(JobConf jobConf) throws IOException {
+ return getHBaseAdmin(jobConf).tableExists(tableName);
+ }
+
+ @Override
+ public long getModifiedTime(JobConf jobConf) throws IOException {
+ return System.currentTimeMillis(); // HBase does not expose a last-modified time for a table
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
+ // a hack for MultiInputFormat to see that there is a child format
+ FileInputFormat.setInputPaths(conf, getPath());
+
+ if (quorumNames != null) {
+ conf.set("hbase.zookeeper.quorum", quorumNames);
+ }
+
+ LOG.debug("sourcing from table: {}", tableName);
+ conf.set(TableInputFormat.INPUT_TABLE, tableName);
+ if (null != base64Scan)
+ conf.set(TableInputFormat.SCAN, base64Scan);
+
+ super.sourceConfInit(process, conf);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ if (!super.equals(object)) {
+ return false;
+ }
+
+ HBaseRawTap hBaseTap = (HBaseRawTap) object;
+
+ if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) {
+ return false;
+ }
+
+ if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0);
+ return result;
+ }
+}
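A tap built with the base64Scan constructor simply forwards the encoded scan to TableInputFormat.SCAN in sourceConfInit, so whatever a Scan can express server-side (row ranges, caching, filters) also restricts the Cascading source. A sketch of wiring one up directly, not part of this commit; the row range and family name are illustrative, and the encoding helper is HBaseRawSource.convertScanToString, added later in this commit:

    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.util.Bytes
    import cascading.tap.SinkMode
    import parallelai.spyglass.hbase.{ HBaseRawScheme, HBaseRawSource, HBaseRawTap }

    object RestrictedScanTap {
      def apply(quorum: String, table: String): HBaseRawTap = {
        val scan = new Scan()
        scan.setStartRow(Bytes.toBytes("2013-06-01")) // illustrative row range
        scan.setStopRow(Bytes.toBytes("2013-07-01"))
        scan.setCaching(500)                          // rows fetched per scanner RPC
        val base64Scan = HBaseRawSource.convertScanToString(scan)
        new HBaseRawTap(quorum, table, new HBaseRawScheme(Array("cf")), base64Scan, SinkMode.KEEP)
      }
    }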
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
new file mode 100644
index 0000000..1fc6f7d
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
@@ -0,0 +1,62 @@
+package parallelai.spyglass.hbase
+
+import cascading.pipe.Pipe
+import cascading.pipe.assembly.Coerce
+import cascading.scheme.Scheme
+import cascading.tap.{ Tap, SinkMode }
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
+import org.apache.hadoop.hbase.util.Bytes
+import scala.collection.JavaConversions._
+import scala.collection.mutable.WrappedArray
+import com.twitter.scalding._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.hadoop.hbase.util.Base64
+import java.io.ByteArrayOutputStream
+import java.io.DataOutputStream
+
+object HBaseRawSource {
+ def convertScanToString(scan: Scan) = {
+ val out = new ByteArrayOutputStream();
+ val dos = new DataOutputStream(out);
+ scan.write(dos);
+ Base64.encodeBytes(out.toByteArray());
+ }
+}
+class HBaseRawSource(
+ tableName: String,
+ quorumNames: String = "localhost",
+ familyNames: Array[String],
+ writeNulls: Boolean = true,
+ base64Scan: String = null,
+ sinkMode: SinkMode = null) extends Source {
+
+ override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
+ .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+ val hBaseScheme = hdfsScheme match {
+ case hbase: HBaseRawScheme => hbase
+ case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
+ }
+ mode match {
+ case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Read => {
+ new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+ case null => SinkMode.KEEP
+ case _ => sinkMode
+ }).asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+ case null => SinkMode.UPDATE
+ case _ => sinkMode
+ }).asInstanceOf[Tap[_,_,_]]
+ }
+ }
+ case _ => super.createTap(readOrWrite)(mode)
+ }
+ }
+}
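A minimal read-side job sketch, not part of this commit, showing how the ("rowkey", "row") tuples emitted by the scheme can be unpacked in Scalding; the table, family, qualifier and argument names are illustrative:

    import com.twitter.scalding._
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import parallelai.spyglass.hbase.HBaseRawSource

    class HBaseRawReadExample(args: Args) extends Job(args) {
      new HBaseRawSource("example_table", args("quorum"), Array("cf"))
        .read
        .mapTo(('rowkey, 'row) -> ('key, 'value)) { in: (ImmutableBytesWritable, Result) =>
          val (rowkey, result) = in
          // fetch a single illustrative cell; a missing cell comes back as null
          val cell = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"))
          (Bytes.toString(rowkey.get()), if (cell == null) "" else Bytes.toString(cell))
        }
        .write(Tsv(args("output")))
    }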