-rw-r--r--  pom.xml                                                           57
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java      572
-rw-r--r--  src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java         622
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java    734
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala    166
5 files changed, 1097 insertions(+), 1054 deletions(-)
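
The bulk of this change un-comments HBaseRawScheme and HBaseRawTap (both files were previously checked in fully commented out), reflows DBOutputFormat, and bumps the build from CDH 4.2.0 to CDH 4.3.0, Scala 2.9.3 to 2.10.2 and Cascading 2.1.0 to 2.1.6. For orientation, a minimal read-side sketch of how the re-enabled classes fit together is shown below; the quorum host, table name and column family are placeholders, not values taken from this commit.

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.JobConf;

import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;

import parallelai.spyglass.hbase.HBaseRawScheme;
import parallelai.spyglass.hbase.HBaseRawTap;

public class RawReadSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder quorum, table and family names.
        HBaseRawScheme scheme = new HBaseRawScheme("cf");
        HBaseRawTap source = new HBaseRawTap("zk-host", "demo_table", scheme);

        // Read the tap directly, outside of a flow.
        TupleEntryIterator it = source.openForRead(new HadoopFlowProcess(new JobConf()));
        while (it.hasNext()) {
            TupleEntry entry = it.next();
            // The scheme emits exactly two fields: "rowkey" and "row".
            ImmutableBytesWritable key = (ImmutableBytesWritable) entry.getObject("rowkey");
            Result row = (Result) entry.getObject("row");
            System.out.println(key + " -> " + row);
        }
        it.close();
    }
}
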
diff --git a/pom.xml b/pom.xml
index 8c1adb8..4b22b61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,13 +8,6 @@
<url>http://www.parallelai.com</url>
</organization>
-
- <name>Cascading and Scalding wrapper for HBase with advanced features</name>
- <groupId>parallelai</groupId>
- <artifactId>parallelai.spyglass</artifactId>
- <version>2.9.3_2.2.0</version>
- <packaging>jar</packaging>
-
<properties>
<!-- Java compilation level -->
<maven.compiler.source>1.6</maven.compiler.source>
@@ -28,32 +21,32 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <datafu.version>0.0.4-cdh4.2.0</datafu.version>
- <flume.version>1.3.0-cdh4.2.0</flume.version>
- <hadoop.version>2.0.0-cdh4.2.0</hadoop.version>
- <hbase.version>0.94.2-cdh4.2.0</hbase.version>
- <hive.version>0.10.0-cdh4.2.0</hive.version>
- <mahout.version>0.7-cdh4.2.0</mahout.version>
- <mapreduce.version>2.0.0-mr1-cdh4.2.0</mapreduce.version>
- <oozie.version>3.3.0-cdh4.2.0</oozie.version>
- <oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.0-cdh4.2.0</oozie-hadoop.version>
- <oozie-sharelib.version>3.3.0-cdh4.2.0</oozie-sharelib.version>
- <pig.version>0.10.0-cdh4.2.0</pig.version>
- <sqoop.version>1.4.2-cdh4.2.0</sqoop.version>
- <whirr.version>0.8.0-cdh4.2.0</whirr.version>
- <zookeeper.version>3.4.5-cdh4.2.0</zookeeper.version>
+ <datafu.version>0.0.4-cdh4.3.0</datafu.version>
+ <flume.version>1.3.0-cdh4.3.0</flume.version>
+ <hadoop.version>2.0.0-cdh4.3.0</hadoop.version>
+ <hbase.version>0.94.6-cdh4.3.0</hbase.version>
+ <hive.version>0.10.0-cdh4.3.0</hive.version>
+ <mahout.version>0.7-cdh4.3.0</mahout.version>
+ <mapreduce.version>2.0.0-mr1-cdh4.3.0</mapreduce.version>
+ <oozie.version>3.3.2-cdh4.3.0</oozie.version>
+ <oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.2-cdh4.3.0</oozie-hadoop.version>
+ <oozie-sharelib.version>3.3.2-cdh4.3.0</oozie-sharelib.version>
+ <pig.version>0.11.0-cdh4.3.0</pig.version>
+ <sqoop.version>1.4.3-cdh4.3.0</sqoop.version>
+ <whirr.version>0.8.2-cdh4.3.0</whirr.version>
+ <zookeeper.version>3.4.5-cdh4.3.0</zookeeper.version>
<!-- Scala/Scalding/Cascading properties -->
- <scala.version>2.9.3</scala.version>
- <scalding.scala.version>2.9.3</scalding.scala.version>
+ <scala.version>2.10.2</scala.version>
+ <scalding.scala.version>2.10</scalding.scala.version>
<scalding.version>0.8.6</scalding.version>
- <cascading.version>2.1.0</cascading.version>
+ <cascading.version>2.1.6</cascading.version>
<scalding-commons.version>0.2.0</scalding-commons.version>
<scalatest.version>1.9.1</scalatest.version>
<trove4j.version>3.0.3</trove4j.version>
<maple.version>0.2.8</maple.version>
- <specs2.version>1.12.4.1</specs2.version>
+ <specs2.version>2.1</specs2.version>
<typesafe.config.version>1.0.0</typesafe.config.version>
<!-- Other libraries properties -->
@@ -66,7 +59,13 @@
</properties>
- <distributionManagement>
+ <name>Cascading and Scalding wrapper for HBase with advanced features</name>
+ <groupId>parallelai</groupId>
+ <artifactId>parallelai.spyglass</artifactId>
+ <version>${scala.version}_2.3.0</version>
+ <packaging>jar</packaging>
+
+ <distributionManagement>
<repository>
<id>repo</id>
<url>https://github.com/ParallelAI/mvn-repo/raw/master/releases</url>
@@ -90,6 +89,10 @@
<name>Con Jars</name>
<url>http://conjars.org/repo</url>
</repository>
+ <repository>
+ <id>mvnrepository</id>
+ <url>http://repo1.maven.org/maven2</url>
+ </repository>
</repositories>
<!-- Profiles -->
@@ -237,7 +240,7 @@
</includes>
</configuration>
</plugin>
- <!-- This plugin is not supported by Eclipse, so maybe we shouldn't be
+ <!-- This plugin is not supported by Eclipse, so maybe we shouldn't be
using it -->
<plugin>
<groupId>org.scala-tools</groupId>
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
index 7dba40d..7b62c88 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java
@@ -1,286 +1,286 @@
-///*
-//* Copyright (c) 2009 Concurrent, Inc.
-//*
-//* This work has been released into the public domain
-//* by the copyright holder. This applies worldwide.
-//*
-//* In case this is not legally possible:
-//* The copyright holder grants any entity the right
-//* to use this work for any purpose, without any
-//* conditions, unless such conditions are required by law.
-//*/
-//
-//package parallelai.spyglass.hbase;
-//
-//import java.io.IOException;
-//import java.util.Arrays;
-//import java.util.HashSet;
-//
-//import org.apache.hadoop.hbase.client.Put;
-//import org.apache.hadoop.hbase.client.Result;
-//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-//import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-//import org.apache.hadoop.hbase.util.Bytes;
-//import org.apache.hadoop.mapred.JobConf;
-//import org.apache.hadoop.mapred.OutputCollector;
-//import org.apache.hadoop.mapred.RecordReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
-//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
-//
-//import cascading.flow.FlowProcess;
-//import cascading.scheme.Scheme;
-//import cascading.scheme.SinkCall;
-//import cascading.scheme.SourceCall;
-//import cascading.tap.Tap;
-//import cascading.tuple.Fields;
-//import cascading.tuple.Tuple;
-//import cascading.tuple.TupleEntry;
-//import cascading.util.Util;
-//
-///**
-//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
-//* with the {@HBaseRawTap} to allow for the reading and writing of data
-//* to and from a HBase cluster.
-//*
-//* @see HBaseRawTap
-//*/
-//@SuppressWarnings({ "rawtypes", "deprecation" })
-//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
-// /**
-// *
-// */
-// private static final long serialVersionUID = 6248976486883281356L;
-//
-// /** Field LOG */
-// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);
-//
-// public final Fields RowKeyField = new Fields("rowkey");
-// public final Fields RowField = new Fields("row");
-//
-// /** String familyNames */
-// private String[] familyNames;
-//
-// private boolean writeNulls = true;
-//
-// /**
-// * Constructor HBaseScheme creates a new HBaseScheme instance.
-// *
-// * @param keyFields
-// * of type Fields
-// * @param familyName
-// * of type String
-// * @param valueFields
-// * of type Fields
-// */
-// public HBaseRawScheme(String familyName) {
-// this(new String[] { familyName });
-// }
-//
-// public HBaseRawScheme(String[] familyNames) {
-// this.familyNames = familyNames;
-// setSourceFields();
-// }
-//
-// public HBaseRawScheme(String familyName, boolean writeNulls) {
-// this(new String[] { familyName }, writeNulls);
-// }
-//
-// public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
-// this.familyNames = familyNames;
-// this.writeNulls = writeNulls;
-// setSourceFields();
-// }
-//
-// private void setSourceFields() {
-// Fields sourceFields = Fields.join(RowKeyField, RowField);
-// setSourceFields(sourceFields);
-// }
-//
-// /**
-// * Method getFamilyNames returns the set of familyNames of this HBaseScheme
-// * object.
-// *
-// * @return the familyNames (type String[]) of this HBaseScheme object.
-// */
-// public String[] getFamilyNames() {
-// HashSet<String> familyNameSet = new HashSet<String>();
-// if (familyNames != null) {
-// for (String familyName : familyNames) {
-// familyNameSet.add(familyName);
-// }
-// }
-// return familyNameSet.toArray(new String[0]);
-// }
-//
-// @Override
-// public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
-// Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };
-//
-// sourceCall.setContext(pair);
-// }
-//
-// @Override
-// public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
-// sourceCall.setContext(null);
-// }
-//
-// @SuppressWarnings("unchecked")
-// @Override
-// public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
-// throws IOException {
-// Tuple result = new Tuple();
-//
-// Object key = sourceCall.getContext()[0];
-// Object value = sourceCall.getContext()[1];
-// boolean hasNext = sourceCall.getInput().next(key, value);
-// if (!hasNext) {
-// return false;
-// }
-//
-// // Skip nulls
-// if (key == null || value == null) {
-// return true;
-// }
-//
-// ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
-// Result row = (Result) value;
-// result.add(keyWritable);
-// result.add(row);
-// sourceCall.getIncomingEntry().setTuple(result);
-// return true;
-// }
-//
-// @SuppressWarnings("unchecked")
-// @Override
-// public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
-// TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
-// OutputCollector outputCollector = sinkCall.getOutput();
-// Tuple key = tupleEntry.selectTuple(RowKeyField);
-// Object okey = key.getObject(0);
-// ImmutableBytesWritable keyBytes = getBytes(okey);
-// Put put = new Put(keyBytes.get());
-// Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
-// if (null != outFields) {
-// TupleEntry values = tupleEntry.selectEntry(outFields);
-// for (int n = 0; n < values.getFields().size(); n++) {
-// Object o = values.get(n);
-// ImmutableBytesWritable valueBytes = getBytes(o);
-// Comparable field = outFields.get(n);
-// ColumnName cn = parseColumn((String) field);
-// if (null == cn.family) {
-// if (n >= familyNames.length)
-// cn.family = familyNames[familyNames.length - 1];
-// else
-// cn.family = familyNames[n];
-// }
-// if (null != o || writeNulls)
-// put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
-// }
-// }
-// outputCollector.collect(null, put);
-// }
-//
-// private ImmutableBytesWritable getBytes(Object obj) {
-// if (null == obj)
-// return new ImmutableBytesWritable(new byte[0]);
-// if (obj instanceof ImmutableBytesWritable)
-// return (ImmutableBytesWritable) obj;
-// else if (obj instanceof String)
-// return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
-// else if (obj instanceof Long)
-// return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
-// else if (obj instanceof Integer)
-// return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
-// else if (obj instanceof Short)
-// return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
-// else if (obj instanceof Boolean)
-// return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
-// else if (obj instanceof Double)
-// return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
-// else
-// throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
-// + obj.getClass().getName());
-// }
-//
-// private ColumnName parseColumn(String column) {
-// ColumnName ret = new ColumnName();
-// int pos = column.indexOf(":");
-// if (pos > 0) {
-// ret.name = column.substring(pos + 1);
-// ret.family = column.substring(0, pos);
-// } else {
-// ret.name = column;
-// }
-// return ret;
-// }
-//
-// private class ColumnName {
-// String family;
-// String name;
-//
-// ColumnName() {
-// }
-// }
-//
-// @Override
-// public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
-// conf.setOutputFormat(TableOutputFormat.class);
-// conf.setOutputKeyClass(ImmutableBytesWritable.class);
-// conf.setOutputValueClass(Put.class);
-// }
-//
-// @Override
-// public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
-// JobConf conf) {
-// DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
-// ValueCopier.class);
-// if (null != familyNames) {
-// String columns = Util.join(this.familyNames, " ");
-// LOG.debug("sourcing from column families: {}", columns);
-// conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
-// }
-// }
-//
-// @Override
-// public boolean equals(Object object) {
-// if (this == object) {
-// return true;
-// }
-// if (object == null || getClass() != object.getClass()) {
-// return false;
-// }
-// if (!super.equals(object)) {
-// return false;
-// }
-//
-// HBaseRawScheme that = (HBaseRawScheme) object;
-//
-// if (!Arrays.equals(familyNames, that.familyNames)) {
-// return false;
-// }
-// return true;
-// }
-//
-// @Override
-// public int hashCode() {
-// int result = super.hashCode();
-// result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
-// return result;
-// }
-//
-// public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {
-//
-// public ValueCopier() {
-// }
-//
-// public void copyValue(Result oldValue, Result newValue) {
-// if (null != oldValue && null != newValue) {
-// oldValue.copyFrom(newValue);
-// }
-// }
-//
-// }
-//}
+/*
+* Copyright (c) 2009 Concurrent, Inc.
+*
+* This work has been released into the public domain
+* by the copyright holder. This applies worldwide.
+*
+* In case this is not legally possible:
+* The copyright holder grants any entity the right
+* to use this work for any purpose, without any
+* conditions, unless such conditions are required by law.
+*/
+
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
+import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import cascading.util.Util;
+
+/**
+* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
+* with the {@link HBaseRawTap} to allow for the reading and writing of data
+* to and from an HBase cluster.
+*
+* @see HBaseRawTap
+*/
+@SuppressWarnings({ "rawtypes", "deprecation" })
+public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6248976486883281356L;
+
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);
+
+ public final Fields RowKeyField = new Fields("rowkey");
+ public final Fields RowField = new Fields("row");
+
+ /** String familyNames */
+ private String[] familyNames;
+
+ private boolean writeNulls = true;
+
+ /**
+ * Constructor HBaseRawScheme creates a new HBaseRawScheme instance.
+ *
+ * @param familyName
+ * of type String
+ */
+ public HBaseRawScheme(String familyName) {
+ this(new String[] { familyName });
+ }
+
+ public HBaseRawScheme(String[] familyNames) {
+ this.familyNames = familyNames;
+ setSourceFields();
+ }
+
+ public HBaseRawScheme(String familyName, boolean writeNulls) {
+ this(new String[] { familyName }, writeNulls);
+ }
+
+ public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
+ this.familyNames = familyNames;
+ this.writeNulls = writeNulls;
+ setSourceFields();
+ }
+
+ private void setSourceFields() {
+ Fields sourceFields = Fields.join(RowKeyField, RowField);
+ setSourceFields(sourceFields);
+ }
+
+ /**
+ * Method getFamilyNames returns the set of familyNames of this HBaseScheme
+ * object.
+ *
+ * @return the familyNames (type String[]) of this HBaseScheme object.
+ */
+ public String[] getFamilyNames() {
+ HashSet<String> familyNameSet = new HashSet<String>();
+ if (familyNames != null) {
+ for (String familyName : familyNames) {
+ familyNameSet.add(familyName);
+ }
+ }
+ return familyNameSet.toArray(new String[0]);
+ }
+
+ @Override
+ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+ Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };
+
+ sourceCall.setContext(pair);
+ }
+
+ @Override
+ public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+ sourceCall.setContext(null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
+ throws IOException {
+ Tuple result = new Tuple();
+
+ Object key = sourceCall.getContext()[0];
+ Object value = sourceCall.getContext()[1];
+ boolean hasNext = sourceCall.getInput().next(key, value);
+ if (!hasNext) {
+ return false;
+ }
+
+ // Skip nulls
+ if (key == null || value == null) {
+ return true;
+ }
+
+ ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
+ Result row = (Result) value;
+ result.add(keyWritable);
+ result.add(row);
+ sourceCall.getIncomingEntry().setTuple(result);
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
+ TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
+ OutputCollector outputCollector = sinkCall.getOutput();
+ Tuple key = tupleEntry.selectTuple(RowKeyField);
+ Object okey = key.getObject(0);
+ ImmutableBytesWritable keyBytes = getBytes(okey);
+ Put put = new Put(keyBytes.get());
+ Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
+ if (null != outFields) {
+ TupleEntry values = tupleEntry.selectEntry(outFields);
+ for (int n = 0; n < values.getFields().size(); n++) {
+ Object o = values.get(n);
+ ImmutableBytesWritable valueBytes = getBytes(o);
+ Comparable field = outFields.get(n);
+ ColumnName cn = parseColumn((String) field);
+ if (null == cn.family) {
+ if (n >= familyNames.length)
+ cn.family = familyNames[familyNames.length - 1];
+ else
+ cn.family = familyNames[n];
+ }
+ if (null != o || writeNulls)
+ put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
+ }
+ }
+ outputCollector.collect(null, put);
+ }
+
+ private ImmutableBytesWritable getBytes(Object obj) {
+ if (null == obj)
+ return new ImmutableBytesWritable(new byte[0]);
+ if (obj instanceof ImmutableBytesWritable)
+ return (ImmutableBytesWritable) obj;
+ else if (obj instanceof String)
+ return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
+ else if (obj instanceof Long)
+ return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
+ else if (obj instanceof Integer)
+ return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
+ else if (obj instanceof Short)
+ return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
+ else if (obj instanceof Boolean)
+ return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
+ else if (obj instanceof Double)
+ return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
+ else
+ throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
+ + obj.getClass().getName());
+ }
+
+ private ColumnName parseColumn(String column) {
+ ColumnName ret = new ColumnName();
+ int pos = column.indexOf(":");
+ if (pos > 0) {
+ ret.name = column.substring(pos + 1);
+ ret.family = column.substring(0, pos);
+ } else {
+ ret.name = column;
+ }
+ return ret;
+ }
+
+ private class ColumnName {
+ String family;
+ String name;
+
+ ColumnName() {
+ }
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
+ conf.setOutputFormat(TableOutputFormat.class);
+ conf.setOutputKeyClass(ImmutableBytesWritable.class);
+ conf.setOutputValueClass(Put.class);
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
+ JobConf conf) {
+ DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
+ ValueCopier.class);
+ if (null != familyNames) {
+ String columns = Util.join(this.familyNames, " ");
+ LOG.debug("sourcing from column families: {}", columns);
+ conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
+ }
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ if (!super.equals(object)) {
+ return false;
+ }
+
+ HBaseRawScheme that = (HBaseRawScheme) object;
+
+ if (!Arrays.equals(familyNames, that.familyNames)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
+ return result;
+ }
+
+ public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {
+
+ public ValueCopier() {
+ }
+
+ public void copyValue(Result oldValue, Result newValue) {
+ if (null != oldValue && null != newValue) {
+ oldValue.copyFrom(newValue);
+ }
+ }
+
+ }
+}
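
A note on the sink side of the scheme above: sink() expects the outgoing tuple to carry a "rowkey" field plus one field per column, and parseColumn() splits each field name on ":" into family and qualifier; when the prefix is missing, the family is taken positionally from the familyNames array (falling back to the last declared family). A small sketch of that naming convention, with made-up field and family names:

import java.util.Arrays;

import cascading.tuple.Fields;
import parallelai.spyglass.hbase.HBaseRawScheme;

public class RawSinkNamingSketch {
    public static void main(String[] args) {
        // Two declared families; writeNulls stays at its default of true.
        HBaseRawScheme scheme = new HBaseRawScheme(new String[] { "cf1", "cf2" });

        // Outgoing sink tuples need a "rowkey" field plus one field per column:
        //   "cf1:name" -> explicit family "cf1", qualifier "name"
        //   "age"      -> no prefix, so the family is picked positionally from
        //                 familyNames (here "cf2"), or the last declared family
        //                 when the field index runs past the array
        Fields sinkFields = new Fields("rowkey", "cf1:name", "age");

        System.out.println("sink fields: " + sinkFields);
        System.out.println("declared families: " + Arrays.toString(scheme.getFamilyNames()));
    }
}
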
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
index 780d3fc..5dcd57d 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -1,311 +1,311 @@
-///*
-//* Copyright (c) 2009 Concurrent, Inc.
-//*
-//* This work has been released into the public domain
-//* by the copyright holder. This applies worldwide.
-//*
-//* In case this is not legally possible:
-//* The copyright holder grants any entity the right
-//* to use this work for any purpose, without any
-//* conditions, unless such conditions are required by law.
-//*/
-//
-//package parallelai.spyglass.hbase;
-//
-//import java.io.IOException;
-//import java.util.UUID;
-//
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.hbase.HBaseConfiguration;
-//import org.apache.hadoop.hbase.HColumnDescriptor;
-//import org.apache.hadoop.hbase.HTableDescriptor;
-//import org.apache.hadoop.hbase.MasterNotRunningException;
-//import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-//import org.apache.hadoop.hbase.client.HBaseAdmin;
-//import org.apache.hadoop.hbase.client.Scan;
-//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-//import org.apache.hadoop.mapred.FileInputFormat;
-//import org.apache.hadoop.mapred.JobConf;
-//import org.apache.hadoop.mapred.OutputCollector;
-//import org.apache.hadoop.mapred.RecordReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import cascading.flow.FlowProcess;
-//import cascading.tap.SinkMode;
-//import cascading.tap.Tap;
-//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
-//import cascading.tuple.TupleEntryCollector;
-//import cascading.tuple.TupleEntryIterator;
-//
-//import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-//
-///**
-//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with
-//* the {@HBaseRawScheme} to allow for the reading and writing
-//* of data to and from a HBase cluster.
-//*/
-//@SuppressWarnings({ "deprecation", "rawtypes" })
-//public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {
-// /**
-// *
-// */
-// private static final long serialVersionUID = 8019189493428493323L;
-//
-// /** Field LOG */
-// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class);
-//
-// private final String id = UUID.randomUUID().toString();
-//
-// /** Field SCHEME */
-// public static final String SCHEME = "hbase";
-//
-// /** Field hBaseAdmin */
-// private transient HBaseAdmin hBaseAdmin;
-//
-// /** Field hostName */
-// private String quorumNames;
-// /** Field tableName */
-// private String tableName;
-// private String base64Scan;
-//
-// /**
-// * Constructor HBaseTap creates a new HBaseTap instance.
-// *
-// * @param tableName
-// * of type String
-// * @param HBaseFullScheme
-// * of type HBaseFullScheme
-// */
-// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) {
-// super(HBaseFullScheme, SinkMode.UPDATE);
-// this.tableName = tableName;
-// }
-//
-// /**
-// * Constructor HBaseTap creates a new HBaseTap instance.
-// *
-// * @param tableName
-// * of type String
-// * @param HBaseFullScheme
-// * of type HBaseFullScheme
-// * @param sinkMode
-// * of type SinkMode
-// */
-// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
-// super(HBaseFullScheme, sinkMode);
-// this.tableName = tableName;
-// }
-//
-// /**
-// * Constructor HBaseTap creates a new HBaseTap instance.
-// *
-// * @param tableName
-// * of type String
-// * @param HBaseFullScheme
-// * of type HBaseFullScheme
-// */
-// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) {
-// super(HBaseFullScheme, SinkMode.UPDATE);
-// this.quorumNames = quorumNames;
-// this.tableName = tableName;
-// }
-//
-// /**
-// * Constructor HBaseTap creates a new HBaseTap instance.
-// *
-// * @param tableName
-// * of type String
-// * @param HBaseFullScheme
-// * of type HBaseFullScheme
-// * @param sinkMode
-// * of type SinkMode
-// */
-// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
-// super(HBaseFullScheme, sinkMode);
-// this.quorumNames = quorumNames;
-// this.tableName = tableName;
-// }
-//
-// /**
-// * Constructor HBaseTap creates a new HBaseTap instance.
-// *
-// * @param quorumNames HBase quorum
-// * @param tableName The name of the HBase table to read
-// * @param HBaseFullScheme
-// * @param base64Scan An optional base64 encoded scan object
-// * @param sinkMode If REPLACE the output table will be deleted before writing to
-// */
-// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {
-// super(HBaseFullScheme, sinkMode);
-// this.quorumNames = quorumNames;
-// this.tableName = tableName;
-// this.base64Scan = base64Scan;
-// }
-//
-// /**
-// * Method getTableName returns the tableName of this HBaseTap object.
-// *
-// * @return the tableName (type String) of this HBaseTap object.
-// */
-// public String getTableName() {
-// return tableName;
-// }
-//
-// public Path getPath() {
-// return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
-// }
-//
-// protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
-// if (hBaseAdmin == null) {
-// Configuration hbaseConf = HBaseConfiguration.create(conf);
-// hBaseAdmin = new HBaseAdmin(hbaseConf);
-// }
-//
-// return hBaseAdmin;
-// }
-//
-// @Override
-// public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
-// if (quorumNames != null) {
-// conf.set("hbase.zookeeper.quorum", quorumNames);
-// }
-//
-// LOG.debug("sinking to table: {}", tableName);
-//
-// if (isReplace() && conf.get("mapred.task.partition") == null) {
-// try {
-// deleteResource(conf);
-//
-// } catch (IOException e) {
-// throw new RuntimeException("could not delete resource: " + e);
-// }
-// }
-//
-// else if (isUpdate() || isReplace()) {
-// try {
-// createResource(conf);
-// } catch (IOException e) {
-// throw new RuntimeException(tableName + " does not exist !", e);
-// }
-//
-// }
-//
-// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
-// super.sinkConfInit(process, conf);
-// }
-//
-// @Override
-// public String getIdentifier() {
-// return id;
-// }
-//
-// @Override
-// public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader)
-// throws IOException {
-// return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
-// }
-//
-// @Override
-// public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector)
-// throws IOException {
-// HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this);
-// hBaseCollector.prepare();
-// return hBaseCollector;
-// }
-//
-// @Override
-// public boolean createResource(JobConf jobConf) throws IOException {
-// HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
-//
-// if (hBaseAdmin.tableExists(tableName)) {
-// return true;
-// }
-//
-// LOG.info("creating hbase table: {}", tableName);
-//
-// HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
-//
-// String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames();
-//
-// for (String familyName : familyNames) {
-// tableDescriptor.addFamily(new HColumnDescriptor(familyName));
-// }
-//
-// hBaseAdmin.createTable(tableDescriptor);
-//
-// return true;
-// }
-//
-// @Override
-// public boolean deleteResource(JobConf jobConf) throws IOException {
-// if (getHBaseAdmin(jobConf).tableExists(tableName)) {
-// if (getHBaseAdmin(jobConf).isTableEnabled(tableName))
-// getHBaseAdmin(jobConf).disableTable(tableName);
-// getHBaseAdmin(jobConf).deleteTable(tableName);
-// }
-// return true;
-// }
-//
-// @Override
-// public boolean resourceExists(JobConf jobConf) throws IOException {
-// return getHBaseAdmin(jobConf).tableExists(tableName);
-// }
-//
-// @Override
-// public long getModifiedTime(JobConf jobConf) throws IOException {
-// return System.currentTimeMillis(); // currently unable to find last mod
-// // time
-// // on a table
-// }
-//
-// @Override
-// public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
-// // a hack for MultiInputFormat to see that there is a child format
-// FileInputFormat.setInputPaths(conf, getPath());
-//
-// if (quorumNames != null) {
-// conf.set("hbase.zookeeper.quorum", quorumNames);
-// }
-//
-// LOG.debug("sourcing from table: {}", tableName);
-// conf.set(TableInputFormat.INPUT_TABLE, tableName);
-// if (null != base64Scan)
-// conf.set(TableInputFormat.SCAN, base64Scan);
-//
-// super.sourceConfInit(process, conf);
-// }
-//
-// @Override
-// public boolean equals(Object object) {
-// if (this == object) {
-// return true;
-// }
-// if (object == null || getClass() != object.getClass()) {
-// return false;
-// }
-// if (!super.equals(object)) {
-// return false;
-// }
-//
-// HBaseRawTap hBaseTap = (HBaseRawTap) object;
-//
-// if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) {
-// return false;
-// }
-//
-// if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) {
-// return false;
-// }
-//
-// return true;
-// }
-//
-// @Override
-// public int hashCode() {
-// int result = super.hashCode();
-// result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0);
-// return result;
-// }
-//}
+/*
+* Copyright (c) 2009 Concurrent, Inc.
+*
+* This work has been released into the public domain
+* by the copyright holder. This applies worldwide.
+*
+* In case this is not legally possible:
+* The copyright holder grants any entity the right
+* to use this work for any purpose, without any
+* conditions, unless such conditions are required by law.
+*/
+
+package parallelai.spyglass.hbase;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.SinkMode;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
+import cascading.tuple.TupleEntryCollector;
+import cascading.tuple.TupleEntryIterator;
+
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+
+/**
+* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with
+* the {@link HBaseRawScheme} to allow for the reading and writing
+* of data to and from an HBase cluster.
+*/
+@SuppressWarnings({ "deprecation", "rawtypes" })
+public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 8019189493428493323L;
+
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class);
+
+ private final String id = UUID.randomUUID().toString();
+
+ /** Field SCHEME */
+ public static final String SCHEME = "hbase";
+
+ /** Field hBaseAdmin */
+ private transient HBaseAdmin hBaseAdmin;
+
+ /** Field hostName */
+ private String quorumNames;
+ /** Field tableName */
+ private String tableName;
+ private String base64Scan;
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ */
+ public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) {
+ super(HBaseFullScheme, SinkMode.UPDATE);
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ */
+ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) {
+ super(HBaseFullScheme, SinkMode.UPDATE);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param quorumNames HBase quorum
+ * @param tableName The name of the HBase table to read
+ * @param HBaseFullScheme
+ * @param base64Scan An optional base64 encoded scan object
+ * @param sinkMode If REPLACE, the output table will be deleted before writing to it
+ */
+ public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) {
+ super(HBaseFullScheme, sinkMode);
+ this.quorumNames = quorumNames;
+ this.tableName = tableName;
+ this.base64Scan = base64Scan;
+ }
+
+ /**
+ * Method getTableName returns the tableName of this HBaseTap object.
+ *
+ * @return the tableName (type String) of this HBaseTap object.
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ public Path getPath() {
+ return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
+ }
+
+ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+ if (hBaseAdmin == null) {
+ Configuration hbaseConf = HBaseConfiguration.create(conf);
+ hBaseAdmin = new HBaseAdmin(hbaseConf);
+ }
+
+ return hBaseAdmin;
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
+ if (quorumNames != null) {
+ conf.set("hbase.zookeeper.quorum", quorumNames);
+ }
+
+ LOG.debug("sinking to table: {}", tableName);
+
+ if (isReplace() && conf.get("mapred.task.partition") == null) {
+ try {
+ deleteResource(conf);
+
+ } catch (IOException e) {
+ throw new RuntimeException("could not delete resource: " + e);
+ }
+ }
+
+ else if (isUpdate() || isReplace()) {
+ try {
+ createResource(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(tableName + " does not exist !", e);
+ }
+
+ }
+
+ conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+ super.sinkConfInit(process, conf);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader)
+ throws IOException {
+ return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
+ }
+
+ @Override
+ public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector)
+ throws IOException {
+ HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this);
+ hBaseCollector.prepare();
+ return hBaseCollector;
+ }
+
+ @Override
+ public boolean createResource(JobConf jobConf) throws IOException {
+ HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);
+
+ if (hBaseAdmin.tableExists(tableName)) {
+ return true;
+ }
+
+ LOG.info("creating hbase table: {}", tableName);
+
+ HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+
+ String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames();
+
+ for (String familyName : familyNames) {
+ tableDescriptor.addFamily(new HColumnDescriptor(familyName));
+ }
+
+ hBaseAdmin.createTable(tableDescriptor);
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteResource(JobConf jobConf) throws IOException {
+ if (getHBaseAdmin(jobConf).tableExists(tableName)) {
+ if (getHBaseAdmin(jobConf).isTableEnabled(tableName))
+ getHBaseAdmin(jobConf).disableTable(tableName);
+ getHBaseAdmin(jobConf).deleteTable(tableName);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean resourceExists(JobConf jobConf) throws IOException {
+ return getHBaseAdmin(jobConf).tableExists(tableName);
+ }
+
+ @Override
+ public long getModifiedTime(JobConf jobConf) throws IOException {
+ return System.currentTimeMillis(); // currently unable to find last mod
+ // time
+ // on a table
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
+ // a hack for MultiInputFormat to see that there is a child format
+ FileInputFormat.setInputPaths(conf, getPath());
+
+ if (quorumNames != null) {
+ conf.set("hbase.zookeeper.quorum", quorumNames);
+ }
+
+ LOG.debug("sourcing from table: {}", tableName);
+ conf.set(TableInputFormat.INPUT_TABLE, tableName);
+ if (null != base64Scan)
+ conf.set(TableInputFormat.SCAN, base64Scan);
+
+ super.sourceConfInit(process, conf);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ if (!super.equals(object)) {
+ return false;
+ }
+
+ HBaseRawTap hBaseTap = (HBaseRawTap) object;
+
+ if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) {
+ return false;
+ }
+
+ if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0);
+ return result;
+ }
+}
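
The five-argument constructor added here accepts an optional base64-encoded Scan, which sourceConfInit() hands straight to TableInputFormat.SCAN. One way to produce that string against the Writable-based Scan serialization of HBase 0.94 is sketched below; the row range, quorum host and table name are illustrative only.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;

import cascading.tap.SinkMode;
import parallelai.spyglass.hbase.HBaseRawScheme;
import parallelai.spyglass.hbase.HBaseRawTap;

public class RawTapWithScan {
    static String toBase64(Scan scan) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(out);
        scan.write(dos); // Writable serialization, matching what TableInputFormat.SCAN expects here
        return Base64.encodeBytes(out.toByteArray());
    }

    public static void main(String[] args) throws IOException {
        Scan scan = new Scan(Bytes.toBytes("row-000"), Bytes.toBytes("row-999"));
        scan.setCaching(500);

        String base64Scan = toBase64(scan);
        // SinkMode.KEEP because this tap is only used as a source in this sketch.
        HBaseRawTap source = new HBaseRawTap("zk-host", "demo_table",
                new HBaseRawScheme("cf"), base64Scan, SinkMode.KEEP);
        System.out.println(source.getPath() + " scan=" + base64Scan.substring(0, 16) + "...");
    }
}
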
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
index 1166970..3f10a04 100644
--- a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
@@ -29,6 +29,14 @@
package parallelai.spyglass.jdbc.db;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -39,353 +47,385 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
-import com.jcraft.jsch.Logger;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
/**
- * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts
- * &lt;key,value&gt; pairs, where key has a type extending DBWritable. Returned {@link RecordWriter}
- * writes <b>only the key</b> to the database with a batch SQL query.
+ * An OutputFormat that sends the reduce output to a SQL table.
+ * <p/>
+ * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where key has a type
+ * extending DBWritable. Returned {@link RecordWriter} writes <b>only the
+ * key</b> to the database with a batch SQL query.
*/
-public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> {
- private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
-
- /** A RecordWriter that writes the reduce output to a SQL table */
- protected class DBRecordWriter implements RecordWriter<K, V> {
- private Connection connection;
- private PreparedStatement insertStatement;
- private PreparedStatement updateStatement;
- private final int statementsBeforeExecute;
-
- private long statementsAdded = 0;
- private long insertStatementsCurrent = 0;
- private long updateStatementsCurrent = 0;
-
- protected DBRecordWriter(Connection connection, PreparedStatement insertStatement,
- PreparedStatement updateStatement, int statementsBeforeExecute) {
- this.connection = connection;
- this.insertStatement = insertStatement;
- this.updateStatement = updateStatement;
- this.statementsBeforeExecute = statementsBeforeExecute;
- }
-
- /** {@inheritDoc} */
- public void close(Reporter reporter) throws IOException {
- executeBatch();
-
- try {
- if (insertStatement != null) { insertStatement.close(); }
-
- if (updateStatement != null) { updateStatement.close(); }
-
- connection.commit();
- } catch (SQLException exception) {
- rollBack();
-
- createThrowMessage("unable to commit batch", 0, exception);
- } finally {
- try {
- connection.close();
- } catch (SQLException exception) {
- throw new IOException("unable to close connection", exception);
- }
- }
- }
-
- private void executeBatch() throws IOException {
- try {
- if (insertStatementsCurrent != 0) {
- LOG.info(
- "executing insert batch " + createBatchMessage(insertStatementsCurrent));
-
- insertStatement.executeBatch();
- }
-
- insertStatementsCurrent = 0;
- } catch (SQLException exception) {
- rollBack();
-
- createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception);
- }
-
- try {
- if (updateStatementsCurrent != 0) {
- LOG.info(
- "executing update batch " + createBatchMessage(updateStatementsCurrent));
-
- int[] result = updateStatement.executeBatch();
-
- int count = 0;
-
- for (int value : result) { count += value; }
-
- if (count != updateStatementsCurrent) {
- throw new IOException(
- "update did not update same number of statements executed in batch, batch: "
- + updateStatementsCurrent + " updated: " + count);
- }
- }
-
- updateStatementsCurrent = 0;
- } catch (SQLException exception) {
-
- String message = exception.getMessage();
- if (message.indexOf("Duplicate Key") >= 0) {
- LOG.warn("In exception block. Bypass exception becuase of Insert/Update.");
- } else {
- rollBack();
-
- createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception);
- }
- }
- }
-
- private void rollBack() {
- try {
- connection.rollback();
- } catch (SQLException sqlException) {
- LOG.warn(StringUtils.stringifyException(sqlException));
- }
- }
-
- private String createBatchMessage(long currentStatements) {
- return String
- .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute);
- }
-
- private void createThrowMessage(String stateMessage, long currentStatements,
- SQLException exception) throws IOException {
- String message = exception.getMessage();
-
- message = message.substring(0, Math.min(75, message.length()));
-
- int messageLength = exception.getMessage().length();
- String batchMessage = createBatchMessage(currentStatements);
- String template = "%s [msglength: %d]%s %s";
- String errorMessage =
- String.format(template, stateMessage, messageLength, batchMessage, message);
-
- LOG.error(errorMessage, exception.getNextException());
-
- throw new IOException(errorMessage, exception.getNextException());
- }
-
- /** {@inheritDoc} */
- public synchronized void write(K key, V value) throws IOException {
- try {
- if (value == null) {
- key.write(insertStatement);
- insertStatement.addBatch();
- insertStatementsCurrent++;
- } else {
- key.write(updateStatement);
- updateStatement.addBatch();
- updateStatementsCurrent++;
- }
- } catch (SQLException exception) {
- throw new IOException("unable to add batch statement", exception);
- }
-
- statementsAdded++;
-
- if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); }
- }
- }
-
- /**
- * Constructs the query used as the prepared statement to insert data.
- *
- * @param table the table to insert into
- * @param fieldNames the fields to insert into. If field names are unknown, supply an array of
- * nulls.
- */
- protected String constructInsertQuery(String table, String[] fieldNames) {
- if (fieldNames == null) {
- throw new IllegalArgumentException("Field names may not be null");
- }
-
- StringBuilder query = new StringBuilder();
-
- query.append("INSERT INTO ").append(table);
-
- if (fieldNames.length > 0 && fieldNames[0] != null) {
- query.append(" (");
-
- for (int i = 0; i < fieldNames.length; i++) {
- query.append(fieldNames[i]);
-
- if (i != fieldNames.length - 1) { query.append(","); }
- }
-
- query.append(")");
-
- }
-
- query.append(" VALUES (");
-
- for (int i = 0; i < fieldNames.length; i++) {
- query.append("?");
-
- if (i != fieldNames.length - 1) { query.append(","); }
- }
-
- query.append(")");
-
- boolean test = true;
- if (test) {
- query.append(" ON DUPLICATE KEY UPDATE ");
-
-
- for (int i = 1; i < fieldNames.length; i++) {
-
-
- if ( (i != 1) ) { query.append(","); }
- //if (i != fieldNames.length - 1) { query.append(","); }
- //&& (i != fieldNames.length - 1)
- query.append(fieldNames[i]);
- query.append(" = ?");
-
-
- }
- }
-
- query.append(";");
-
- LOG.info(" ===================== " + query.toString());
- return query.toString();
- }
-
- protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) {
- if (fieldNames == null) {
- throw new IllegalArgumentException("field names may not be null");
- }
-
- Set<String> updateNamesSet = new HashSet<String>();
- Collections.addAll(updateNamesSet, updateNames);
-
- StringBuilder query = new StringBuilder();
-
- query.append("UPDATE ").append(table);
-
- query.append(" SET ");
-
- if (fieldNames.length > 0 && fieldNames[0] != null) {
- int count = 0;
-
- for (int i = 0; i < fieldNames.length; i++) {
- if (updateNamesSet.contains(fieldNames[i])) { continue; }
-
- if (count != 0) { query.append(","); }
-
- query.append(fieldNames[i]);
- query.append(" = ?");
-
- count++;
- }
- }
-
- query.append(" WHERE ");
-
- if (updateNames.length > 0 && updateNames[0] != null) {
- for (int i = 0; i < updateNames.length; i++) {
- query.append(updateNames[i]);
- query.append(" = ?");
-
- if (i != updateNames.length - 1) { query.append(" and "); }
- }
- }
-
- query.append(";");
- System.out.println("Update Query => " + query.toString());
- return query.toString();
- }
-
- /** {@inheritDoc} */
- public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException {
- }
-
- /** {@inheritDoc} */
- public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name,
- Progressable progress) throws IOException {
- DBConfiguration dbConf = new DBConfiguration(job);
-
- String tableName = dbConf.getOutputTableName();
- String[] fieldNames = dbConf.getOutputFieldNames();
- String[] updateNames = dbConf.getOutputUpdateFieldNames();
- int batchStatements = dbConf.getBatchStatementsNum();
-
- Connection connection = dbConf.getConnection();
-
- configureConnection(connection);
-
- String sqlInsert = constructInsertQuery(tableName, fieldNames);
- PreparedStatement insertPreparedStatement;
-
- try {
- insertPreparedStatement = connection.prepareStatement(sqlInsert);
- insertPreparedStatement.setEscapeProcessing(true); // should be on by default
- } catch (SQLException exception) {
- throw new IOException("unable to create statement for: " + sqlInsert, exception);
- }
-
- String sqlUpdate =
- updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null;
- PreparedStatement updatePreparedStatement = null;
-
- try {
- updatePreparedStatement =
- sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null;
- } catch (SQLException exception) {
- throw new IOException("unable to create statement for: " + sqlUpdate, exception);
- }
-
- return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements);
- }
-
- protected void configureConnection(Connection connection) {
- setAutoCommit(connection);
- }
-
- protected void setAutoCommit(Connection connection) {
- try {
- connection.setAutoCommit(false);
- } catch (Exception exception) {
- throw new RuntimeException("unable to set auto commit", exception);
- }
- }
-
- /**
- * Initializes the reduce-part of the job with the appropriate output settings
- *
- * @param job The job
- * @param dbOutputFormatClass
- * @param tableName The table to insert data into
- * @param fieldNames The field names in the table. If unknown, supply the appropriate
- */
- public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass,
- String tableName, String[] fieldNames, String[] updateFields, int batchSize) {
- if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else {
- job.setOutputFormat(dbOutputFormatClass);
- }
-
- // writing doesn't always happen in reduce
- job.setReduceSpeculativeExecution(false);
- job.setMapSpeculativeExecution(false);
-
- DBConfiguration dbConf = new DBConfiguration(job);
-
- dbConf.setOutputTableName(tableName);
- dbConf.setOutputFieldNames(fieldNames);
-
- if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); }
-
- if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); }
- }
+public class DBOutputFormat<K extends DBWritable, V> implements
+ OutputFormat<K, V> {
+ private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+
+ /** A RecordWriter that writes the reduce output to a SQL table */
+ protected class DBRecordWriter implements RecordWriter<K, V> {
+ private Connection connection;
+ private PreparedStatement insertStatement;
+ private PreparedStatement updateStatement;
+ private final int statementsBeforeExecute;
+
+ private long statementsAdded = 0;
+ private long insertStatementsCurrent = 0;
+ private long updateStatementsCurrent = 0;
+
+ protected DBRecordWriter(Connection connection,
+ PreparedStatement insertStatement,
+ PreparedStatement updateStatement, int statementsBeforeExecute) {
+ this.connection = connection;
+ this.insertStatement = insertStatement;
+ this.updateStatement = updateStatement;
+ this.statementsBeforeExecute = statementsBeforeExecute;
+ }
+
+ /** {@inheritDoc} */
+ public void close(Reporter reporter) throws IOException {
+ executeBatch();
+
+ try {
+ if (insertStatement != null) {
+ insertStatement.close();
+ }
+
+ if (updateStatement != null) {
+ updateStatement.close();
+ }
+
+ connection.commit();
+ } catch (SQLException exception) {
+ rollBack();
+
+ createThrowMessage("unable to commit batch", 0, exception);
+ } finally {
+ try {
+ connection.close();
+ } catch (SQLException exception) {
+ throw new IOException("unable to close connection", exception);
+ }
+ }
+ }
+
+ private void executeBatch() throws IOException {
+ try {
+ if (insertStatementsCurrent != 0) {
+ LOG.info("executing insert batch "
+ + createBatchMessage(insertStatementsCurrent));
+
+ insertStatement.executeBatch();
+ }
+
+ insertStatementsCurrent = 0;
+ } catch (SQLException exception) {
+ rollBack();
+
+ createThrowMessage("unable to execute insert batch",
+ insertStatementsCurrent, exception);
+ }
+
+ try {
+ if (updateStatementsCurrent != 0) {
+ LOG.info("executing update batch "
+ + createBatchMessage(updateStatementsCurrent));
+
+ int[] result = updateStatement.executeBatch();
+
+ int count = 0;
+
+ for (int value : result) {
+ count += value;
+ }
+
+ if (count != updateStatementsCurrent) {
+ throw new IOException(
+ "update did not update same number of statements executed in batch, batch: "
+ + updateStatementsCurrent + " updated: " + count);
+ }
+ }
+
+ updateStatementsCurrent = 0;
+ } catch (SQLException exception) {
+
+ String message = exception.getMessage();
+ if (message.indexOf("Duplicate Key") >= 0) {
+ LOG.warn("In exception block. Bypass exception because of Insert/Update.");
+ } else {
+ rollBack();
+
+ createThrowMessage("unable to execute update batch",
+ updateStatementsCurrent, exception);
+ }
+ }
+ }
+
+ private void rollBack() {
+ try {
+ connection.rollback();
+ } catch (SQLException sqlException) {
+ LOG.warn(StringUtils.stringifyException(sqlException));
+ }
+ }
+
+ private String createBatchMessage(long currentStatements) {
+ return String.format("[totstmts: %d][crntstmts: %d][batch: %d]",
+ statementsAdded, currentStatements, statementsBeforeExecute);
+ }
+
+ private void createThrowMessage(String stateMessage,
+ long currentStatements, SQLException exception) throws IOException {
+ String message = exception.getMessage();
+
+ // message = message.substring(0, Math.min(75, message.length()));
+
+ int messageLength = exception.getMessage().length();
+ String batchMessage = createBatchMessage(currentStatements);
+ String template = "%s [msglength: %d]%s %s";
+ String errorMessage = String.format(template, stateMessage,
+ messageLength, batchMessage, message);
+
+ LOG.error(errorMessage, exception.getNextException());
+
+ throw new IOException(errorMessage, exception.getNextException());
+ }
+
+ /** {@inheritDoc} */
+ public synchronized void write(K key, V value) throws IOException {
+ try {
+ if (value == null) {
+ key.write(insertStatement);
+ insertStatement.addBatch();
+ insertStatementsCurrent++;
+ } else {
+ key.write(updateStatement);
+ updateStatement.addBatch();
+ updateStatementsCurrent++;
+ }
+ } catch (SQLException exception) {
+ throw new IOException("unable to add batch statement", exception);
+ }
+
+ statementsAdded++;
+
+ if (statementsAdded % statementsBeforeExecute == 0) {
+ executeBatch();
+ }
+ }
+ }
+
+ /**
+ * Constructs the query used as the prepared statement to insert data.
+ *
+ * @param table
+ * the table to insert into
+ * @param fieldNames
+ * the fields to insert into. If field names are unknown, supply an
+ * array of nulls.
+ */
+ protected String constructInsertQuery(String table, String[] fieldNames) {
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("Field names may not be null");
+ }
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("INSERT INTO ").append(table);
+
+ if (fieldNames.length > 0 && fieldNames[0] != null) {
+ query.append(" (");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+
+ if (i != fieldNames.length - 1) {
+ query.append(",");
+ }
+ }
+
+ query.append(")");
+
+ }
+
+ query.append(" VALUES (");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append("?");
+
+ if (i != fieldNames.length - 1) {
+ query.append(",");
+ }
+ }
+
+ query.append(")");
+
+		// Append an upsert clause so an insert that hits an existing key falls back to
+		// updating the remaining columns (fieldNames[0] is assumed to be the key column).
+		query.append(" ON DUPLICATE KEY UPDATE ");
+
+		for (int i = 1; i < fieldNames.length; i++) {
+			if (i != 1) {
+				query.append(",");
+			}
+
+			query.append(fieldNames[i]);
+			query.append(" = ?");
+		}
+
+		query.append(";");
+
+		LOG.info("Insert query: " + query.toString());
+ return query.toString();
+ }
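+
+	// Illustrative output of constructInsertQuery (the table and column names are hypothetical):
+	//   constructInsertQuery("users", new String[] { "id", "name", "email" })
+	//   => INSERT INTO users (id,name,email) VALUES (?,?,?) ON DUPLICATE KEY UPDATE name = ?,email = ?;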
+
+ protected String constructUpdateQuery(String table, String[] fieldNames,
+ String[] updateNames) {
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("field names may not be null");
+ }
+
+ Set<String> updateNamesSet = new HashSet<String>();
+ Collections.addAll(updateNamesSet, updateNames);
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("UPDATE ").append(table);
+
+ query.append(" SET ");
+
+ if (fieldNames.length > 0 && fieldNames[0] != null) {
+ int count = 0;
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (updateNamesSet.contains(fieldNames[i])) {
+ continue;
+ }
+
+ if (count != 0) {
+ query.append(",");
+ }
+
+ query.append(fieldNames[i]);
+ query.append(" = ?");
+
+ count++;
+ }
+ }
+
+ query.append(" WHERE ");
+
+ if (updateNames.length > 0 && updateNames[0] != null) {
+ for (int i = 0; i < updateNames.length; i++) {
+ query.append(updateNames[i]);
+ query.append(" = ?");
+
+ if (i != updateNames.length - 1) {
+ query.append(" and ");
+ }
+ }
+ }
+
+ query.append(";");
+		LOG.info("Update query: " + query.toString());
+ return query.toString();
+ }
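+
+	// Illustrative output of constructUpdateQuery (same hypothetical columns, with "id" as the update key):
+	//   constructUpdateQuery("users", new String[] { "id", "name", "email" }, new String[] { "id" })
+	//   => UPDATE users SET name = ?,email = ? WHERE id = ?;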
+
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(FileSystem filesystem, JobConf job)
+ throws IOException {
+ }
+
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
+ JobConf job, String name, Progressable progress) throws IOException {
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ String tableName = dbConf.getOutputTableName();
+ String[] fieldNames = dbConf.getOutputFieldNames();
+ String[] updateNames = dbConf.getOutputUpdateFieldNames();
+ int batchStatements = dbConf.getBatchStatementsNum();
+
+ Connection connection = dbConf.getConnection();
+
+ configureConnection(connection);
+
+ String sqlInsert = constructInsertQuery(tableName, fieldNames);
+ PreparedStatement insertPreparedStatement;
+
+ try {
+ insertPreparedStatement = connection.prepareStatement(sqlInsert);
+ insertPreparedStatement.setEscapeProcessing(true); // should be on by
+ // default
+ } catch (SQLException exception) {
+ throw new IOException("unable to create statement for: " + sqlInsert,
+ exception);
+ }
+
+ String sqlUpdate = updateNames != null ? constructUpdateQuery(tableName,
+ fieldNames, updateNames) : null;
+ PreparedStatement updatePreparedStatement = null;
+
+ try {
+ updatePreparedStatement = sqlUpdate != null ? connection
+ .prepareStatement(sqlUpdate) : null;
+ } catch (SQLException exception) {
+ throw new IOException("unable to create statement for: " + sqlUpdate,
+ exception);
+ }
+
+ return new DBRecordWriter(connection, insertPreparedStatement,
+ updatePreparedStatement, batchStatements);
+ }
+
+ protected void configureConnection(Connection connection) {
+ setAutoCommit(connection);
+ }
+
+ protected void setAutoCommit(Connection connection) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (Exception exception) {
+ throw new RuntimeException("unable to set auto commit", exception);
+ }
+ }
+
+	/**
+	 * Initializes the reduce part of the job with the appropriate output
+	 * settings. A usage sketch follows this method.
+	 *
+	 * @param job
+	 *          The job
+	 * @param dbOutputFormatClass
+	 *          The output format class to use; if null, DBOutputFormat is used
+	 * @param tableName
+	 *          The table to insert data into
+	 * @param fieldNames
+	 *          The field names in the table. If unknown, supply the appropriate
+	 *          number of nulls.
+	 * @param updateFields
+	 *          The fields used in the WHERE clause to match rows for updates, or
+	 *          null for insert-only output
+	 * @param batchSize
+	 *          The number of statements to buffer before executing a batch, or
+	 *          -1 to keep the default
+	 */
+ public static void setOutput(JobConf job,
+ Class<? extends DBOutputFormat> dbOutputFormatClass, String tableName,
+ String[] fieldNames, String[] updateFields, int batchSize) {
+ if (dbOutputFormatClass == null) {
+ job.setOutputFormat(DBOutputFormat.class);
+ } else {
+ job.setOutputFormat(dbOutputFormatClass);
+ }
+
+ // writing doesn't always happen in reduce
+ job.setReduceSpeculativeExecution(false);
+ job.setMapSpeculativeExecution(false);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setOutputTableName(tableName);
+ dbConf.setOutputFieldNames(fieldNames);
+
+ if (updateFields != null) {
+ dbConf.setOutputUpdateFieldNames(updateFields);
+ }
+
+ if (batchSize != -1) {
+ dbConf.setBatchStatementsNum(batchSize);
+ }
+ }
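+
+	// Minimal usage sketch for setOutput (the table, columns and batch size are assumptions;
+	// connection settings are expected to be configured separately through DBConfiguration):
+	//
+	//   JobConf job = new JobConf();
+	//   DBOutputFormat.setOutput(job, null, "users",
+	//       new String[] { "id", "name", "email" }, // columns written by the job
+	//       new String[] { "id" },                  // update key columns (null for insert-only)
+	//       1000);                                  // batch size (-1 keeps the default)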
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
index 6216695..450a57d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
@@ -1,83 +1,83 @@
-//package parallelai.spyglass.hbase
-//
-//import cascading.pipe.Pipe
-//import cascading.pipe.assembly.Coerce
-//import cascading.scheme.Scheme
-//import cascading.tap.{ Tap, SinkMode }
-//import cascading.tuple.Fields
-//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
-//import org.apache.hadoop.hbase.util.Bytes
-//import scala.collection.JavaConversions._
-//import scala.collection.mutable.WrappedArray
-//import com.twitter.scalding._
-//import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-//import org.apache.hadoop.hbase.client.Scan
-//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
-//import org.apache.hadoop.hbase.util.Base64
-//import java.io.ByteArrayOutputStream
-//import java.io.DataOutputStream
-//
-//object HBaseRawSource {
-// /**
-// * Converts a scan object to a base64 string that can be passed to HBaseRawSource
-// * @param scan
-// * @return base64 string representation
-// */
-// def convertScanToString(scan: Scan) = {
-// val out = new ByteArrayOutputStream();
-// val dos = new DataOutputStream(out);
-// scan.write(dos);
-// Base64.encodeBytes(out.toByteArray());
-// }
-//}
-//
-//
-///**
-// * @author Rotem Hermon
-// *
-// * HBaseRawSource is a scalding source that passes the original row (Result) object to the
-// * mapper for customized processing.
-// *
-// * @param tableName The name of the HBase table to read
-// * @param quorumNames HBase quorum
-// * @param familyNames Column families to get (source, if null will get all) or update to (sink)
-// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written
-// * @param base64Scan An optional base64 encoded scan object
-// * @param sinkMode If REPLACE the output table will be deleted before writing to
-// *
-// */
-//class HBaseRawSource(
-// tableName: String,
-// quorumNames: String = "localhost",
-// familyNames: Array[String],
-// writeNulls: Boolean = true,
-// base64Scan: String = null,
-// sinkMode: SinkMode = null) extends Source {
-//
-// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
-// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
-//
-// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
-// val hBaseScheme = hdfsScheme match {
-// case hbase: HBaseRawScheme => hbase
-// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
-// }
-// mode match {
-// case hdfsMode @ Hdfs(_, _) => readOrWrite match {
-// case Read => {
-// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-// case null => SinkMode.KEEP
-// case _ => sinkMode
-// }).asInstanceOf[Tap[_, _, _]]
-// }
-// case Write => {
-// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-// case null => SinkMode.UPDATE
-// case _ => sinkMode
-// }).asInstanceOf[Tap[_, _, _]]
-// }
-// }
-// case _ => super.createTap(readOrWrite)(mode)
-// }
-// }
-//}
+package parallelai.spyglass.hbase
+
+import cascading.pipe.Pipe
+import cascading.pipe.assembly.Coerce
+import cascading.scheme.Scheme
+import cascading.tap.{ Tap, SinkMode }
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
+import org.apache.hadoop.hbase.util.Bytes
+import scala.collection.JavaConversions._
+import scala.collection.mutable.WrappedArray
+import com.twitter.scalding._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.hadoop.hbase.util.Base64
+import java.io.ByteArrayOutputStream
+import java.io.DataOutputStream
+
+object HBaseRawSource {
+ /**
+ * Converts a scan object to a base64 string that can be passed to HBaseRawSource
+ * @param scan
+ * @return base64 string representation
+ */
+ def convertScanToString(scan: Scan) = {
+ val out = new ByteArrayOutputStream();
+ val dos = new DataOutputStream(out);
+ scan.write(dos);
+ Base64.encodeBytes(out.toByteArray());
+ }
+}
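+
+// Illustrative use of convertScanToString (the column family "cf" is an assumption):
+//   val scan = new Scan()
+//   scan.addFamily(Bytes.toBytes("cf"))
+//   val base64Scan = HBaseRawSource.convertScanToString(scan)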
+
+
+/**
+ * @author Rotem Hermon
+ *
+ * HBaseRawSource is a scalding source that passes the original row (Result) object to the
+ * mapper for customized processing.
+ *
+ * @param tableName   The name of the HBase table to read
+ * @param quorumNames HBase quorum
+ * @param familyNames Column families to get (source, if null will get all) or update to (sink)
+ * @param writeNulls  Should the sink write null values. default = true. If false, null columns will not be written
+ * @param base64Scan  An optional base64 encoded scan object
+ * @param sinkMode    If REPLACE, the output table will be deleted before writing to it
+ *
+ */
+class HBaseRawSource(
+ tableName: String,
+ quorumNames: String = "localhost",
+ familyNames: Array[String],
+ writeNulls: Boolean = true,
+ base64Scan: String = null,
+ sinkMode: SinkMode = null) extends Source {
+
+ override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
+ .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+ val hBaseScheme = hdfsScheme match {
+ case hbase: HBaseRawScheme => hbase
+ case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
+ }
+ mode match {
+ case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Read => {
+ new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+ case null => SinkMode.KEEP
+ case _ => sinkMode
+ }).asInstanceOf[Tap[_, _, _]]
+ }
+ case Write => {
+ new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
+ case null => SinkMode.UPDATE
+ case _ => sinkMode
+ }).asInstanceOf[Tap[_, _, _]]
+ }
+ }
+ case _ => super.createTap(readOrWrite)(mode)
+ }
+ }
+}
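+
+// Minimal sketch of wiring this source into a Scalding job (the table name, quorum and
+// column family are assumptions); each tuple read carries the raw HBase row (Result)
+// for custom processing, as described in the scaladoc above:
+//   val rows = new HBaseRawSource("my_table", "zk-host:2181", Array("cf")).read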