From df2fa37f337bbbb219449aadaf57bcacd2350ada Mon Sep 17 00:00:00 2001 From: Gracia Fernandez Date: Wed, 10 Jul 2013 10:26:41 +0100 Subject: Versions reverted back to the old ones: Scala 2.9.3 (cdh4.2.0) --- .../parallelai/spyglass/hbase/HBaseRawScheme.java | 572 +++++++++---------- .../parallelai/spyglass/hbase/HBaseRawTap.java | 622 ++++++++++----------- .../parallelai/spyglass/hbase/HBaseRawSource.scala | 166 +++--- 3 files changed, 680 insertions(+), 680 deletions(-) (limited to 'src') diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java index 32730d6..7dba40d 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawScheme.java @@ -1,286 +1,286 @@ -/* - * Copyright (c) 2009 Concurrent, Inc. - * - * This work has been released into the public domain - * by the copyright holder. This applies worldwide. - * - * In case this is not legally possible: - * The copyright holder grants any entity the right - * to use this work for any purpose, without any - * conditions, unless such conditions are required by law. - */ - -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; - -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableOutputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; -import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; - -import cascading.flow.FlowProcess; -import cascading.scheme.Scheme; -import cascading.scheme.SinkCall; -import cascading.scheme.SourceCall; -import cascading.tap.Tap; -import cascading.tuple.Fields; -import cascading.tuple.Tuple; -import cascading.tuple.TupleEntry; -import cascading.util.Util; - -/** - * The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction - * with the {@HBaseRawTap} to allow for the reading and writing of data - * to and from a HBase cluster. - * - * @see HBaseRawTap - */ -@SuppressWarnings({ "rawtypes", "deprecation" }) -public class HBaseRawScheme extends Scheme { - /** - * - */ - private static final long serialVersionUID = 6248976486883281356L; - - /** Field LOG */ - private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); - - public final Fields RowKeyField = new Fields("rowkey"); - public final Fields RowField = new Fields("row"); - - /** String familyNames */ - private String[] familyNames; - - private boolean writeNulls = true; - - /** - * Constructor HBaseScheme creates a new HBaseScheme instance. - * - * @param keyFields - * of type Fields - * @param familyName - * of type String - * @param valueFields - * of type Fields - */ - public HBaseRawScheme(String familyName) { - this(new String[] { familyName }); - } - - public HBaseRawScheme(String[] familyNames) { - this.familyNames = familyNames; - setSourceFields(); - } - - public HBaseRawScheme(String familyName, boolean writeNulls) { - this(new String[] { familyName }, writeNulls); - } - - public HBaseRawScheme(String[] familyNames, boolean writeNulls) { - this.familyNames = familyNames; - this.writeNulls = writeNulls; - setSourceFields(); - } - - private void setSourceFields() { - Fields sourceFields = Fields.join(RowKeyField, RowField); - setSourceFields(sourceFields); - } - - /** - * Method getFamilyNames returns the set of familyNames of this HBaseScheme - * object. - * - * @return the familyNames (type String[]) of this HBaseScheme object. - */ - public String[] getFamilyNames() { - HashSet familyNameSet = new HashSet(); - if (familyNames != null) { - for (String familyName : familyNames) { - familyNameSet.add(familyName); - } - } - return familyNameSet.toArray(new String[0]); - } - - @Override - public void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) { - Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; - - sourceCall.setContext(pair); - } - - @Override - public void sourceCleanup(FlowProcess flowProcess, SourceCall sourceCall) { - sourceCall.setContext(null); - } - - @SuppressWarnings("unchecked") - @Override - public boolean source(FlowProcess flowProcess, SourceCall sourceCall) - throws IOException { - Tuple result = new Tuple(); - - Object key = sourceCall.getContext()[0]; - Object value = sourceCall.getContext()[1]; - boolean hasNext = sourceCall.getInput().next(key, value); - if (!hasNext) { - return false; - } - - // Skip nulls - if (key == null || value == null) { - return true; - } - - ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; - Result row = (Result) value; - result.add(keyWritable); - result.add(row); - sourceCall.getIncomingEntry().setTuple(result); - return true; - } - - @SuppressWarnings("unchecked") - @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { - TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); - OutputCollector outputCollector = sinkCall.getOutput(); - Tuple key = tupleEntry.selectTuple(RowKeyField); - Object okey = key.getObject(0); - ImmutableBytesWritable keyBytes = getBytes(okey); - Put put = new Put(keyBytes.get()); - Fields outFields = tupleEntry.getFields().subtract(RowKeyField); - if (null != outFields) { - TupleEntry values = tupleEntry.selectEntry(outFields); - for (int n = 0; n < values.getFields().size(); n++) { - Object o = values.get(n); - ImmutableBytesWritable valueBytes = getBytes(o); - Comparable field = outFields.get(n); - ColumnName cn = parseColumn((String) field); - if (null == cn.family) { - if (n >= familyNames.length) - cn.family = familyNames[familyNames.length - 1]; - else - cn.family = familyNames[n]; - } - if (null != o || writeNulls) - put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); - } - } - outputCollector.collect(null, put); - } - - private ImmutableBytesWritable getBytes(Object obj) { - if (null == obj) - return new ImmutableBytesWritable(new byte[0]); - if (obj instanceof ImmutableBytesWritable) - return (ImmutableBytesWritable) obj; - else if (obj instanceof String) - return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); - else if (obj instanceof Long) - return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); - else if (obj instanceof Integer) - return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); - else if (obj instanceof Short) - return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); - else if (obj instanceof Boolean) - return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); - else if (obj instanceof Double) - return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); - else - throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" - + obj.getClass().getName()); - } - - private ColumnName parseColumn(String column) { - ColumnName ret = new ColumnName(); - int pos = column.indexOf(":"); - if (pos > 0) { - ret.name = column.substring(pos + 1); - ret.family = column.substring(0, pos); - } else { - ret.name = column; - } - return ret; - } - - private class ColumnName { - String family; - String name; - - ColumnName() { - } - } - - @Override - public void sinkConfInit(FlowProcess process, Tap tap, JobConf conf) { - conf.setOutputFormat(TableOutputFormat.class); - conf.setOutputKeyClass(ImmutableBytesWritable.class); - conf.setOutputValueClass(Put.class); - } - - @Override - public void sourceConfInit(FlowProcess process, Tap tap, - JobConf conf) { - DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, - ValueCopier.class); - if (null != familyNames) { - String columns = Util.join(this.familyNames, " "); - LOG.debug("sourcing from column families: {}", columns); - conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); - } - } - - @Override - public boolean equals(Object object) { - if (this == object) { - return true; - } - if (object == null || getClass() != object.getClass()) { - return false; - } - if (!super.equals(object)) { - return false; - } - - HBaseRawScheme that = (HBaseRawScheme) object; - - if (!Arrays.equals(familyNames, that.familyNames)) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); - return result; - } - - public static class ValueCopier implements DeprecatedInputFormatValueCopier { - - public ValueCopier() { - } - - public void copyValue(Result oldValue, Result newValue) { - if (null != oldValue && null != newValue) { - oldValue.copyFrom(newValue); - } - } - - } -} +///* +//* Copyright (c) 2009 Concurrent, Inc. +//* +//* This work has been released into the public domain +//* by the copyright holder. This applies worldwide. +//* +//* In case this is not legally possible: +//* The copyright holder grants any entity the right +//* to use this work for any purpose, without any +//* conditions, unless such conditions are required by law. +//*/ +// +//package parallelai.spyglass.hbase; +// +//import java.io.IOException; +//import java.util.Arrays; +//import java.util.HashSet; +// +//import org.apache.hadoop.hbase.client.Put; +//import org.apache.hadoop.hbase.client.Result; +//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +//import org.apache.hadoop.hbase.mapred.TableOutputFormat; +//import org.apache.hadoop.hbase.util.Bytes; +//import org.apache.hadoop.mapred.JobConf; +//import org.apache.hadoop.mapred.OutputCollector; +//import org.apache.hadoop.mapred.RecordReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier; +//import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper; +// +//import cascading.flow.FlowProcess; +//import cascading.scheme.Scheme; +//import cascading.scheme.SinkCall; +//import cascading.scheme.SourceCall; +//import cascading.tap.Tap; +//import cascading.tuple.Fields; +//import cascading.tuple.Tuple; +//import cascading.tuple.TupleEntry; +//import cascading.util.Util; +// +///** +//* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction +//* with the {@HBaseRawTap} to allow for the reading and writing of data +//* to and from a HBase cluster. +//* +//* @see HBaseRawTap +//*/ +//@SuppressWarnings({ "rawtypes", "deprecation" }) +//public class HBaseRawScheme extends Scheme { +// /** +// * +// */ +// private static final long serialVersionUID = 6248976486883281356L; +// +// /** Field LOG */ +// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class); +// +// public final Fields RowKeyField = new Fields("rowkey"); +// public final Fields RowField = new Fields("row"); +// +// /** String familyNames */ +// private String[] familyNames; +// +// private boolean writeNulls = true; +// +// /** +// * Constructor HBaseScheme creates a new HBaseScheme instance. +// * +// * @param keyFields +// * of type Fields +// * @param familyName +// * of type String +// * @param valueFields +// * of type Fields +// */ +// public HBaseRawScheme(String familyName) { +// this(new String[] { familyName }); +// } +// +// public HBaseRawScheme(String[] familyNames) { +// this.familyNames = familyNames; +// setSourceFields(); +// } +// +// public HBaseRawScheme(String familyName, boolean writeNulls) { +// this(new String[] { familyName }, writeNulls); +// } +// +// public HBaseRawScheme(String[] familyNames, boolean writeNulls) { +// this.familyNames = familyNames; +// this.writeNulls = writeNulls; +// setSourceFields(); +// } +// +// private void setSourceFields() { +// Fields sourceFields = Fields.join(RowKeyField, RowField); +// setSourceFields(sourceFields); +// } +// +// /** +// * Method getFamilyNames returns the set of familyNames of this HBaseScheme +// * object. +// * +// * @return the familyNames (type String[]) of this HBaseScheme object. +// */ +// public String[] getFamilyNames() { +// HashSet familyNameSet = new HashSet(); +// if (familyNames != null) { +// for (String familyName : familyNames) { +// familyNameSet.add(familyName); +// } +// } +// return familyNameSet.toArray(new String[0]); +// } +// +// @Override +// public void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) { +// Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() }; +// +// sourceCall.setContext(pair); +// } +// +// @Override +// public void sourceCleanup(FlowProcess flowProcess, SourceCall sourceCall) { +// sourceCall.setContext(null); +// } +// +// @SuppressWarnings("unchecked") +// @Override +// public boolean source(FlowProcess flowProcess, SourceCall sourceCall) +// throws IOException { +// Tuple result = new Tuple(); +// +// Object key = sourceCall.getContext()[0]; +// Object value = sourceCall.getContext()[1]; +// boolean hasNext = sourceCall.getInput().next(key, value); +// if (!hasNext) { +// return false; +// } +// +// // Skip nulls +// if (key == null || value == null) { +// return true; +// } +// +// ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +// Result row = (Result) value; +// result.add(keyWritable); +// result.add(row); +// sourceCall.getIncomingEntry().setTuple(result); +// return true; +// } +// +// @SuppressWarnings("unchecked") +// @Override +// public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { +// TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +// OutputCollector outputCollector = sinkCall.getOutput(); +// Tuple key = tupleEntry.selectTuple(RowKeyField); +// Object okey = key.getObject(0); +// ImmutableBytesWritable keyBytes = getBytes(okey); +// Put put = new Put(keyBytes.get()); +// Fields outFields = tupleEntry.getFields().subtract(RowKeyField); +// if (null != outFields) { +// TupleEntry values = tupleEntry.selectEntry(outFields); +// for (int n = 0; n < values.getFields().size(); n++) { +// Object o = values.get(n); +// ImmutableBytesWritable valueBytes = getBytes(o); +// Comparable field = outFields.get(n); +// ColumnName cn = parseColumn((String) field); +// if (null == cn.family) { +// if (n >= familyNames.length) +// cn.family = familyNames[familyNames.length - 1]; +// else +// cn.family = familyNames[n]; +// } +// if (null != o || writeNulls) +// put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get()); +// } +// } +// outputCollector.collect(null, put); +// } +// +// private ImmutableBytesWritable getBytes(Object obj) { +// if (null == obj) +// return new ImmutableBytesWritable(new byte[0]); +// if (obj instanceof ImmutableBytesWritable) +// return (ImmutableBytesWritable) obj; +// else if (obj instanceof String) +// return new ImmutableBytesWritable(Bytes.toBytes((String) obj)); +// else if (obj instanceof Long) +// return new ImmutableBytesWritable(Bytes.toBytes((Long) obj)); +// else if (obj instanceof Integer) +// return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj)); +// else if (obj instanceof Short) +// return new ImmutableBytesWritable(Bytes.toBytes((Short) obj)); +// else if (obj instanceof Boolean) +// return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj)); +// else if (obj instanceof Double) +// return new ImmutableBytesWritable(Bytes.toBytes((Double) obj)); +// else +// throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class=" +// + obj.getClass().getName()); +// } +// +// private ColumnName parseColumn(String column) { +// ColumnName ret = new ColumnName(); +// int pos = column.indexOf(":"); +// if (pos > 0) { +// ret.name = column.substring(pos + 1); +// ret.family = column.substring(0, pos); +// } else { +// ret.name = column; +// } +// return ret; +// } +// +// private class ColumnName { +// String family; +// String name; +// +// ColumnName() { +// } +// } +// +// @Override +// public void sinkConfInit(FlowProcess process, Tap tap, JobConf conf) { +// conf.setOutputFormat(TableOutputFormat.class); +// conf.setOutputKeyClass(ImmutableBytesWritable.class); +// conf.setOutputValueClass(Put.class); +// } +// +// @Override +// public void sourceConfInit(FlowProcess process, Tap tap, +// JobConf conf) { +// DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf, +// ValueCopier.class); +// if (null != familyNames) { +// String columns = Util.join(this.familyNames, " "); +// LOG.debug("sourcing from column families: {}", columns); +// conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns); +// } +// } +// +// @Override +// public boolean equals(Object object) { +// if (this == object) { +// return true; +// } +// if (object == null || getClass() != object.getClass()) { +// return false; +// } +// if (!super.equals(object)) { +// return false; +// } +// +// HBaseRawScheme that = (HBaseRawScheme) object; +// +// if (!Arrays.equals(familyNames, that.familyNames)) { +// return false; +// } +// return true; +// } +// +// @Override +// public int hashCode() { +// int result = super.hashCode(); +// result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +// return result; +// } +// +// public static class ValueCopier implements DeprecatedInputFormatValueCopier { +// +// public ValueCopier() { +// } +// +// public void copyValue(Result oldValue, Result newValue) { +// if (null != oldValue && null != newValue) { +// oldValue.copyFrom(newValue); +// } +// } +// +// } +//} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java index 0421b6e..780d3fc 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java @@ -1,311 +1,311 @@ -/* - * Copyright (c) 2009 Concurrent, Inc. - * - * This work has been released into the public domain - * by the copyright holder. This applies worldwide. - * - * In case this is not legally possible: - * The copyright holder grants any entity the right - * to use this work for any purpose, without any - * conditions, unless such conditions are required by law. - */ - -package parallelai.spyglass.hbase; - -import java.io.IOException; -import java.util.UUID; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import cascading.flow.FlowProcess; -import cascading.tap.SinkMode; -import cascading.tap.Tap; -import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -import cascading.tuple.TupleEntryCollector; -import cascading.tuple.TupleEntryIterator; - -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; - -/** - * The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with - * the {@HBaseRawScheme} to allow for the reading and writing - * of data to and from a HBase cluster. - */ -@SuppressWarnings({ "deprecation", "rawtypes" }) -public class HBaseRawTap extends Tap { - /** - * - */ - private static final long serialVersionUID = 8019189493428493323L; - - /** Field LOG */ - private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); - - private final String id = UUID.randomUUID().toString(); - - /** Field SCHEME */ - public static final String SCHEME = "hbase"; - - /** Field hBaseAdmin */ - private transient HBaseAdmin hBaseAdmin; - - /** Field hostName */ - private String quorumNames; - /** Field tableName */ - private String tableName; - private String base64Scan; - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - */ - public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { - super(HBaseFullScheme, SinkMode.UPDATE); - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - * @param sinkMode - * of type SinkMode - */ - public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { - super(HBaseFullScheme, sinkMode); - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - */ - public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { - super(HBaseFullScheme, SinkMode.UPDATE); - this.quorumNames = quorumNames; - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName - * of type String - * @param HBaseFullScheme - * of type HBaseFullScheme - * @param sinkMode - * of type SinkMode - */ - public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { - super(HBaseFullScheme, sinkMode); - this.quorumNames = quorumNames; - this.tableName = tableName; - } - - /** - * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param quorumNames HBase quorum - * @param tableName The name of the HBase table to read - * @param HBaseFullScheme - * @param base64Scan An optional base64 encoded scan object - * @param sinkMode If REPLACE the output table will be deleted before writing to - */ - public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { - super(HBaseFullScheme, sinkMode); - this.quorumNames = quorumNames; - this.tableName = tableName; - this.base64Scan = base64Scan; - } - - /** - * Method getTableName returns the tableName of this HBaseTap object. - * - * @return the tableName (type String) of this HBaseTap object. - */ - public String getTableName() { - return tableName; - } - - public Path getPath() { - return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); - } - - protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { - if (hBaseAdmin == null) { - Configuration hbaseConf = HBaseConfiguration.create(conf); - hBaseAdmin = new HBaseAdmin(hbaseConf); - } - - return hBaseAdmin; - } - - @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { - if (quorumNames != null) { - conf.set("hbase.zookeeper.quorum", quorumNames); - } - - LOG.debug("sinking to table: {}", tableName); - - if (isReplace() && conf.get("mapred.task.partition") == null) { - try { - deleteResource(conf); - - } catch (IOException e) { - throw new RuntimeException("could not delete resource: " + e); - } - } - - else if (isUpdate() || isReplace()) { - try { - createResource(conf); - } catch (IOException e) { - throw new RuntimeException(tableName + " does not exist !", e); - } - - } - - conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); - super.sinkConfInit(process, conf); - } - - @Override - public String getIdentifier() { - return id; - } - - @Override - public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) - throws IOException { - return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); - } - - @Override - public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) - throws IOException { - HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); - hBaseCollector.prepare(); - return hBaseCollector; - } - - @Override - public boolean createResource(JobConf jobConf) throws IOException { - HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); - - if (hBaseAdmin.tableExists(tableName)) { - return true; - } - - LOG.info("creating hbase table: {}", tableName); - - HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); - - String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); - - for (String familyName : familyNames) { - tableDescriptor.addFamily(new HColumnDescriptor(familyName)); - } - - hBaseAdmin.createTable(tableDescriptor); - - return true; - } - - @Override - public boolean deleteResource(JobConf jobConf) throws IOException { - if (getHBaseAdmin(jobConf).tableExists(tableName)) { - if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) - getHBaseAdmin(jobConf).disableTable(tableName); - getHBaseAdmin(jobConf).deleteTable(tableName); - } - return true; - } - - @Override - public boolean resourceExists(JobConf jobConf) throws IOException { - return getHBaseAdmin(jobConf).tableExists(tableName); - } - - @Override - public long getModifiedTime(JobConf jobConf) throws IOException { - return System.currentTimeMillis(); // currently unable to find last mod - // time - // on a table - } - - @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { - // a hack for MultiInputFormat to see that there is a child format - FileInputFormat.setInputPaths(conf, getPath()); - - if (quorumNames != null) { - conf.set("hbase.zookeeper.quorum", quorumNames); - } - - LOG.debug("sourcing from table: {}", tableName); - conf.set(TableInputFormat.INPUT_TABLE, tableName); - if (null != base64Scan) - conf.set(TableInputFormat.SCAN, base64Scan); - - super.sourceConfInit(process, conf); - } - - @Override - public boolean equals(Object object) { - if (this == object) { - return true; - } - if (object == null || getClass() != object.getClass()) { - return false; - } - if (!super.equals(object)) { - return false; - } - - HBaseRawTap hBaseTap = (HBaseRawTap) object; - - if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { - return false; - } - - if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); - return result; - } -} +///* +//* Copyright (c) 2009 Concurrent, Inc. +//* +//* This work has been released into the public domain +//* by the copyright holder. This applies worldwide. +//* +//* In case this is not legally possible: +//* The copyright holder grants any entity the right +//* to use this work for any purpose, without any +//* conditions, unless such conditions are required by law. +//*/ +// +//package parallelai.spyglass.hbase; +// +//import java.io.IOException; +//import java.util.UUID; +// +//import org.apache.hadoop.conf.Configuration; +//import org.apache.hadoop.fs.Path; +//import org.apache.hadoop.hbase.HBaseConfiguration; +//import org.apache.hadoop.hbase.HColumnDescriptor; +//import org.apache.hadoop.hbase.HTableDescriptor; +//import org.apache.hadoop.hbase.MasterNotRunningException; +//import org.apache.hadoop.hbase.ZooKeeperConnectionException; +//import org.apache.hadoop.hbase.client.HBaseAdmin; +//import org.apache.hadoop.hbase.client.Scan; +//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +//import org.apache.hadoop.mapred.FileInputFormat; +//import org.apache.hadoop.mapred.JobConf; +//import org.apache.hadoop.mapred.OutputCollector; +//import org.apache.hadoop.mapred.RecordReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import cascading.flow.FlowProcess; +//import cascading.tap.SinkMode; +//import cascading.tap.Tap; +//import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +//import cascading.tuple.TupleEntryCollector; +//import cascading.tuple.TupleEntryIterator; +// +//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +// +///** +//* The HBaseRawTap class is a {@link Tap} subclass. It is used in conjunction with +//* the {@HBaseRawScheme} to allow for the reading and writing +//* of data to and from a HBase cluster. +//*/ +//@SuppressWarnings({ "deprecation", "rawtypes" }) +//public class HBaseRawTap extends Tap { +// /** +// * +// */ +// private static final long serialVersionUID = 8019189493428493323L; +// +// /** Field LOG */ +// private static final Logger LOG = LoggerFactory.getLogger(HBaseRawTap.class); +// +// private final String id = UUID.randomUUID().toString(); +// +// /** Field SCHEME */ +// public static final String SCHEME = "hbase"; +// +// /** Field hBaseAdmin */ +// private transient HBaseAdmin hBaseAdmin; +// +// /** Field hostName */ +// private String quorumNames; +// /** Field tableName */ +// private String tableName; +// private String base64Scan; +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// */ +// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme) { +// super(HBaseFullScheme, SinkMode.UPDATE); +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// * @param sinkMode +// * of type SinkMode +// */ +// public HBaseRawTap(String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +// super(HBaseFullScheme, sinkMode); +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// */ +// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme) { +// super(HBaseFullScheme, SinkMode.UPDATE); +// this.quorumNames = quorumNames; +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param tableName +// * of type String +// * @param HBaseFullScheme +// * of type HBaseFullScheme +// * @param sinkMode +// * of type SinkMode +// */ +// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, SinkMode sinkMode) { +// super(HBaseFullScheme, sinkMode); +// this.quorumNames = quorumNames; +// this.tableName = tableName; +// } +// +// /** +// * Constructor HBaseTap creates a new HBaseTap instance. +// * +// * @param quorumNames HBase quorum +// * @param tableName The name of the HBase table to read +// * @param HBaseFullScheme +// * @param base64Scan An optional base64 encoded scan object +// * @param sinkMode If REPLACE the output table will be deleted before writing to +// */ +// public HBaseRawTap(String quorumNames, String tableName, HBaseRawScheme HBaseFullScheme, String base64Scan, SinkMode sinkMode) { +// super(HBaseFullScheme, sinkMode); +// this.quorumNames = quorumNames; +// this.tableName = tableName; +// this.base64Scan = base64Scan; +// } +// +// /** +// * Method getTableName returns the tableName of this HBaseTap object. +// * +// * @return the tableName (type String) of this HBaseTap object. +// */ +// public String getTableName() { +// return tableName; +// } +// +// public Path getPath() { +// return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); +// } +// +// protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { +// if (hBaseAdmin == null) { +// Configuration hbaseConf = HBaseConfiguration.create(conf); +// hBaseAdmin = new HBaseAdmin(hbaseConf); +// } +// +// return hBaseAdmin; +// } +// +// @Override +// public void sinkConfInit(FlowProcess process, JobConf conf) { +// if (quorumNames != null) { +// conf.set("hbase.zookeeper.quorum", quorumNames); +// } +// +// LOG.debug("sinking to table: {}", tableName); +// +// if (isReplace() && conf.get("mapred.task.partition") == null) { +// try { +// deleteResource(conf); +// +// } catch (IOException e) { +// throw new RuntimeException("could not delete resource: " + e); +// } +// } +// +// else if (isUpdate() || isReplace()) { +// try { +// createResource(conf); +// } catch (IOException e) { +// throw new RuntimeException(tableName + " does not exist !", e); +// } +// +// } +// +// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +// super.sinkConfInit(process, conf); +// } +// +// @Override +// public String getIdentifier() { +// return id; +// } +// +// @Override +// public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) +// throws IOException { +// return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); +// } +// +// @Override +// public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) +// throws IOException { +// HBaseTapCollector hBaseCollector = new HBaseTapCollector(jobConfFlowProcess, this); +// hBaseCollector.prepare(); +// return hBaseCollector; +// } +// +// @Override +// public boolean createResource(JobConf jobConf) throws IOException { +// HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); +// +// if (hBaseAdmin.tableExists(tableName)) { +// return true; +// } +// +// LOG.info("creating hbase table: {}", tableName); +// +// HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); +// +// String[] familyNames = ((HBaseRawScheme) getScheme()).getFamilyNames(); +// +// for (String familyName : familyNames) { +// tableDescriptor.addFamily(new HColumnDescriptor(familyName)); +// } +// +// hBaseAdmin.createTable(tableDescriptor); +// +// return true; +// } +// +// @Override +// public boolean deleteResource(JobConf jobConf) throws IOException { +// if (getHBaseAdmin(jobConf).tableExists(tableName)) { +// if (getHBaseAdmin(jobConf).isTableEnabled(tableName)) +// getHBaseAdmin(jobConf).disableTable(tableName); +// getHBaseAdmin(jobConf).deleteTable(tableName); +// } +// return true; +// } +// +// @Override +// public boolean resourceExists(JobConf jobConf) throws IOException { +// return getHBaseAdmin(jobConf).tableExists(tableName); +// } +// +// @Override +// public long getModifiedTime(JobConf jobConf) throws IOException { +// return System.currentTimeMillis(); // currently unable to find last mod +// // time +// // on a table +// } +// +// @Override +// public void sourceConfInit(FlowProcess process, JobConf conf) { +// // a hack for MultiInputFormat to see that there is a child format +// FileInputFormat.setInputPaths(conf, getPath()); +// +// if (quorumNames != null) { +// conf.set("hbase.zookeeper.quorum", quorumNames); +// } +// +// LOG.debug("sourcing from table: {}", tableName); +// conf.set(TableInputFormat.INPUT_TABLE, tableName); +// if (null != base64Scan) +// conf.set(TableInputFormat.SCAN, base64Scan); +// +// super.sourceConfInit(process, conf); +// } +// +// @Override +// public boolean equals(Object object) { +// if (this == object) { +// return true; +// } +// if (object == null || getClass() != object.getClass()) { +// return false; +// } +// if (!super.equals(object)) { +// return false; +// } +// +// HBaseRawTap hBaseTap = (HBaseRawTap) object; +// +// if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { +// return false; +// } +// +// if (base64Scan != null ? !base64Scan.equals(hBaseTap.base64Scan) : hBaseTap.base64Scan != null) { +// return false; +// } +// +// return true; +// } +// +// @Override +// public int hashCode() { +// int result = super.hashCode(); +// result = 31 * result + (tableName != null ? tableName.hashCode() : 0) + (base64Scan != null ? base64Scan.hashCode() : 0); +// return result; +// } +//} diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index bbc205b..6216695 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -1,83 +1,83 @@ -package parallelai.spyglass.hbase - -import cascading.pipe.Pipe -import cascading.pipe.assembly.Coerce -import cascading.scheme.Scheme -import cascading.tap.{ Tap, SinkMode } -import cascading.tuple.Fields -import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } -import org.apache.hadoop.hbase.util.Bytes -import scala.collection.JavaConversions._ -import scala.collection.mutable.WrappedArray -import com.twitter.scalding._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil -import org.apache.hadoop.hbase.util.Base64 -import java.io.ByteArrayOutputStream -import java.io.DataOutputStream - -object HBaseRawSource { - /** - * Converts a scan object to a base64 string that can be passed to HBaseRawSource - * @param scan - * @return base64 string representation - */ - def convertScanToString(scan: Scan) = { - val out = new ByteArrayOutputStream(); - val dos = new DataOutputStream(out); - scan.write(dos); - Base64.encodeBytes(out.toByteArray()); - } -} - - -/** - * @author Rotem Hermon - * - * HBaseRawSource is a scalding source that passes the original row (Result) object to the - * mapper for customized processing. - * - * @param tableName The name of the HBase table to read - * @param quorumNames HBase quorum - * @param familyNames Column families to get (source, if null will get all) or update to (sink) - * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written - * @param base64Scan An optional base64 encoded scan object - * @param sinkMode If REPLACE the output table will be deleted before writing to - * - */ -class HBaseRawSource( - tableName: String, - quorumNames: String = "localhost", - familyNames: Array[String], - writeNulls: Boolean = true, - base64Scan: String = null, - sinkMode: SinkMode = null) extends Source { - - override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] - - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { - val hBaseScheme = hdfsScheme match { - case hbase: HBaseRawScheme => hbase - case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") - } - mode match { - case hdfsMode @ Hdfs(_, _) => readOrWrite match { - case Read => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.KEEP - case _ => sinkMode - }).asInstanceOf[Tap[_, _, _]] - } - case Write => { - new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { - case null => SinkMode.UPDATE - case _ => sinkMode - }).asInstanceOf[Tap[_, _, _]] - } - } - case _ => super.createTap(readOrWrite)(mode) - } - } -} +//package parallelai.spyglass.hbase +// +//import cascading.pipe.Pipe +//import cascading.pipe.assembly.Coerce +//import cascading.scheme.Scheme +//import cascading.tap.{ Tap, SinkMode } +//import cascading.tuple.Fields +//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } +//import org.apache.hadoop.hbase.util.Bytes +//import scala.collection.JavaConversions._ +//import scala.collection.mutable.WrappedArray +//import com.twitter.scalding._ +//import org.apache.hadoop.hbase.io.ImmutableBytesWritable +//import org.apache.hadoop.hbase.client.Scan +//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +//import org.apache.hadoop.hbase.util.Base64 +//import java.io.ByteArrayOutputStream +//import java.io.DataOutputStream +// +//object HBaseRawSource { +// /** +// * Converts a scan object to a base64 string that can be passed to HBaseRawSource +// * @param scan +// * @return base64 string representation +// */ +// def convertScanToString(scan: Scan) = { +// val out = new ByteArrayOutputStream(); +// val dos = new DataOutputStream(out); +// scan.write(dos); +// Base64.encodeBytes(out.toByteArray()); +// } +//} +// +// +///** +// * @author Rotem Hermon +// * +// * HBaseRawSource is a scalding source that passes the original row (Result) object to the +// * mapper for customized processing. +// * +// * @param tableName The name of the HBase table to read +// * @param quorumNames HBase quorum +// * @param familyNames Column families to get (source, if null will get all) or update to (sink) +// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written +// * @param base64Scan An optional base64 encoded scan object +// * @param sinkMode If REPLACE the output table will be deleted before writing to +// * +// */ +//class HBaseRawSource( +// tableName: String, +// quorumNames: String = "localhost", +// familyNames: Array[String], +// writeNulls: Boolean = true, +// base64Scan: String = null, +// sinkMode: SinkMode = null) extends Source { +// +// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) +// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] +// +// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +// val hBaseScheme = hdfsScheme match { +// case hbase: HBaseRawScheme => hbase +// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") +// } +// mode match { +// case hdfsMode @ Hdfs(_, _) => readOrWrite match { +// case Read => { +// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +// case null => SinkMode.KEEP +// case _ => sinkMode +// }).asInstanceOf[Tap[_, _, _]] +// } +// case Write => { +// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +// case null => SinkMode.UPDATE +// case _ => sinkMode +// }).asInstanceOf[Tap[_, _, _]] +// } +// } +// case _ => super.createTap(readOrWrite)(mode) +// } +// } +//} -- cgit v1.2.3