diff options
Diffstat (limited to 'src')
4 files changed, 1067 insertions, 1027 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 7dba40d..7b62c88 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -1,286 +1,286 @@ -///* -//* Copyright (c) 2009 Concurrent, Inc. -//* -//* This work has been released into the public domain -//* by the copyright holder. This applies worldwide. -//* -//* In case this is not legally possible: -//* The copyright holder grants any entity the right -//* to use this work for any purpose, without any -//* conditions, unless such conditions are required by law. -//*/ -// -//package parallelai.spyglass.hbase; -// -//import java.io.IOException; -//import java.util.Arrays; -//import java.util.HashSet; -// -//import org.apache.hadoop.hbase.client.Put; -//import org.apache.hadoop.hbase.client.Result; -//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -//import org.apache.hadoop.hbase.mapred.TableOutputFormat; -//import org.apache.hadoop.hbase.util.Bytes; -//import org.apache.hadoop.mapred.JobConf; -//import org.apache.hadoop.mapred.OutputCollector; -//import org.apache.hadoop.mapred.RecordReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; -//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; -// -//import cascading.flow.FlowProcess; -//import cascading.scheme.Scheme; -//import cascading.scheme.SinkCall; -//import cascading.scheme.SourceCall; -//import cascading.tap.Tap; -//import cascading.tuple.Fields; -//import cascading.tuple.Tuple; -//import cascading.tuple.TupleEntry; -//import cascading.util.Util; -// -///** -//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction -//* with the {@HBaseRawTap} to allow for the reading and writing of data -//* to and from a HBase cluster. -//* -//* @see HBaseRawTap -//*/ -//@SuppressWarnings({ "rawtypes", "deprecation" }) -//public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { -// /** -// * -// */ -// private static final long serialVersionUID = 6248976486883281356L; -// -// /** Field LOG */ -// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); -// -// public final Fields RowKeyField = new Fields("rowkey"); -// public final Fields RowField = new Fields("row"); -// -// /** String familyNames */ -// private String[] familyNames; -// -// private boolean writeNulls = true; -// -// /** -// * Constructor HBaseScheme creates a new HBaseScheme instance. -// * -// * @param keyFields -// * of type Fields -// * @param familyName -// * of type String -// * @param valueFields -// * of type Fields -// */ -// public HBaseRawScheme(String familyName) { -// this(new String[] { familyName }); -// } -// -// public HBaseRawScheme(String[] familyNames) { -// this.familyNames = familyNames; -// setSourceFields(); -// } -// -// public HBaseRawScheme(String familyName, boolean writeNulls) { -// this(new String[] { familyName }, writeNulls); -// } -// -// public HBaseRawScheme(String[] familyNames, boolean writeNulls) { -// this.familyNames = familyNames; -// this.writeNulls = writeNulls; -// setSourceFields(); -// } -// -// private void setSourceFields() { -// Fields sourceFields = Fields.join(RowKeyField, RowField); -// setSourceFields(sourceFields); -// } -// -// /** -// * Method getFamilyNames returns the set of familyNames of this HBaseScheme -// * object. -// * -// * @return the familyNames (type String[]) of this HBaseScheme object. -// */ -// public String[] getFamilyNames() { -// HashSet<String> familyNameSet = new HashSet<String>(); -// if (familyNames != null) { -// for (String familyName : familyNames) { -// familyNameSet.add(familyName); -// } -// } -// return familyNameSet.toArray(new String[0]); -// } -// -// @Override -// public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -// Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; -// -// sourceCall.setContext(pair); -// } -// -// @Override -// public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { -// sourceCall.setContext(null); -// } -// -// @SuppressWarnings("unchecked") -// @Override -// public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) -// throws IOException { -// Tuple result = new Tuple(); -// -// Object key = sourceCall.getContext()[0]; -// Object value = sourceCall.getContext()[1]; -// boolean hasNext = sourceCall.getInput().next(key, value); -// if (!hasNext) { -// return false; -// } -// -// // Skip nulls -// if (key == null || value == null) { -// return true; -// } -// -// ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; -// Result row = (Result) value; -// result.add(keyWritable); -// result.add(row); -// sourceCall.getIncomingEntry().setTuple(result); -// return true; -// } -// -// @SuppressWarnings("unchecked") -// @Override -// public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { -// TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); -// OutputCollector outputCollector = sinkCall.getOutput(); -// Tuple key = tupleEntry.selectTuple(RowKeyField); -// Object okey = key.getObject(0); -// ImmutableBytesWritable keyBytes = getBytes(okey); -// Put put = new Put(keyBytes.get()); -// Fields outFields = tupleEntry.getFields().subtract(RowKeyField); -// if (null != outFields) { -// TupleEntry values = tupleEntry.selectEntry(outFields); -// for (int n = 0; n < values.getFields().size(); n++) { -// Object o = values.get(n); -// ImmutableBytesWritable valueBytes = getBytes(o); -// Comparable field = outFields.get(n); -// ColumnName cn = parseColumn((String) field); -// if (null == cn.family) { -// if (n >= familyNames.length) -// cn.family = familyNames[familyNames.length - 1]; -// else -// cn.family = familyNames[n]; -// } -// if (null != o || writeNulls) -// put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); -// } -// } -// outputCollector.collect(null, put); -// } -// -// private ImmutableBytesWritable getBytes(Object obj) { -// if (null == obj) -// return new ImmutableBytesWritable(new byte[0]); -// if (obj instanceof ImmutableBytesWritable) -// return (ImmutableBytesWritable) obj; -// else if (obj instanceof String) -// return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); -// else if (obj instanceof Long) -// return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); -// else if (obj instanceof Integer) -// return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); -// else if (obj instanceof Short) -// return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); -// else if (obj instanceof Boolean) -// return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); -// else if (obj instanceof Double) -// return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); -// else -// throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" -// + obj.getClass().getName()); -// } -// -// private ColumnName parseColumn(String column) { -// ColumnName ret = new ColumnName(); -// int pos = column.indexOf(":"); -// if (pos > 0) { -// ret.name = column.substring(pos + 1); -// ret.family = column.substring(0, pos); -// } else { -// ret.name = column; -// } -// return ret; -// } -// -// private class ColumnName { -// String family; -// String name; -// -// ColumnName() { -// } -// } -// -// @Override -// public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { -// conf.setOutputFormat(TableOutputFormat.class); -// conf.setOutputKeyClass(ImmutableBytesWritable.class); -// conf.setOutputValueClass(Put.class); -// } -// -// @Override -// public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, -// JobConf conf) { -// DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, -// ValueCopier.class); -// if (null != familyNames) { -// String columns = Util.join(this.familyNames, " "); -// LOG.debug("sourcing from column families: {}", columns); -// conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); -// } -// } -// -// @Override -// public boolean equals(Object object) { -// if (this == object) { -// return true; -// } -// if (object == null || getClass() != object.getClass()) { -// return false; -// } -// if (!super.equals(object)) { -// return false; -// } -// -// HBaseRawScheme that = (HBaseRawScheme) object; -// -// if (!Arrays.equals(familyNames, that.familyNames)) { -// return false; -// } -// return true; -// } -// -// @Override -// public int hashCode() { -// int result = super.hashCode(); -// result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); -// return result; -// } -// -// public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { -// -// public ValueCopier() { -// } -// -// public void copyValue(Result oldValue, Result newValue) { -// if (null != oldValue && null != newValue) { -// oldValue.copyFrom(newValue); -// } -// } -// -// } -//} +/* +* Copyright (c) 2009 Concurrent, Inc. +* +* This work has been released into the public domain +* by the copyright holder. This applies worldwide. +* +* In case this is not legally possible: +* The copyright holder grants any entity the right +* to use this work for any purpose, without any +* conditions, unless such conditions are required by law. +*/ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; + +/** +* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction +* with the {@HBaseRawTap} to allow for the reading and writing of data +* to and from a HBase cluster. +* +* @see HBaseRawTap +*/ +@SuppressWarnings({ "rawtypes", "deprecation" }) +public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { + /** + * + */ + private static final long serialVersionUID = 6248976486883281356L; + + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); + + public final Fields RowKeyField = new Fields("rowkey"); + public final Fields RowField = new Fields("row"); + + /** String familyNames */ + private String[] familyNames; + + private boolean writeNulls = true; + + /** + * Constructor HBaseScheme creates a new HBaseScheme instance. + * + * @param keyFields + * of type Fields + * @param familyName + * of type String + * @param valueFields + * of type Fields + */ + public HBaseRawScheme(String familyName) { + this(new String[] { familyName }); + } + + public HBaseRawScheme(String[] familyNames) { + this.familyNames = familyNames; + setSourceFields(); + } + + public HBaseRawScheme(String familyName, boolean writeNulls) { + this(new String[] { familyName }, writeNulls); + } + + public HBaseRawScheme(String[] familyNames, boolean writeNulls) { + this.familyNames = familyNames; + this.writeNulls = writeNulls; + setSourceFields(); + } + + private void setSourceFields() { + Fields sourceFields = Fields.join(RowKeyField, RowField); + setSourceFields(sourceFields); + } + + /** + * Method getFamilyNames returns the set of familyNames of this HBaseScheme + * object. + * + * @return the familyNames (type String[]) of this HBaseScheme object. + */ + public String[] getFamilyNames() { + HashSet<String> familyNameSet = new HashSet<String>(); + if (familyNames != null) { + for (String familyName : familyNames) { + familyNameSet.add(familyName); + } + } + return familyNameSet.toArray(new String[0]); + } + + @Override + public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { + Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; + + sourceCall.setContext(pair); + } + + @Override + public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) { + sourceCall.setContext(null); + } + + @SuppressWarnings("unchecked") + @Override + public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) + throws IOException { + Tuple result = new Tuple(); + + Object key = sourceCall.getContext()[0]; + Object value = sourceCall.getContext()[1]; + boolean hasNext = sourceCall.getInput().next(key, value); + if (!hasNext) { + return false; + } + + // Skip nulls + if (key == null || value == null) { + return true; + } + + ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; + Result row = (Result) value; + result.add(keyWritable); + result.add(row); + sourceCall.getIncomingEntry().setTuple(result); + return true; + } + + @SuppressWarnings("unchecked") + @Override + public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException { + TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); + OutputCollector outputCollector = sinkCall.getOutput(); + Tuple key = tupleEntry.selectTuple(RowKeyField); + Object okey = key.getObject(0); + ImmutableBytesWritable keyBytes = getBytes(okey); + Put put = new Put(keyBytes.get()); + Fields outFields = tupleEntry.getFields().subtract(RowKeyField); + if (null != outFields) { + TupleEntry values = tupleEntry.selectEntry(outFields); + for (int n = 0; n < values.getFields().size(); n++) { + Object o = values.get(n); + ImmutableBytesWritable valueBytes = getBytes(o); + Comparable field = outFields.get(n); + ColumnName cn = parseColumn((String) field); + if (null == cn.family) { + if (n >= familyNames.length) + cn.family = familyNames[familyNames.length - 1]; + else + cn.family = familyNames[n]; + } + if (null != o || writeNulls) + put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); + } + } + outputCollector.collect(null, put); + } + + private ImmutableBytesWritable getBytes(Object obj) { + if (null == obj) + return new ImmutableBytesWritable(new byte[0]); + if (obj instanceof ImmutableBytesWritable) + return (ImmutableBytesWritable) obj; + else if (obj instanceof String) + return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); + else if (obj instanceof Long) + return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); + else if (obj instanceof Integer) + return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); + else if (obj instanceof Short) + return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); + else if (obj instanceof Boolean) + return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); + else if (obj instanceof Double) + return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); + else + throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" + + obj.getClass().getName()); + } + + private ColumnName parseColumn(String column) { + ColumnName ret = new ColumnName(); + int pos = column.indexOf(":"); + if (pos > 0) { + ret.name = column.substring(pos + 1); + ret.family = column.substring(0, pos); + } else { + ret.name = column; + } + return ret; + } + + private class ColumnName { + String family; + String name; + + ColumnName() { + } + } + + @Override + public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { + conf.setOutputFormat(TableOutputFormat.class); + conf.setOutputKeyClass(ImmutableBytesWritable.class); + conf.setOutputValueClass(Put.class); + } + + @Override + public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, + JobConf conf) { + DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, + ValueCopier.class); + if (null != familyNames) { + String columns = Util.join(this.familyNames, " "); + LOG.debug("sourcing from column families: {}", columns); + conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); + } + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } + + HBaseRawScheme that = (HBaseRawScheme) object; + + if (!Arrays.equals(familyNames, that.familyNames)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); + return result; + } + + public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> { + + public ValueCopier() { + } + + public void copyValue(Result oldValue, Result newValue) { + if (null != oldValue && null != newValue) { + oldValue.copyFrom(newValue); + } + } + + } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 780d3fc..5dcd57d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -1,311 +1,311 @@ -///* -//* Copyright (c) 2009 Concurrent, Inc. -//* -//* This work has been released into the public domain -//* by the copyright holder. This applies worldwide. -//* -//* In case this is not legally possible: -//* The copyright holder grants any entity the right -//* to use this work for any purpose, without any -//* conditions, unless such conditions are required by law. -//*/ -// -//package parallelai.spyglass.hbase; -// -//import java.io.IOException; -//import java.util.UUID; -// -//import org.apache.hadoop.conf.Configuration; -//import org.apache.hadoop.fs.Path; -//import org.apache.hadoop.hbase.HBaseConfiguration; -//import org.apache.hadoop.hbase.HColumnDescriptor; -//import org.apache.hadoop.hbase.HTableDescriptor; -//import org.apache.hadoop.hbase.MasterNotRunningException; -//import org.apache.hadoop.hbase.ZooKeeperConnectionException; -//import org.apache.hadoop.hbase.client.HBaseAdmin; -//import org.apache.hadoop.hbase.client.Scan; -//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -//import org.apache.hadoop.mapred.FileInputFormat; -//import org.apache.hadoop.mapred.JobConf; -//import org.apache.hadoop.mapred.OutputCollector; -//import org.apache.hadoop.mapred.RecordReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import cascading.flow.FlowProcess; -//import cascading.tap.SinkMode; -//import cascading.tap.Tap; -//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -//import cascading.tuple.TupleEntryCollector; -//import cascading.tuple.TupleEntryIterator; -// -//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -// -///** -//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with -//* the {@HBaseRawScheme} to allow for the reading and writing -//* of data to and from a HBase cluster. -//*/ -//@SuppressWarnings({ "deprecation", "rawtypes" }) -//public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { -// /** -// * -// */ -// private static final long serialVersionUID = 8019189493428493323L; -// -// /** Field LOG */ -// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); -// -// private final String id = UUID.randomUUID().toString(); -// -// /** Field SCHEME */ -// public static final String SCHEME = "hbase"; -// -// /** Field hBaseAdmin */ -// private transient HBaseAdmin hBaseAdmin; -// -// /** Field hostName */ -// private String quorumNames; -// /** Field tableName */ -// private String tableName; -// private String base64Scan; -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// */ -// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { -// super(HBaseFullScheme, SinkMode.UPDATE); -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// * @param sinkMode -// * of type SinkMode -// */ -// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -// super(HBaseFullScheme, sinkMode); -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// */ -// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { -// super(HBaseFullScheme, SinkMode.UPDATE); -// this.quorumNames = quorumNames; -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param tableName -// * of type String -// * @param HBaseFullScheme -// * of type HBaseFullScheme -// * @param sinkMode -// * of type SinkMode -// */ -// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { -// super(HBaseFullScheme, sinkMode); -// this.quorumNames = quorumNames; -// this.tableName = tableName; -// } -// -// /** -// * Constructor HBaseTap creates a new HBaseTap instance. -// * -// * @param quorumNames HBase quorum -// * @param tableName The name of the HBase table to read -// * @param HBaseFullScheme -// * @param base64Scan An optional base64 encoded scan object -// * @param sinkMode If REPLACE the output table will be deleted before writing to -// */ -// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { -// super(HBaseFullScheme, sinkMode); -// this.quorumNames = quorumNames; -// this.tableName = tableName; -// this.base64Scan = base64Scan; -// } -// -// /** -// * Method getTableName returns the tableName of this HBaseTap object. -// * -// * @return the tableName (type String) of this HBaseTap object. -// */ -// public String getTableName() { -// return tableName; -// } -// -// public Path getPath() { -// return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); -// } -// -// protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { -// if (hBaseAdmin == null) { -// Configuration hbaseConf = HBaseConfiguration.create(conf); -// hBaseAdmin = new HBaseAdmin(hbaseConf); -// } -// -// return hBaseAdmin; -// } -// -// @Override -// public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { -// if (quorumNames != null) { -// conf.set("hbase.zookeeper.quorum", quorumNames); -// } -// -// LOG.debug("sinking to table: {}", tableName); -// -// if (isReplace() && conf.get("mapred.task.partition") == null) { -// try { -// deleteResource(conf); -// -// } catch (IOException e) { -// throw new RuntimeException("could not delete resource: " + e); -// } -// } -// -// else if (isUpdate() || isReplace()) { -// try { -// createResource(conf); -// } catch (IOException e) { -// throw new RuntimeException(tableName + " does not exist !", e); -// } -// -// } -// -// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); -// super.sinkConfInit(process, conf); -// } -// -// @Override -// public String getIdentifier() { -// return id; -// } -// -// @Override -// public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) -// throws IOException { -// return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); -// } -// -// @Override -// public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) -// throws IOException { -// HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); -// hBaseCollector.prepare(); -// return hBaseCollector; -// } -// -// @Override -// public boolean createResource(JobConf jobConf) throws IOException { -// HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); -// -// if (hBaseAdmin.tableExists(tableName)) { -// return true; -// } -// -// LOG.info("creating hbase table: {}", tableName); -// -// HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); -// -// String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); -// -// for (String familyName : familyNames) { -// tableDescriptor.addFamily(new HColumnDescriptor(familyName)); -// } -// -// hBaseAdmin.createTable(tableDescriptor); -// -// return true; -// } -// -// @Override -// public boolean deleteResource(JobConf jobConf) throws IOException { -// if (getHBaseAdmin(jobConf).tableExists(tableName)) { -// if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) -// getHBaseAdmin(jobConf).disableTable(tableName); -// getHBaseAdmin(jobConf).deleteTable(tableName); -// } -// return true; -// } -// -// @Override -// public boolean resourceExists(JobConf jobConf) throws IOException { -// return getHBaseAdmin(jobConf).tableExists(tableName); -// } -// -// @Override -// public long getModifiedTime(JobConf jobConf) throws IOException { -// return System.currentTimeMillis(); // currently unable to find last mod -// // time -// // on a table -// } -// -// @Override -// public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { -// // a hack for MultiInputFormat to see that there is a child format -// FileInputFormat.setInputPaths(conf, getPath()); -// -// if (quorumNames != null) { -// conf.set("hbase.zookeeper.quorum", quorumNames); -// } -// -// LOG.debug("sourcing from table: {}", tableName); -// conf.set(TableInputFormat.INPUT_TABLE, tableName); -// if (null != base64Scan) -// conf.set(TableInputFormat.SCAN, base64Scan); -// -// super.sourceConfInit(process, conf); -// } -// -// @Override -// public boolean equals(Object object) { -// if (this == object) { -// return true; -// } -// if (object == null || getClass() != object.getClass()) { -// return false; -// } -// if (!super.equals(object)) { -// return false; -// } -// -// HBaseRawTap hBaseTap = (HBaseRawTap) object; -// -// if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { -// return false; -// } -// -// if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { -// return false; -// } -// -// return true; -// } -// -// @Override -// public int hashCode() { -// int result = super.hashCode(); -// result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); -// return result; -// } -//} +/* +* Copyright (c) 2009 Concurrent, Inc. +* +* This work has been released into the public domain +* by the copyright holder. This applies worldwide. +* +* In case this is not legally possible: +* The copyright holder grants any entity the right +* to use this work for any purpose, without any +* conditions, unless such conditions are required by law. +*/ + +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; + +/** +* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with +* the {@HBaseRawScheme} to allow for the reading and writing +* of data to and from a HBase cluster. +*/ +@SuppressWarnings({ "deprecation", "rawtypes" }) +public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> { + /** + * + */ + private static final long serialVersionUID = 8019189493428493323L; + + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); + + private final String id = UUID.randomUUID().toString(); + + /** Field SCHEME */ + public static final String SCHEME = "hbase"; + + /** Field hBaseAdmin */ + private transient HBaseAdmin hBaseAdmin; + + /** Field hostName */ + private String quorumNames; + /** Field tableName */ + private String tableName; + private String base64Scan; + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { + super(HBaseFullScheme, SinkMode.UPDATE); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.quorumNames = quorumNames; + this.tableName = tableName; + } + + /** + * Constructor HBaseTap creates a new HBaseTap instance. + * + * @param quorumNames HBase quorum + * @param tableName The name of the HBase table to read + * @param HBaseFullScheme + * @param base64Scan An optional base64 encoded scan object + * @param sinkMode If REPLACE the output table will be deleted before writing to + */ + public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { + super(HBaseFullScheme, sinkMode); + this.quorumNames = quorumNames; + this.tableName = tableName; + this.base64Scan = base64Scan; + } + + /** + * Method getTableName returns the tableName of this HBaseTap object. + * + * @return the tableName (type String) of this HBaseTap object. + */ + public String getTableName() { + return tableName; + } + + public Path getPath() { + return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); + } + + protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { + if (hBaseAdmin == null) { + Configuration hbaseConf = HBaseConfiguration.create(conf); + hBaseAdmin = new HBaseAdmin(hbaseConf); + } + + return hBaseAdmin; + } + + @Override + public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { + if (quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sinking to table: {}", tableName); + + if (isReplace() && conf.get("mapred.task.partition") == null) { + try { + deleteResource(conf); + + } catch (IOException e) { + throw new RuntimeException("could not delete resource: " + e); + } + } + + else if (isUpdate() || isReplace()) { + try { + createResource(conf); + } catch (IOException e) { + throw new RuntimeException(tableName + " does not exist !", e); + } + + } + + conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); + super.sinkConfInit(process, conf); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) + throws IOException { + return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); + } + + @Override + public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) + throws IOException { + HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); + hBaseCollector.prepare(); + return hBaseCollector; + } + + @Override + public boolean createResource(JobConf jobConf) throws IOException { + HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + + if (hBaseAdmin.tableExists(tableName)) { + return true; + } + + LOG.info("creating hbase table: {}", tableName); + + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + + String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); + + for (String familyName : familyNames) { + tableDescriptor.addFamily(new HColumnDescriptor(familyName)); + } + + hBaseAdmin.createTable(tableDescriptor); + + return true; + } + + @Override + public boolean deleteResource(JobConf jobConf) throws IOException { + if (getHBaseAdmin(jobConf).tableExists(tableName)) { + if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) + getHBaseAdmin(jobConf).disableTable(tableName); + getHBaseAdmin(jobConf).deleteTable(tableName); + } + return true; + } + + @Override + public boolean resourceExists(JobConf jobConf) throws IOException { + return getHBaseAdmin(jobConf).tableExists(tableName); + } + + @Override + public long getModifiedTime(JobConf jobConf) throws IOException { + return System.currentTimeMillis(); // currently unable to find last mod + // time + // on a table + } + + @Override + public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { + // a hack for MultiInputFormat to see that there is a child format + FileInputFormat.setInputPaths(conf, getPath()); + + if (quorumNames != null) { + conf.set("hbase.zookeeper.quorum", quorumNames); + } + + LOG.debug("sourcing from table: {}", tableName); + conf.set(TableInputFormat.INPUT_TABLE, tableName); + if (null != base64Scan) + conf.set(TableInputFormat.SCAN, base64Scan); + + super.sourceConfInit(process, conf); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } + + HBaseRawTap hBaseTap = (HBaseRawTap) object; + + if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { + return false; + } + + if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); + return result; + } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java index 1166970..3f10a04 100644 --- a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java @@ -29,6 +29,14 @@ package parallelai.spyglass.jdbc.db; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -39,353 +47,385 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; -import com.jcraft.jsch.Logger; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - /** - * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts - * <key,value> pairs, where key has a type extending DBWritable. Returned {@link RecordWriter} - * writes <b>only the key</b> to the database with a batch SQL query. + * A OutputFormat that sends the reduce output to a SQL table. + * <p/> + * {@link DBOutputFormat} accepts <key,value> pairs, where key has a type + * extending DBWritable. Returned {@link RecordWriter} writes <b>only the + * key</b> to the database with a batch SQL query. */ -public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> { - private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); - - /** A RecordWriter that writes the reduce output to a SQL table */ - protected class DBRecordWriter implements RecordWriter<K, V> { - private Connection connection; - private PreparedStatement insertStatement; - private PreparedStatement updateStatement; - private final int statementsBeforeExecute; - - private long statementsAdded = 0; - private long insertStatementsCurrent = 0; - private long updateStatementsCurrent = 0; - - protected DBRecordWriter(Connection connection, PreparedStatement insertStatement, - PreparedStatement updateStatement, int statementsBeforeExecute) { - this.connection = connection; - this.insertStatement = insertStatement; - this.updateStatement = updateStatement; - this.statementsBeforeExecute = statementsBeforeExecute; - } - - /** {@inheritDoc} */ - public void close(Reporter reporter) throws IOException { - executeBatch(); - - try { - if (insertStatement != null) { insertStatement.close(); } - - if (updateStatement != null) { updateStatement.close(); } - - connection.commit(); - } catch (SQLException exception) { - rollBack(); - - createThrowMessage("unable to commit batch", 0, exception); - } finally { - try { - connection.close(); - } catch (SQLException exception) { - throw new IOException("unable to close connection", exception); - } - } - } - - private void executeBatch() throws IOException { - try { - if (insertStatementsCurrent != 0) { - LOG.info( - "executing insert batch " + createBatchMessage(insertStatementsCurrent)); - - insertStatement.executeBatch(); - } - - insertStatementsCurrent = 0; - } catch (SQLException exception) { - rollBack(); - - createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception); - } - - try { - if (updateStatementsCurrent != 0) { - LOG.info( - "executing update batch " + createBatchMessage(updateStatementsCurrent)); - - int[] result = updateStatement.executeBatch(); - - int count = 0; - - for (int value : result) { count += value; } - - if (count != updateStatementsCurrent) { - throw new IOException( - "update did not update same number of statements executed in batch, batch: " - + updateStatementsCurrent + " updated: " + count); - } - } - - updateStatementsCurrent = 0; - } catch (SQLException exception) { - - String message = exception.getMessage(); - if (message.indexOf("Duplicate Key") >= 0) { - LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); - } else { - rollBack(); - - createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception); - } - } - } - - private void rollBack() { - try { - connection.rollback(); - } catch (SQLException sqlException) { - LOG.warn(StringUtils.stringifyException(sqlException)); - } - } - - private String createBatchMessage(long currentStatements) { - return String - .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute); - } - - private void createThrowMessage(String stateMessage, long currentStatements, - SQLException exception) throws IOException { - String message = exception.getMessage(); - - message = message.substring(0, Math.min(75, message.length())); - - int messageLength = exception.getMessage().length(); - String batchMessage = createBatchMessage(currentStatements); - String template = "%s [msglength: %d]%s %s"; - String errorMessage = - String.format(template, stateMessage, messageLength, batchMessage, message); - - LOG.error(errorMessage, exception.getNextException()); - - throw new IOException(errorMessage, exception.getNextException()); - } - - /** {@inheritDoc} */ - public synchronized void write(K key, V value) throws IOException { - try { - if (value == null) { - key.write(insertStatement); - insertStatement.addBatch(); - insertStatementsCurrent++; - } else { - key.write(updateStatement); - updateStatement.addBatch(); - updateStatementsCurrent++; - } - } catch (SQLException exception) { - throw new IOException("unable to add batch statement", exception); - } - - statementsAdded++; - - if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); } - } - } - - /** - * Constructs the query used as the prepared statement to insert data. - * - * @param table the table to insert into - * @param fieldNames the fields to insert into. If field names are unknown, supply an array of - * nulls. - */ - protected String constructInsertQuery(String table, String[] fieldNames) { - if (fieldNames == null) { - throw new IllegalArgumentException("Field names may not be null"); - } - - StringBuilder query = new StringBuilder(); - - query.append("INSERT INTO ").append(table); - - if (fieldNames.length > 0 && fieldNames[0] != null) { - query.append(" ("); - - for (int i = 0; i < fieldNames.length; i++) { - query.append(fieldNames[i]); - - if (i != fieldNames.length - 1) { query.append(","); } - } - - query.append(")"); - - } - - query.append(" VALUES ("); - - for (int i = 0; i < fieldNames.length; i++) { - query.append("?"); - - if (i != fieldNames.length - 1) { query.append(","); } - } - - query.append(")"); - - boolean test = true; - if (test) { - query.append(" ON DUPLICATE KEY UPDATE "); - - - for (int i = 1; i < fieldNames.length; i++) { - - - if ( (i != 1) ) { query.append(","); } - //if (i != fieldNames.length - 1) { query.append(","); } - //&& (i != fieldNames.length - 1) - query.append(fieldNames[i]); - query.append(" = ?"); - - - } - } - - query.append(";"); - - LOG.info(" ===================== " + query.toString()); - return query.toString(); - } - - protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) { - if (fieldNames == null) { - throw new IllegalArgumentException("field names may not be null"); - } - - Set<String> updateNamesSet = new HashSet<String>(); - Collections.addAll(updateNamesSet, updateNames); - - StringBuilder query = new StringBuilder(); - - query.append("UPDATE ").append(table); - - query.append(" SET "); - - if (fieldNames.length > 0 && fieldNames[0] != null) { - int count = 0; - - for (int i = 0; i < fieldNames.length; i++) { - if (updateNamesSet.contains(fieldNames[i])) { continue; } - - if (count != 0) { query.append(","); } - - query.append(fieldNames[i]); - query.append(" = ?"); - - count++; - } - } - - query.append(" WHERE "); - - if (updateNames.length > 0 && updateNames[0] != null) { - for (int i = 0; i < updateNames.length; i++) { - query.append(updateNames[i]); - query.append(" = ?"); - - if (i != updateNames.length - 1) { query.append(" and "); } - } - } - - query.append(";"); - System.out.println("Update Query => " + query.toString()); - return query.toString(); - } - - /** {@inheritDoc} */ - public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException { - } - - /** {@inheritDoc} */ - public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name, - Progressable progress) throws IOException { - DBConfiguration dbConf = new DBConfiguration(job); - - String tableName = dbConf.getOutputTableName(); - String[] fieldNames = dbConf.getOutputFieldNames(); - String[] updateNames = dbConf.getOutputUpdateFieldNames(); - int batchStatements = dbConf.getBatchStatementsNum(); - - Connection connection = dbConf.getConnection(); - - configureConnection(connection); - - String sqlInsert = constructInsertQuery(tableName, fieldNames); - PreparedStatement insertPreparedStatement; - - try { - insertPreparedStatement = connection.prepareStatement(sqlInsert); - insertPreparedStatement.setEscapeProcessing(true); // should be on by default - } catch (SQLException exception) { - throw new IOException("unable to create statement for: " + sqlInsert, exception); - } - - String sqlUpdate = - updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null; - PreparedStatement updatePreparedStatement = null; - - try { - updatePreparedStatement = - sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null; - } catch (SQLException exception) { - throw new IOException("unable to create statement for: " + sqlUpdate, exception); - } - - return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements); - } - - protected void configureConnection(Connection connection) { - setAutoCommit(connection); - } - - protected void setAutoCommit(Connection connection) { - try { - connection.setAutoCommit(false); - } catch (Exception exception) { - throw new RuntimeException("unable to set auto commit", exception); - } - } - - /** - * Initializes the reduce-part of the job with the appropriate output settings - * - * @param job The job - * @param dbOutputFormatClass - * @param tableName The table to insert data into - * @param fieldNames The field names in the table. If unknown, supply the appropriate - */ - public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass, - String tableName, String[] fieldNames, String[] updateFields, int batchSize) { - if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else { - job.setOutputFormat(dbOutputFormatClass); - } - - // writing doesn't always happen in reduce - job.setReduceSpeculativeExecution(false); - job.setMapSpeculativeExecution(false); - - DBConfiguration dbConf = new DBConfiguration(job); - - dbConf.setOutputTableName(tableName); - dbConf.setOutputFieldNames(fieldNames); - - if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); } - - if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); } - } +public class DBOutputFormat<K extends DBWritable, V> implements + OutputFormat<K, V> { + private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); + + /** A RecordWriter that writes the reduce output to a SQL table */ + protected class DBRecordWriter implements RecordWriter<K, V> { + private Connection connection; + private PreparedStatement insertStatement; + private PreparedStatement updateStatement; + private final int statementsBeforeExecute; + + private long statementsAdded = 0; + private long insertStatementsCurrent = 0; + private long updateStatementsCurrent = 0; + + protected DBRecordWriter(Connection connection, + PreparedStatement insertStatement, + PreparedStatement updateStatement, int statementsBeforeExecute) { + this.connection = connection; + this.insertStatement = insertStatement; + this.updateStatement = updateStatement; + this.statementsBeforeExecute = statementsBeforeExecute; + } + + /** {@inheritDoc} */ + public void close(Reporter reporter) throws IOException { + executeBatch(); + + try { + if (insertStatement != null) { + insertStatement.close(); + } + + if (updateStatement != null) { + updateStatement.close(); + } + + connection.commit(); + } catch (SQLException exception) { + rollBack(); + + createThrowMessage("unable to commit batch", 0, exception); + } finally { + try { + connection.close(); + } catch (SQLException exception) { + throw new IOException("unable to close connection", exception); + } + } + } + + private void executeBatch() throws IOException { + try { + if (insertStatementsCurrent != 0) { + LOG.info("executing insert batch " + + createBatchMessage(insertStatementsCurrent)); + + insertStatement.executeBatch(); + } + + insertStatementsCurrent = 0; + } catch (SQLException exception) { + rollBack(); + + createThrowMessage("unable to execute insert batch", + insertStatementsCurrent, exception); + } + + try { + if (updateStatementsCurrent != 0) { + LOG.info("executing update batch " + + createBatchMessage(updateStatementsCurrent)); + + int[] result = updateStatement.executeBatch(); + + int count = 0; + + for (int value : result) { + count += value; + } + + if (count != updateStatementsCurrent) { + throw new IOException( + "update did not update same number of statements executed in batch, batch: " + + updateStatementsCurrent + " updated: " + count); + } + } + + updateStatementsCurrent = 0; + } catch (SQLException exception) { + + String message = exception.getMessage(); + if (message.indexOf("Duplicate Key") >= 0) { + LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); + } else { + rollBack(); + + createThrowMessage("unable to execute update batch", + updateStatementsCurrent, exception); + } + } + } + + private void rollBack() { + try { + connection.rollback(); + } catch (SQLException sqlException) { + LOG.warn(StringUtils.stringifyException(sqlException)); + } + } + + private String createBatchMessage(long currentStatements) { + return String.format("[totstmts: %d][crntstmts: %d][batch: %d]", + statementsAdded, currentStatements, statementsBeforeExecute); + } + + private void createThrowMessage(String stateMessage, + long currentStatements, SQLException exception) throws IOException { + String message = exception.getMessage(); + + // message = message.substring(0, Math.min(75, message.length())); + + int messageLength = exception.getMessage().length(); + String batchMessage = createBatchMessage(currentStatements); + String template = "%s [msglength: %d]%s %s"; + String errorMessage = String.format(template, stateMessage, + messageLength, batchMessage, message); + + LOG.error(errorMessage, exception.getNextException()); + + throw new IOException(errorMessage, exception.getNextException()); + } + + /** {@inheritDoc} */ + public synchronized void write(K key, V value) throws IOException { + try { + if (value == null) { + key.write(insertStatement); + insertStatement.addBatch(); + insertStatementsCurrent++; + } else { + key.write(updateStatement); + updateStatement.addBatch(); + updateStatementsCurrent++; + } + } catch (SQLException exception) { + throw new IOException("unable to add batch statement", exception); + } + + statementsAdded++; + + if (statementsAdded % statementsBeforeExecute == 0) { + executeBatch(); + } + } + } + + /** + * Constructs the query used as the prepared statement to insert data. + * + * @param table + * the table to insert into + * @param fieldNames + * the fields to insert into. If field names are unknown, supply an + * array of nulls. + */ + protected String constructInsertQuery(String table, String[] fieldNames) { + if (fieldNames == null) { + throw new IllegalArgumentException("Field names may not be null"); + } + + StringBuilder query = new StringBuilder(); + + query.append("INSERT INTO ").append(table); + + if (fieldNames.length > 0 && fieldNames[0] != null) { + query.append(" ("); + + for (int i = 0; i < fieldNames.length; i++) { + query.append(fieldNames[i]); + + if (i != fieldNames.length - 1) { + query.append(","); + } + } + + query.append(")"); + + } + + query.append(" VALUES ("); + + for (int i = 0; i < fieldNames.length; i++) { + query.append("?"); + + if (i != fieldNames.length - 1) { + query.append(","); + } + } + + query.append(")"); + + boolean test = true; + if (test) { + query.append(" ON DUPLICATE KEY UPDATE "); + + for (int i = 1; i < fieldNames.length; i++) { + + if ((i != 1)) { + query.append(","); + } + // if (i != fieldNames.length - 1) { query.append(","); } + // && (i != fieldNames.length - 1) + query.append(fieldNames[i]); + query.append(" = ?"); + + } + } + + query.append(";"); + + LOG.info(" ===================== " + query.toString()); + return query.toString(); + } + + protected String constructUpdateQuery(String table, String[] fieldNames, + String[] updateNames) { + if (fieldNames == null) { + throw new IllegalArgumentException("field names may not be null"); + } + + Set<String> updateNamesSet = new HashSet<String>(); + Collections.addAll(updateNamesSet, updateNames); + + StringBuilder query = new StringBuilder(); + + query.append("UPDATE ").append(table); + + query.append(" SET "); + + if (fieldNames.length > 0 && fieldNames[0] != null) { + int count = 0; + + for (int i = 0; i < fieldNames.length; i++) { + if (updateNamesSet.contains(fieldNames[i])) { + continue; + } + + if (count != 0) { + query.append(","); + } + + query.append(fieldNames[i]); + query.append(" = ?"); + + count++; + } + } + + query.append(" WHERE "); + + if (updateNames.length > 0 && updateNames[0] != null) { + for (int i = 0; i < updateNames.length; i++) { + query.append(updateNames[i]); + query.append(" = ?"); + + if (i != updateNames.length - 1) { + query.append(" and "); + } + } + } + + query.append(";"); + System.out.println("Update Query => " + query.toString()); + return query.toString(); + } + + /** {@inheritDoc} */ + public void checkOutputSpecs(FileSystem filesystem, JobConf job) + throws IOException { + } + + /** {@inheritDoc} */ + public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, + JobConf job, String name, Progressable progress) throws IOException { + DBConfiguration dbConf = new DBConfiguration(job); + + String tableName = dbConf.getOutputTableName(); + String[] fieldNames = dbConf.getOutputFieldNames(); + String[] updateNames = dbConf.getOutputUpdateFieldNames(); + int batchStatements = dbConf.getBatchStatementsNum(); + + Connection connection = dbConf.getConnection(); + + configureConnection(connection); + + String sqlInsert = constructInsertQuery(tableName, fieldNames); + PreparedStatement insertPreparedStatement; + + try { + insertPreparedStatement = connection.prepareStatement(sqlInsert); + insertPreparedStatement.setEscapeProcessing(true); // should be on by + // default + } catch (SQLException exception) { + throw new IOException("unable to create statement for: " + sqlInsert, + exception); + } + + String sqlUpdate = updateNames != null ? constructUpdateQuery(tableName, + fieldNames, updateNames) : null; + PreparedStatement updatePreparedStatement = null; + + try { + updatePreparedStatement = sqlUpdate != null ? connection + .prepareStatement(sqlUpdate) : null; + } catch (SQLException exception) { + throw new IOException("unable to create statement for: " + sqlUpdate, + exception); + } + + return new DBRecordWriter(connection, insertPreparedStatement, + updatePreparedStatement, batchStatements); + } + + protected void configureConnection(Connection connection) { + setAutoCommit(connection); + } + + protected void setAutoCommit(Connection connection) { + try { + connection.setAutoCommit(false); + } catch (Exception exception) { + throw new RuntimeException("unable to set auto commit", exception); + } + } + + /** + * Initializes the reduce-part of the job with the appropriate output + * settings + * + * @param job + * The job + * @param dbOutputFormatClass + * @param tableName + * The table to insert data into + * @param fieldNames + * The field names in the table. If unknown, supply the appropriate + */ + public static void setOutput(JobConf job, + Class<? extends DBOutputFormat> dbOutputFormatClass, String tableName, + String[] fieldNames, String[] updateFields, int batchSize) { + if (dbOutputFormatClass == null) { + job.setOutputFormat(DBOutputFormat.class); + } else { + job.setOutputFormat(dbOutputFormatClass); + } + + // writing doesn't always happen in reduce + job.setReduceSpeculativeExecution(false); + job.setMapSpeculativeExecution(false); + + DBConfiguration dbConf = new DBConfiguration(job); + + dbConf.setOutputTableName(tableName); + dbConf.setOutputFieldNames(fieldNames); + + if (updateFields != null) { + dbConf.setOutputUpdateFieldNames(updateFields); + } + + if (batchSize != -1) { + dbConf.setBatchStatementsNum(batchSize); + } + } } diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 6216695..450a57d 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -1,83 +1,83 @@ -//package parallelai.spyglass.hbase -// -//import cascading.pipe.Pipe -//import cascading.pipe.assembly.Coerce -//import cascading.scheme.Scheme -//import cascading.tap.{ Tap, SinkMode } -//import cascading.tuple.Fields -//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } -//import org.apache.hadoop.hbase.util.Bytes -//import scala.collection.JavaConversions._ -//import scala.collection.mutable.WrappedArray -//import com.twitter.scalding._ -//import org.apache.hadoop.hbase.io.ImmutableBytesWritable -//import org.apache.hadoop.hbase.client.Scan -//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil -//import org.apache.hadoop.hbase.util.Base64 -//import java.io.ByteArrayOutputStream -//import java.io.DataOutputStream -// -//object HBaseRawSource { -// /** -// * Converts a scan object to a base64 string that can be passed to HBaseRawSource -// * @param scan -// * @return base64 string representation -// */ -// def convertScanToString(scan: Scan) = { -// val out = new ByteArrayOutputStream(); -// val dos = new DataOutputStream(out); -// scan.write(dos); -// Base64.encodeBytes(out.toByteArray()); -// } -//} -// -// -///** -// * @author Rotem Hermon -// * -// * HBaseRawSource is a scalding source that passes the original row (Result) object to the -// * mapper for customized processing. -// * -// * @param tableName The name of the HBase table to read -// * @param quorumNames HBase quorum -// * @param familyNames Column families to get (source, if null will get all) or update to (sink) -// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written -// * @param base64Scan An optional base64 encoded scan object -// * @param sinkMode If REPLACE the output table will be deleted before writing to -// * -// */ -//class HBaseRawSource( -// tableName: String, -// quorumNames: String = "localhost", -// familyNames: Array[String], -// writeNulls: Boolean = true, -// base64Scan: String = null, -// sinkMode: SinkMode = null) extends Source { -// -// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) -// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] -// -// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { -// val hBaseScheme = hdfsScheme match { -// case hbase: HBaseRawScheme => hbase -// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") -// } -// mode match { -// case hdfsMode @ Hdfs(_, _) => readOrWrite match { -// case Read => { -// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -// case null => SinkMode.KEEP -// case _ => sinkMode -// }).asInstanceOf[Tap[_, _, _]] -// } -// case Write => { -// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -// case null => SinkMode.UPDATE -// case _ => sinkMode -// }).asInstanceOf[Tap[_, _, _]] -// } -// } -// case _ => super.createTap(readOrWrite)(mode) -// } -// } -//} +package parallelai.spyglass.hbase + +import cascading.pipe.Pipe +import cascading.pipe.assembly.Coerce +import cascading.scheme.Scheme +import cascading.tap.{ Tap, SinkMode } +import cascading.tuple.Fields +import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.JavaConversions._ +import scala.collection.mutable.WrappedArray +import com.twitter.scalding._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.util.Base64 +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream + +object HBaseRawSource { + /** + * Converts a scan object to a base64 string that can be passed to HBaseRawSource + * @param scan + * @return base64 string representation + */ + def convertScanToString(scan: Scan) = { + val out = new ByteArrayOutputStream(); + val dos = new DataOutputStream(out); + scan.write(dos); + Base64.encodeBytes(out.toByteArray()); + } +} + + +/** +* @author Rotem Hermon +* +* HBaseRawSource is a scalding source that passes the original row (Result) object to the +* mapper for customized processing. +* +* @param tableName The name of the HBase table to read +* @param quorumNames HBase quorum +* @param familyNames Column families to get (source, if null will get all) or update to (sink) +* @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written +* @param base64Scan An optional base64 encoded scan object +* @param sinkMode If REPLACE the output table will be deleted before writing to +* +*/ +class HBaseRawSource( + tableName: String, + quorumNames: String = "localhost", + familyNames: Array[String], + writeNulls: Boolean = true, + base64Scan: String = null, + sinkMode: SinkMode = null) extends Source { + + override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseRawScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.KEEP + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + case Write => { + new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { + case null => SinkMode.UPDATE + case _ => sinkMode + }).asInstanceOf[Tap[_, _, _]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } +} |