diff options
author | Saad Rashid <saad373@gmail.com> | 2013-06-24 17:13:19 +0100 |
---|---|---|
committer | Saad Rashid <saad373@gmail.com> | 2013-06-24 17:13:19 +0100 |
commit | 3e4a74de169104679f2459947c27d5cfb8cc430b (patch) | |
tree | 28e5ba58790e6964f5228382edacc6d459976926 /src/main/java/parallelai | |
parent | 056fe03a21f48d57f31a0c11f159874b410bc4e9 (diff) | |
download | SpyGlass-3e4a74de169104679f2459947c27d5cfb8cc430b.tar.gz SpyGlass-3e4a74de169104679f2459947c27d5cfb8cc430b.zip |
Added JDBCTap support.
Diffstat (limited to 'src/main/java/parallelai')
10 files changed, 2880 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java new file mode 100644 index 0000000..6788624 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java @@ -0,0 +1,25 @@ +package parallelai.spyglass.jdbc; + +import org.apache.hadoop.conf.Configuration; + +public class JDBCConstants { + + public enum JdbcSourceMode { + SELECT, + SELECT_WITH_PARTITIONS, + SELECT_WITH_BUCKETS; + } + + public enum JdbcSinkMode { + INSERT, + UPDATE, + UPSERT; + } + + public static final String START_KEY = "jdbc.%s.startkey"; + public static final String STOP_KEY = "jdbc.%s.stopkey"; + public static final String SOURCE_MODE = "jdbc.%s.source.mode"; + public static final String KEY_LIST = "jdbc.%s.key.list"; + public static final String VERSIONS = "jdbc.%s.versions"; + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java new file mode 100644 index 0000000..5007895 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java @@ -0,0 +1,670 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import parallelai.spyglass.jdbc.db.DBInputFormat; +import parallelai.spyglass.jdbc.db.DBOutputFormat; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Class JDBCScheme defines what its parent Tap will select and insert/update into the sql database. + * <p/> + * If updateBy column names are given, a SQL UPDATE statement will be generated if the values in those columns + * for the given Tuple are all not {@code null}. Otherwise an INSERT statement will be generated. + * <p/> + * Some constructors take columnFields and updateByFields. These values will be used during field name resolution + * to bind this Scheme to the source and sink branches in a give assembly. These fields 'alias' the column names + * in the respective arrays. In other words, if your DB TABLE has different column names than your assembly exepects, + * use the Fields arguments to bind the assembly to the table. Both Fields and array must be the same size. + * <p/> + * Override this class, {@link DBInputFormat}, and {@link DBOutputFormat} to specialize for a given vendor database. + */ +public class JDBCScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> +{ + private Class<? extends DBInputFormat> inputFormatClass; + private Class<? extends DBOutputFormat> outputFormatClass; + private String[] columns; + private String[] orderBy; + private String conditions; + private String[] updateBy; + private Fields updateValueFields; + private Fields updateByFields; + private Fields columnFields; + private Tuple updateIfTuple; + private String selectQuery; + private String countQuery; + private long limit = -1; + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, long limit, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, new Fields( columns ), columns, orderBy, conditions, limit, updateBy != null ? new Fields( updateBy ) : null, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy ) + { + this.columnFields = columnFields; + + verifyColumns( columnFields, columns ); + + setSinkFields( columnFields ); + setSourceFields( columnFields ); + + if( updateBy != null && updateBy.length != 0 ) + { + this.updateBy = updateBy; + this.updateByFields = updateByFields; + + if( updateByFields.size() != updateBy.length ) + throw new IllegalArgumentException( "updateByFields and updateBy must be the same size" ); + + if( !this.columnFields.contains( this.updateByFields ) ) + throw new IllegalArgumentException( "columnFields must contain updateByFields column names" ); + + this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields ); + this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls + } + + this.columns = columns; + this.orderBy = orderBy; + this.conditions = conditions; + this.limit = limit; + + this.inputFormatClass = inputFormatClass; + this.outputFormatClass = outputFormatClass; + } + + private void verifyColumns( Fields columnFields, String[] columns ) + { + if( columnFields.size() != columns.length ) + throw new IllegalArgumentException( "columnFields and columns must be the same size" ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columns, orderBy, conditions, -1, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, Fields updateByFields, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, -1, updateByFields, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columns, orderBy, null, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, null, -1, updateByFields, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateBy of type String[] + */ + public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy ) + { + this( null, null, columns, orderBy, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy ) + { + this( null, null, columnFields, columns, orderBy, updateByFields, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( String[] columns, String[] orderBy, String conditions, long limit ) + { + this( null, null, columns, orderBy, conditions, limit, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit ) + { + this( null, null, columnFields, columns, orderBy, conditions, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + */ + public JDBCScheme( String[] columns, String[] orderBy, String conditions ) + { + this( null, null, columns, orderBy, conditions, null ); + } + + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions ) + { + this( null, null, columnFields, columns, orderBy, conditions, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param limit of type long + */ + public JDBCScheme( String[] columns, String[] orderBy, long limit ) + { + this( null, null, columns, orderBy, null, limit, null ); + } + + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, long limit ) + { + this( null, null, columnFields, columns, orderBy, null, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + */ + public JDBCScheme( String[] columns, String[] orderBy ) + { + this( null, null, columns, orderBy, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + */ + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy ) + { + this( null, null, columnFields, columns, orderBy, null, -1, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( String[] columns, String conditions, long limit ) + { + this( null, null, columns, null, conditions, limit, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, String conditions, long limit ) + { + this( null, null, columnFields, columns, null, conditions, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param conditions of type String + */ + public JDBCScheme( String[] columns, String conditions ) + { + this( null, null, columns, null, conditions, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param conditions of type String + */ + public JDBCScheme( Fields columnFields, String[] columns, String conditions ) + { + this( null, null, columnFields, columns, null, conditions, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param limit of type long + */ + public JDBCScheme( String[] columns, long limit ) + { + this( null, null, columns, null, null, limit, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, long limit ) + { + this( null, null, columnFields, columns, null, null, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + */ + public JDBCScheme( String[] columns ) + { + this( null, null, new Fields( columns ), columns, null, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + */ + public JDBCScheme( Fields columnFields, String[] columns ) + { + this( null, null, columnFields, columns, null, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * <p/> + * Use this constructor if the data source may only be used as a source. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, String[] columns, String selectQuery, String countQuery, long limit ) + { + this( inputFormatClass, new Fields( columns ), columns, selectQuery, countQuery, limit ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit ) + { + this.columnFields = columnFields; + + verifyColumns( columnFields, columns ); + + setSourceFields( columnFields ); + + this.columns = columns; + this.selectQuery = selectQuery.trim().replaceAll( ";$", "" ); + this.countQuery = countQuery.trim().replaceAll( ";$", "" ); + this.limit = limit; + + this.inputFormatClass = inputFormatClass; + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * <p/> + * Use this constructor if the data source may only be used as a source. + * + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( String[] columns, String selectQuery, String countQuery, long limit ) + { + this( null, new Fields( columns ), columns, selectQuery, countQuery, limit ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit ) + { + this( null, columnFields, columns, selectQuery, countQuery, limit ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * <p/> + * Use this constructor if the data source may only be used as a source. + * + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + */ + public JDBCScheme( String[] columns, String selectQuery, String countQuery ) + { + this( null, new Fields( columns ), columns, selectQuery, countQuery, -1 ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + */ + public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery ) + { + this( null, columnFields, columns, selectQuery, countQuery, -1 ); + } + + /** + * Method getColumns returns the columns of this JDBCScheme object. + * + * @return the columns (type String[]) of this JDBCScheme object. + */ + public String[] getColumns() { + return columns; + } + + /** + * Method getOrderBy returns the orderBy of this JDBCScheme object. + * + * @return the orderBy (type String[]) of this JDBCScheme object. + */ + public String[] getOrderBy() { + return orderBy; + } + + @Override + public void sourceConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, + JobConf conf ) { + int concurrentReads = ( (JDBCTap) tap ).concurrentReads; + + if( selectQuery != null ) + DBInputFormat.setInput( conf, TupleRecord.class, selectQuery, countQuery, limit, concurrentReads ); + else { + String tableName = ( (JDBCTap) tap ).getTableName(); + String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null; + DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, columns ); + } + + if( inputFormatClass != null ) + conf.setInputFormat( inputFormatClass ); + } + + @Override + public void sinkConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, + JobConf conf ) { + if( selectQuery != null ) + throw new TapException( "cannot sink to this Scheme" ); + + String tableName = ( (JDBCTap) tap ).getTableName(); + int batchSize = ( (JDBCTap) tap ).getBatchSize(); + DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy, batchSize ); + + if( outputFormatClass != null ) + conf.setOutputFormat( outputFormatClass ); + } + + @Override + public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) + { + Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; + + sourceCall.setContext( pair ); + } + + @Override + public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException + { + Object key = sourceCall.getContext()[ 0 ]; + Object value = sourceCall.getContext()[ 1 ]; + boolean result = sourceCall.getInput().next( key, value ); + + if( !result ) + return false; + + Tuple newTuple = ( (TupleRecord) value ).getTuple(); + sourceCall.getIncomingEntry().setTuple( newTuple ); + + return true; + } + + @Override + public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) { + sourceCall.setContext( null ); + } + + @Override + public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException { + // it's ok to use NULL here so the collector does not write anything + TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); + OutputCollector outputCollector = sinkCall.getOutput(); + if( updateBy != null ) + { + Tuple allValues = tupleEntry.selectTuple( updateValueFields ); + Tuple updateValues = tupleEntry.selectTuple( updateByFields ); + + allValues = cleanTuple( allValues ); + + TupleRecord key = new TupleRecord( allValues ); + + if( updateValues.equals( updateIfTuple ) ) + outputCollector.collect( key, null ); + else + outputCollector.collect( key, key ); + + return; + } + + Tuple result = tupleEntry.selectTuple( getSinkFields() ); + + result = cleanTuple( result ); + + outputCollector.collect( new TupleRecord( result ), null ); + } + + /** + * Provides a hook for subclasses to escape or modify any values before creating the final SQL statement. + * + * @param result + * @return + */ + protected Tuple cleanTuple( Tuple result ) { + return result; + } + + @Override + public boolean equals( Object object ) { + if( this == object ) + return true; + if( !( object instanceof JDBCScheme ) ) + return false; + if( !super.equals( object ) ) + return false; + + JDBCScheme that = (JDBCScheme) object; + + if( limit != that.limit ) + return false; + if( columnFields != null ? !columnFields.equals( that.columnFields ) : that.columnFields != null ) + return false; + if( !Arrays.equals( columns, that.columns ) ) + return false; + if( conditions != null ? !conditions.equals( that.conditions ) : that.conditions != null ) + return false; + if( countQuery != null ? !countQuery.equals( that.countQuery ) : that.countQuery != null ) + return false; + if( inputFormatClass != null ? !inputFormatClass.equals( that.inputFormatClass ) : that.inputFormatClass != null ) + return false; + if( !Arrays.equals( orderBy, that.orderBy ) ) + return false; + if( outputFormatClass != null ? !outputFormatClass.equals( that.outputFormatClass ) : that.outputFormatClass != null ) + return false; + if( selectQuery != null ? !selectQuery.equals( that.selectQuery ) : that.selectQuery != null ) + return false; + if( !Arrays.equals( updateBy, that.updateBy ) ) + return false; + if( updateByFields != null ? !updateByFields.equals( that.updateByFields ) : that.updateByFields != null ) + return false; + if( updateIfTuple != null ? !updateIfTuple.equals( that.updateIfTuple ) : that.updateIfTuple != null ) + return false; + if( updateValueFields != null ? !updateValueFields.equals( that.updateValueFields ) : that.updateValueFields != null ) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + ( inputFormatClass != null ? inputFormatClass.hashCode() : 0 ); + result = 31 * result + ( outputFormatClass != null ? outputFormatClass.hashCode() : 0 ); + result = 31 * result + ( columns != null ? Arrays.hashCode( columns ) : 0 ); + result = 31 * result + ( orderBy != null ? Arrays.hashCode( orderBy ) : 0 ); + result = 31 * result + ( conditions != null ? conditions.hashCode() : 0 ); + result = 31 * result + ( updateBy != null ? Arrays.hashCode( updateBy ) : 0 ); + result = 31 * result + ( updateValueFields != null ? updateValueFields.hashCode() : 0 ); + result = 31 * result + ( updateByFields != null ? updateByFields.hashCode() : 0 ); + result = 31 * result + ( columnFields != null ? columnFields.hashCode() : 0 ); + result = 31 * result + ( updateIfTuple != null ? updateIfTuple.hashCode() : 0 ); + result = 31 * result + ( selectQuery != null ? selectQuery.hashCode() : 0 ); + result = 31 * result + ( countQuery != null ? countQuery.hashCode() : 0 ); + result = 31 * result + (int) ( limit ^ ( limit >>> 32 ) ); + return result; + } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java b/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java new file mode 100644 index 0000000..1c10a05 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java @@ -0,0 +1,621 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.util.HadoopUtil; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import parallelai.spyglass.jdbc.db.DBConfiguration; + +import java.io.IOException; +import java.sql.*; +import java.util.*; + +/** + * Class JDBCTap is a {@link Tap} sub-class that provides read and write access to a RDBMS via JDBC drivers. + * <p/> + * This Tap fully supports TABLE DROP and CREATE when given a {@link TableDesc} instance. + * <p/> + * When using {@link SinkMode#UPDATE}, Cascading is instructed to not delete the resource (drop the Table) + * and assumes its safe to begin sinking data into it. The {@link JDBCScheme} is responsible for + * deciding if/when to perform an UPDATE instead of an INSERT. + * <p/> + * Both INSERT and UPDATE are supported through the JDBCScheme. + * <p/> + * By sub-classing JDBCScheme, {@link com.twitter.maple.jdbc.db.DBInputFormat}, and {@link com.twitter.maple.jdbc.db.DBOutputFormat}, + * specific vendor features can be supported. + * <p/> + * Use {@link #setBatchSize(int)} to set the number of INSERT/UPDATES should be grouped together before being + * executed. The default vaue is 1,000. + * <p/> + * Use {@link #executeQuery(String, int)} or {@link #executeUpdate(String)} to invoke SQL statements against + * the underlying Table. + * <p/> + * Note that all classes under the {@link com.twitter.maple.jdbc.db} package originated from the Hadoop project and + * retain their Apache 2.0 license though they have been heavily modified to support INSERT/UPDATE and + * vendor specialization, and a number of other features like 'limit'. + * + * @see JDBCScheme + * @see com.twitter.maple.jdbc.db.DBInputFormat + * @see com.twitter.maple.jdbc.db.DBOutputFormat + */ +public class JDBCTap extends Tap<JobConf, RecordReader, OutputCollector> { + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(JDBCTap.class); + + private final String id = UUID.randomUUID().toString(); + + /** Field connectionUrl */ + String connectionUrl; + /** Field username */ + String username; + /** Field password */ + String password; + /** Field driverClassName */ + String driverClassName; + /** Field tableDesc */ + TableDesc tableDesc; + /** Field batchSize */ + int batchSize = 1000; + /** Field concurrentReads */ + int concurrentReads = 0; + + /** + * Constructor JDBCTap creates a new JDBCTap instance. + * <p/> + * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated + * into. By default it uses {@link SinkMode#UPDATE}. + * + * @param connectionUrl of type String + * @param username of type String + * @param password of type String + * @param driverClassName of type String + * @param tableName of type String + * @param scheme of type JDBCScheme + */ + public JDBCTap( String connectionUrl, String username, String password, String driverClassName, String tableName, JDBCScheme scheme ) { + this( connectionUrl, username, password, driverClassName, new TableDesc( tableName ), scheme, SinkMode.UPDATE ); + } + + /** + * Constructor JDBCTap creates a new JDBCTap instance. + * + * @param connectionUrl of type String + * @param driverClassName of type String + * @param tableDesc of type TableDesc + * @param scheme of type JDBCScheme + * @param sinkMode of type SinkMode + */ + public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) { + this( connectionUrl, null, null, driverClassName, tableDesc, scheme, sinkMode ); + } + + /** + * Constructor JDBCTap creates a new JDBCTap instance. + * <p/> + * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated + * into. By default it uses {@link SinkMode#UPDATE}. + * + * @param connectionUrl of type String + * @param username of type String + * @param password of type String + * @param driverClassName of type String + * @param tableDesc of type TableDesc + * @param scheme of type JDBCScheme + */ + public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) { + this( connectionUrl, username, password, driverClassName, tableDesc, scheme, SinkMode.UPDATE ); + } + + /** + * Constructor JDBCTap creates a new JDBCTap instance. + * + * @param connectionUrl of type String + * @param username of type String + * @param password of type String + * @param driverClassName of type String + * @param tableDesc of type TableDesc + * @param scheme of type JDBCScheme + * @param sinkMode of type SinkMode + */ + public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) { + super( scheme, sinkMode ); + this.connectionUrl = connectionUrl; + this.username = username; + this.password = password; + this.driverClassName = driverClassName; + this.tableDesc = tableDesc; + + if( tableDesc.getColumnDefs() == null && sinkMode != SinkMode.UPDATE ) + throw new IllegalArgumentException( "cannot have sink mode REPLACE or KEEP without TableDesc column defs, use UPDATE mode" ); + + if( sinkMode != SinkMode.UPDATE ) + LOG.warn( "using sink mode: {}, consider UPDATE to prevent DROP TABLE from being called during Flow or Cascade setup", sinkMode ); + } + + /** + * Constructor JDBCTap creates a new JDBCTap instance. + * <p/> + * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated + * into. By default it uses {@link SinkMode#UPDATE}. + * + * @param connectionUrl of type String + * @param driverClassName of type String + * @param tableDesc of type TableDesc + * @param scheme of type JDBCScheme + */ + public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) { + this( connectionUrl, driverClassName, tableDesc, scheme, SinkMode.UPDATE ); + } + + /** + * Constructor JDBCTap creates a new JDBCTap instance that may only used as a data source. + * + * @param connectionUrl of type String + * @param username of type String + * @param password of type String + * @param driverClassName of type String + * @param scheme of type JDBCScheme + */ + public JDBCTap( String connectionUrl, String username, String password, String driverClassName, JDBCScheme scheme ) { + super( scheme ); + this.connectionUrl = connectionUrl; + this.username = username; + this.password = password; + this.driverClassName = driverClassName; + } + + /** + * Constructor JDBCTap creates a new JDBCTap instance. + * + * @param connectionUrl of type String + * @param driverClassName of type String + * @param scheme of type JDBCScheme + */ + public JDBCTap( String connectionUrl, String driverClassName, JDBCScheme scheme ) { + this( connectionUrl, null, null, driverClassName, scheme ); + } + + /** + * Method getTableName returns the tableName of this JDBCTap object. + * + * @return the tableName (type String) of this JDBCTap object. + */ + public String getTableName() { + return tableDesc.tableName; + } + + /** + * Method setBatchSize sets the batchSize of this JDBCTap object. + * + * @param batchSize the batchSize of this JDBCTap object. + */ + public void setBatchSize( int batchSize ) { + this.batchSize = batchSize; + } + + /** + * Method getBatchSize returns the batchSize of this JDBCTap object. + * + * @return the batchSize (type int) of this JDBCTap object. + */ + public int getBatchSize() { + return batchSize; + } + + /** + * Method getConcurrentReads returns the concurrentReads of this JDBCTap object. + * <p/> + * This value specifies the number of concurrent selects and thus the number of mappers + * that may be used. A value of -1 uses the job default. + * + * @return the concurrentReads (type int) of this JDBCTap object. + */ + public int getConcurrentReads() { + return concurrentReads; + } + + /** + * Method setConcurrentReads sets the concurrentReads of this JDBCTap object. + * <p/> + * This value specifies the number of concurrent selects and thus the number of mappers + * that may be used. A value of -1 uses the job default. + * + * @param concurrentReads the concurrentReads of this JDBCTap object. + */ + public void setConcurrentReads( int concurrentReads ) { + this.concurrentReads = concurrentReads; + } + + /** + * Method getPath returns the path of this JDBCTap object. + * + * @return the path (type Path) of this JDBCTap object. + */ + public Path getPath() { + return new Path( getJDBCPath() ); + } + + @Override + public String getIdentifier() { + return getJDBCPath() + this.id; + } + + + public String getJDBCPath() { + return "jdbc:/" + connectionUrl.replaceAll( ":", "_" ); + } + + public boolean isWriteDirect() { + return true; + } + + private JobConf getSourceConf( FlowProcess<JobConf> flowProcess, JobConf conf, String property ) + throws IOException { + // Map<String, String> priorConf = HadoopUtil.deserializeBase64( property, conf, true ); + // return flowProcess.mergeMapIntoConfig( conf, priorConf ); + + return null; + } + + @Override + public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException { + // input may be null when this method is called on the client side or cluster side when accumulating + // for a HashJoin + return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); + } + + @Override + public TupleEntryCollector openForWrite( FlowProcess<JobConf> flowProcess, OutputCollector output ) throws IOException { + if( !isSink() ) + throw new TapException( "this tap may not be used as a sink, no TableDesc defined" ); + + LOG.info("Creating JDBCTapCollector output instance"); + JDBCTapCollector jdbcCollector = new JDBCTapCollector( flowProcess, this ); + + jdbcCollector.prepare(); + + return jdbcCollector; + } + + @Override + public boolean isSink() + { + return tableDesc != null; + } + + @Override + public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf ) + { + // a hack for MultiInputFormat to see that there is a child format + FileInputFormat.setInputPaths( conf, getPath() ); + + if( username == null ) + DBConfiguration.configureDB(conf, driverClassName, connectionUrl); + else + DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password ); + + super.sourceConfInit( process, conf ); + } + + @Override + public void sinkConfInit( FlowProcess<JobConf> process, JobConf conf ) + { + if( !isSink() ) + return; + + // do not delete if initialized from within a task + try { + if( isReplace() && conf.get( "mapred.task.partition" ) == null && !deleteResource( conf ) ) + throw new TapException( "unable to drop table: " + tableDesc.getTableName() ); + + if( !createResource( conf ) ) + throw new TapException( "unable to create table: " + tableDesc.getTableName() ); + } catch(IOException e) { + throw new TapException( "error while trying to modify table: " + tableDesc.getTableName() ); + } + + if( username == null ) + DBConfiguration.configureDB( conf, driverClassName, connectionUrl ); + else + DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password ); + + super.sinkConfInit( process, conf ); + } + + private Connection createConnection() + { + try + { + LOG.info( "creating connection: {}", connectionUrl ); + + Class.forName( driverClassName ); + + Connection connection = null; + + if( username == null ) + connection = DriverManager.getConnection( connectionUrl ); + else + connection = DriverManager.getConnection( connectionUrl, username, password ); + + connection.setAutoCommit( false ); + + return connection; + } + catch( ClassNotFoundException exception ) + { + throw new TapException( "unable to load driver class: " + driverClassName, exception ); + } + catch( SQLException exception ) + { + throw new TapException( "unable to open connection: " + connectionUrl, exception ); + } + } + + /** + * Method executeUpdate allows for ad-hoc update statements to be sent to the remote RDBMS. The number of + * rows updated will be returned, if applicable. + * + * @param updateString of type String + * @return int + */ + public int executeUpdate( String updateString ) + { + Connection connection = null; + int result; + + try + { + connection = createConnection(); + + try + { + LOG.info( "executing update: {}", updateString ); + + Statement statement = connection.createStatement(); + + result = statement.executeUpdate( updateString ); + + connection.commit(); + statement.close(); + } + catch( SQLException exception ) + { + throw new TapException( "unable to execute update statement: " + updateString, exception ); + } + } + finally + { + try + { + if( connection != null ) + connection.close(); + } + catch( SQLException exception ) + { + // ignore + LOG.warn( "ignoring connection close exception", exception ); + } + } + + return result; + } + + /** + * Method executeQuery allows for ad-hoc queries to be sent to the remove RDBMS. A value + * of -1 for returnResults will return a List of all results from the query, a value of 0 will return an empty List. + * + * @param queryString of type String + * @param returnResults of type int + * @return List + */ + public List<Object[]> executeQuery( String queryString, int returnResults ) + { + Connection connection = null; + List<Object[]> result = Collections.emptyList(); + + try + { + connection = createConnection(); + + try + { + LOG.info( "executing query: {}", queryString ); + + Statement statement = connection.createStatement(); + + ResultSet resultSet = statement.executeQuery( queryString ); // we don't care about results + + if( returnResults != 0 ) + result = copyResultSet( resultSet, returnResults == -1 ? Integer.MAX_VALUE : returnResults ); + + connection.commit(); + statement.close(); + } + catch( SQLException exception ) + { + throw new TapException( "unable to execute query statement: " + queryString, exception ); + } + } + finally + { + try + { + if( connection != null ) + connection.close(); + } + catch( SQLException exception ) + { + // ignore + LOG.warn( "ignoring connection close exception", exception ); + } + } + + return result; + } + + private List<Object[]> copyResultSet( ResultSet resultSet, int length ) throws SQLException + { + List<Object[]> results = new ArrayList<Object[]>( length ); + int size = resultSet.getMetaData().getColumnCount(); + + int count = 0; + + while( resultSet.next() && count < length ) + { + count++; + + Object[] row = new Object[size]; + + for( int i = 0; i < row.length; i++ ) + row[ i ] = resultSet.getObject( i + 1 ); + + results.add( row ); + } + + return results; + } + + @Override + public boolean createResource( JobConf conf ) throws IOException + { + if( resourceExists( conf ) ) + return true; + + try + { + LOG.info( "creating table: {}", tableDesc.tableName ); + + executeUpdate( tableDesc.getCreateTableStatement() ); + } + catch( TapException exception ) + { + LOG.warn( "unable to create table: {}", tableDesc.tableName ); + LOG.warn( "sql failure", exception.getCause() ); + + return false; + } + + return resourceExists( conf ); + } + + @Override + public boolean deleteResource( JobConf conf ) throws IOException + { + if( !isSink() ) + return false; + + if( !resourceExists( conf ) ) + return true; + + try + { + LOG.info( "deleting table: {}", tableDesc.tableName ); + + executeUpdate( tableDesc.getTableDropStatement() ); + } + catch( TapException exception ) + { + LOG.warn( "unable to drop table: {}", tableDesc.tableName ); + LOG.warn( "sql failure", exception.getCause() ); + + return false; + } + + return !resourceExists( conf ); + } + + @Override + public boolean resourceExists( JobConf conf ) throws IOException + { + if( !isSink() ) + return true; + + try + { + LOG.info( "test table exists: {}", tableDesc.tableName ); + + executeQuery( tableDesc.getTableExistsQuery(), 0 ); + } + catch( TapException exception ) + { + return false; + } + + return true; + } + + @Override + public long getModifiedTime( JobConf conf ) throws IOException + { + return System.currentTimeMillis(); + } + + @Override + public String toString() + { + return "JDBCTap{" + "connectionUrl='" + connectionUrl + '\'' + ", driverClassName='" + driverClassName + '\'' + ", tableDesc=" + tableDesc + '}'; + } + + @Override + public boolean equals( Object object ) + { + if( this == object ) + return true; + if( !( object instanceof JDBCTap ) ) + return false; + if( !super.equals( object ) ) + return false; + + JDBCTap jdbcTap = (JDBCTap) object; + + if( connectionUrl != null ? !connectionUrl.equals( jdbcTap.connectionUrl ) : jdbcTap.connectionUrl != null ) + return false; + if( driverClassName != null ? !driverClassName.equals( jdbcTap.driverClassName ) : jdbcTap.driverClassName != null ) + return false; + if( password != null ? !password.equals( jdbcTap.password ) : jdbcTap.password != null ) + return false; + if( tableDesc != null ? !tableDesc.equals( jdbcTap.tableDesc ) : jdbcTap.tableDesc != null ) + return false; + if( username != null ? !username.equals( jdbcTap.username ) : jdbcTap.username != null ) + return false; + + return true; + } + + @Override + public int hashCode() + { + int result = super.hashCode(); + result = 31 * result + ( connectionUrl != null ? connectionUrl.hashCode() : 0 ); + result = 31 * result + ( username != null ? username.hashCode() : 0 ); + result = 31 * result + ( password != null ? password.hashCode() : 0 ); + result = 31 * result + ( driverClassName != null ? driverClassName.hashCode() : 0 ); + result = 31 * result + ( tableDesc != null ? tableDesc.hashCode() : 0 ); + result = 31 * result + batchSize; + return result; + } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java new file mode 100644 index 0000000..e8b7deb --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowProcess; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.TupleEntrySchemeCollector; +import org.apache.hadoop.mapred.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class JDBCTapCollector is a kind of {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the resource managed by + * a particular {@link JDBCTap} instance. + */ +public class JDBCTapCollector extends TupleEntrySchemeCollector implements OutputCollector +{ + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger( JDBCTapCollector.class ); + + /** Field conf */ + private final JobConf conf; + /** Field writer */ + private RecordWriter writer; + /** Field flowProcess */ + private final FlowProcess<JobConf> hadoopFlowProcess; + /** Field tap */ + private final Tap<JobConf, RecordReader, OutputCollector> tap; + /** Field reporter */ + private final Reporter reporter = Reporter.NULL; + + /** + * Constructor TapCollector creates a new TapCollector instance. + * + * @param flowProcess + * @param tap of type Tap + * @throws IOException when fails to initialize + */ + public JDBCTapCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap ) throws IOException { + super( flowProcess, tap.getScheme() ); + this.hadoopFlowProcess = flowProcess; + + this.tap = tap; + this.conf = new JobConf( flowProcess.getConfigCopy() ); + + this.setOutput( this ); + } + + @Override + public void prepare() { + try { + initialize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + super.prepare(); + } + + private void initialize() throws IOException { + tap.sinkConfInit( hadoopFlowProcess, conf ); + + OutputFormat outputFormat = conf.getOutputFormat(); + + LOG.info("Output format class is: " + outputFormat.getClass().toString()); + + writer = outputFormat.getRecordWriter( null, conf, tap.getIdentifier(), Reporter.NULL ); + + sinkCall.setOutput( this ); + } + + @Override + public void close() { + try { + LOG.info( "closing tap collector for: {}", tap ); + writer.close( reporter ); + } catch( IOException exception ) { + LOG.warn( "exception closing: {}", exception ); + throw new TapException( "exception closing JDBCTapCollector", exception ); + } finally { + super.close(); + } + } + + /** + * Method collect writes the given values to the {@link Tap} this instance encapsulates. + * + * @param writableComparable of type WritableComparable + * @param writable of type Writable + * @throws IOException when + */ + public void collect( Object writableComparable, Object writable ) throws IOException { + if (hadoopFlowProcess instanceof HadoopFlowProcess) + ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress(); + + writer.write( writableComparable, writable ); + } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/TableDesc.java b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java new file mode 100644 index 0000000..b0aaea7 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.util.Util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Class TableDesc describes a SQL based table, this description is used by the {@link JDBCTap} when + * creating a missing table. + * + * @see JDBCTap + * @see JDBCScheme + */ +public class TableDesc implements Serializable { + /** Field tableName */ + String tableName; + /** Field columnNames */ + String[] columnNames; + /** Field columnDefs */ + String[] columnDefs; + /** Field primaryKeys */ + String[] primaryKeys; + + /** + * Constructor TableDesc creates a new TableDesc instance. + * + * @param tableName of type String + */ + public TableDesc( String tableName ) { + this.tableName = tableName; + } + + /** + * Constructor TableDesc creates a new TableDesc instance. + * + * @param tableName of type String + * @param columnNames of type String[] + * @param columnDefs of type String[] + * @param primaryKeys of type String + */ + public TableDesc( String tableName, String[] columnNames, String[] columnDefs, String[] primaryKeys ) { + this.tableName = tableName; + this.columnNames = columnNames; + this.columnDefs = columnDefs; + this.primaryKeys = primaryKeys; + } + + public String getTableName() { + return tableName; + } + + public String[] getColumnNames() { + return columnNames; + } + + public String[] getColumnDefs() { + return columnDefs; + } + + public String[] getPrimaryKeys() { + return primaryKeys; + } + + /** + * Method getTableCreateStatement returns the tableCreateStatement of this TableDesc object. + * + * @return the tableCreateStatement (type String) of this TableDesc object. + */ + public String getCreateTableStatement() { + List<String> createTableStatement = new ArrayList<String>(); + + createTableStatement = addCreateTableBodyTo( createTableStatement ); + + return String.format( getCreateTableFormat(), tableName, Util.join( createTableStatement, ", " ) ); + } + + protected List<String> addCreateTableBodyTo( List<String> createTableStatement ) { + createTableStatement = addDefinitionsTo( createTableStatement ); + createTableStatement = addPrimaryKeyTo( createTableStatement ); + + return createTableStatement; + } + + protected String getCreateTableFormat() { + return "CREATE TABLE %s ( %s )"; + } + + protected List<String> addDefinitionsTo( List<String> createTableStatement ) { + for( int i = 0; i < columnNames.length; i++ ) { + String columnName = columnNames[ i ]; + String columnDef = columnDefs[ i ]; + + createTableStatement.add( columnName + " " + columnDef ); + } + + return createTableStatement; + } + + protected List<String> addPrimaryKeyTo( List<String> createTableStatement ) { + if( hasPrimaryKey() ) + createTableStatement.add( String.format( "PRIMARY KEY( %s )", Util.join( primaryKeys, ", " ) ) ); + + return createTableStatement; + } + + /** + * Method getTableDropStatement returns the tableDropStatement of this TableDesc object. + * + * @return the tableDropStatement (type String) of this TableDesc object. + */ + public String getTableDropStatement() { + return String.format( getDropTableFormat(), tableName ); + } + + protected String getDropTableFormat() { + return "DROP TABLE %s"; + } + + /** + * Method getTableExistsQuery returns the tableExistsQuery of this TableDesc object. + * + * @return the tableExistsQuery (type String) of this TableDesc object. + */ + public String getTableExistsQuery() { + return String.format( "select 1 from %s where 1 = 0", tableName ); + } + + private boolean hasPrimaryKey() { + return primaryKeys != null && primaryKeys.length != 0; + } + + @Override + public String toString() { + return "TableDesc{" + "tableName='" + tableName + '\'' + ", columnNames=" + ( columnNames == null ? null : Arrays.asList( columnNames ) ) + ", columnDefs=" + ( columnDefs == null ? null : Arrays.asList( columnDefs ) ) + ", primaryKeys=" + ( primaryKeys == null ? null : Arrays.asList( primaryKeys ) ) + '}'; + } + + @Override + public boolean equals( Object object ) { + if( this == object ) + return true; + if( !( object instanceof TableDesc ) ) + return false; + + TableDesc tableDesc = (TableDesc) object; + + if( !Arrays.equals( columnDefs, tableDesc.columnDefs ) ) + return false; + if( !Arrays.equals( columnNames, tableDesc.columnNames ) ) + return false; + if( !Arrays.equals( primaryKeys, tableDesc.primaryKeys ) ) + return false; + if( tableName != null ? !tableName.equals( tableDesc.tableName ) : tableDesc.tableName != null ) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = tableName != null ? tableName.hashCode() : 0; + result = 31 * result + ( columnNames != null ? Arrays.hashCode( columnNames ) : 0 ); + result = 31 * result + ( columnDefs != null ? Arrays.hashCode( columnDefs ) : 0 ); + result = 31 * result + ( primaryKeys != null ? Arrays.hashCode( primaryKeys ) : 0 ); + return result; + } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java new file mode 100644 index 0000000..4191e79 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.tuple.Tuple; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import parallelai.spyglass.jdbc.db.DBWritable; + +public class TupleRecord implements DBWritable { + private Tuple tuple; + + public TupleRecord() { + } + + public TupleRecord( Tuple tuple ) { + this.tuple = tuple; + } + + public void setTuple( Tuple tuple ) { + this.tuple = tuple; + } + + public Tuple getTuple() { + return tuple; + } + + public void write( PreparedStatement statement ) throws SQLException { + for( int i = 0; i < tuple.size(); i++ ) { + //System.out.println("Insert Tuple => " + " statement.setObject( " + (i + 1) + "," + tuple.get( i )); + statement.setObject( i + 1, tuple.get( i ) ); + } + boolean test = true; + if (test) { + for( int i = 1; i < tuple.size(); i++ ) { + //System.out.println("Update Tuple => " + " statement.setObject( " + (i + tuple.size()) + "," + tuple.get( i )); + statement.setObject( i + tuple.size(), tuple.get( i ) ); + } + } + + } + + public void readFields( ResultSet resultSet ) throws SQLException { + tuple = new Tuple(); + + for( int i = 0; i < resultSet.getMetaData().getColumnCount(); i++ ) + tuple.add( (Comparable) resultSet.getObject( i + 1 ) ); + } + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java new file mode 100644 index 0000000..1bb0786 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * A container for configuration property names for jobs with DB input/output. <br> The job can be + * configured using the static methods in this class, {@link DBInputFormat}, and {@link + * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper + * values. + */ +public class DBConfiguration { + + /** The JDBC Driver class name */ + public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class"; + + /** JDBC Database access URL */ + public static final String URL_PROPERTY = "mapred.jdbc.url"; + + /** User name to access the database */ + public static final String USERNAME_PROPERTY = "mapred.jdbc.username"; + + /** Password to access the database */ + public static final String PASSWORD_PROPERTY = "mapred.jdbc.password"; + + /** Input table name */ + public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name"; + + /** Field names in the Input table */ + public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names"; + + /** WHERE clause in the input SELECT statement */ + public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions"; + + /** ORDER BY clause in the input SELECT statement */ + public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby"; + + /** Whole input query, exluding LIMIT...OFFSET */ + public static final String INPUT_QUERY = "mapred.jdbc.input.query"; + + /** The number of records to LIMIT, useful for testing */ + public static final String INPUT_LIMIT = "mapred.jdbc.input.limit"; + + /** Input query to get the count of records */ + public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query"; + + /** Class name implementing DBWritable which will hold input tuples */ + public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class"; + + /** Output table name */ + public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name"; + + /** Field names in the Output table */ + public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names"; + + /** Field names in the Output table */ + public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY = + "mapred.jdbc.output.update.field.names"; + + /** The number of statements to batch before executing */ + public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num"; + + /** The number of splits allowed, becomes max concurrent reads. */ + public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num"; + + /** + * Sets the DB access related fields in the Configuration. + * + * @param job the job + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL. + * @param userName DB access username + * @param passwd DB access passwd + */ + public static void configureDB(Configuration job, String driverClass, String dbUrl, + String userName, String passwd) { + job.set(DRIVER_CLASS_PROPERTY, driverClass); + job.set(URL_PROPERTY, dbUrl); + + if (userName != null) { job.set(USERNAME_PROPERTY, userName); } + + if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); } + } + + /** + * Sets the DB access related fields in the Configuration. + * + * @param job the job + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL. + */ + public static void configureDB(Configuration job, String driverClass, String dbUrl) { + configureDB(job, driverClass, dbUrl, null, null); + } + + private Configuration job; + + DBConfiguration(Configuration job) { + this.job = job; + } + + /** + * Returns a connection object to the DB + * + * @throws ClassNotFoundException + * @throws SQLException + */ + Connection getConnection() throws IOException { + try { + Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); + } catch (ClassNotFoundException exception) { + throw new IOException("unable to load conection driver", exception); + } + + try { + if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) { + return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY)); + } else { + return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job + .get(DBConfiguration.USERNAME_PROPERTY), job + .get(DBConfiguration.PASSWORD_PROPERTY)); + } + } catch (SQLException exception) { + throw new IOException("unable to create connection", exception); + } + } + + String getInputTableName() { + return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY); + } + + void setInputTableName(String tableName) { + job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); + } + + String[] getInputFieldNames() { + return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY); + } + + void setInputFieldNames(String... fieldNames) { + job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames); + } + + String getInputConditions() { + return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY); + } + + void setInputConditions(String conditions) { + if (conditions != null && conditions.length() > 0) { + job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions); + } + } + + String getInputOrderBy() { + return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY); + } + + void setInputOrderBy(String orderby) { + if (orderby != null && orderby.length() > 0) { + job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby); + } + } + + String getInputQuery() { + return job.get(DBConfiguration.INPUT_QUERY); + } + + void setInputQuery(String query) { + if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); } + } + + long getInputLimit() { + return job.getLong(DBConfiguration.INPUT_LIMIT, -1); + } + + void setInputLimit(long limit) { + job.setLong(DBConfiguration.INPUT_LIMIT, limit); + } + + String getInputCountQuery() { + return job.get(DBConfiguration.INPUT_COUNT_QUERY); + } + + void setInputCountQuery(String query) { + if (query != null && query.length() > 0) { + job.set(DBConfiguration.INPUT_COUNT_QUERY, query); + } + } + + Class<?> getInputClass() { + return job + .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class); + } + + void setInputClass(Class<? extends DBWritable> inputClass) { + job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class); + } + + String getOutputTableName() { + return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY); + } + + void setOutputTableName(String tableName) { + job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); + } + + String[] getOutputFieldNames() { + return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY); + } + + void setOutputFieldNames(String... fieldNames) { + job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames); + } + + String[] getOutputUpdateFieldNames() { + return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY); + } + + void setOutputUpdateFieldNames(String... fieldNames) { + job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames); + } + + int getBatchStatementsNum() { + return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000); + } + + void setBatchStatementsNum(int batchStatementsNum) { + job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum); + } + + int getMaxConcurrentReadsNum() { + return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0); + } + + void setMaxConcurrentReadsNum(int maxConcurrentReads) { + if (maxConcurrentReads < 0) { + throw new IllegalArgumentException("maxConcurrentReads must be a positive value"); + } + + job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads); + } + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java new file mode 100644 index 0000000..115d9bd --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java @@ -0,0 +1,452 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.*; + +/** + * A InputFormat that reads input data from an SQL table. <p/> DBInputFormat emits LongWritables + * containing the record number as key and DBWritables as value. <p/> The SQL query, and input class + * can be using one of the two setInput methods. + */ +public class DBInputFormat<T extends DBWritable> + implements InputFormat<LongWritable, T>, JobConfigurable { + /** Field LOG */ + private static final Logger LOG = LoggerFactory.getLogger(DBInputFormat.class); + + /** + * A RecordReader that reads records from a SQL table. Emits LongWritables containing the record + * number as key and DBWritables as value. + */ + protected class DBRecordReader implements RecordReader<LongWritable, T> { + private ResultSet results; + private Statement statement; + private Class<T> inputClass; + private JobConf job; + private DBInputSplit split; + private long pos = 0; + + /** + * @param split The InputSplit to read data for + * @throws SQLException + */ + protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) + throws SQLException, IOException { + this.inputClass = inputClass; + this.split = split; + this.job = job; + + statement = + connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + //statement.setFetchSize(Integer.MIN_VALUE); + String query = getSelectQuery(); + try { + LOG.info(query); + results = statement.executeQuery(query); + LOG.info("done executing select query"); + } catch (SQLException exception) { + LOG.error("unable to execute select query: " + query, exception); + throw new IOException("unable to execute select query: " + query, exception); + } + } + + /** + * Returns the query for selecting the records, subclasses can override this for custom + * behaviour. + */ + protected String getSelectQuery() { + LOG.info("Executing select query"); + StringBuilder query = new StringBuilder(); + + if (dbConf.getInputQuery() == null) { + query.append("SELECT "); + + for (int i = 0; i < fieldNames.length; i++) { + query.append(fieldNames[i]); + + if (i != fieldNames.length - 1) + query.append(", "); + } + + query.append(" FROM ").append(tableName); + query.append(" AS ").append(tableName); //in hsqldb this is necessary + + if (conditions != null && conditions.length() > 0) + query.append(" WHERE (").append(conditions).append(")"); + + String orderBy = dbConf.getInputOrderBy(); + + if (orderBy != null && orderBy.length() > 0) + query.append(" ORDER BY ").append(orderBy); + + } + else + query.append(dbConf.getInputQuery()); + + try { + // Only add limit and offset if you have multiple chunks + if(split.getChunks() > 1) { + query.append(" LIMIT ").append(split.getLength()); + query.append(" OFFSET ").append(split.getStart()); + } + } catch (IOException ex) { + //ignore, will not throw + } + + return query.toString(); + } + + /** {@inheritDoc} */ + public void close() throws IOException { + try { + connection.commit(); + results.close(); + statement.close(); + } catch (SQLException exception) { + throw new IOException("unable to commit and close", exception); + } + } + + /** {@inheritDoc} */ + public LongWritable createKey() { + return new LongWritable(); + } + + /** {@inheritDoc} */ + public T createValue() { + return ReflectionUtils.newInstance(inputClass, job); + } + + /** {@inheritDoc} */ + public long getPos() throws IOException { + return pos; + } + + /** {@inheritDoc} */ + public float getProgress() throws IOException { + return pos / (float) split.getLength(); + } + + /** {@inheritDoc} */ + public boolean next(LongWritable key, T value) throws IOException { + try { + if (!results.next()) + return false; + + // Set the key field value as the output key value + key.set(pos + split.getStart()); + + value.readFields(results); + + pos++; + } catch (SQLException exception) { + throw new IOException("unable to get next value", exception); + } + + return true; + } + } + + /** A Class that does nothing, implementing DBWritable */ + public static class NullDBWritable implements DBWritable, Writable { + + public void readFields(DataInput in) throws IOException { + } + + public void readFields(ResultSet arg0) throws SQLException { + } + + public void write(DataOutput out) throws IOException { + } + + public void write(PreparedStatement arg0) throws SQLException { + } + } + + /** A InputSplit that spans a set of rows */ + protected static class DBInputSplit implements InputSplit { + private long end = 0; + private long start = 0; + private long chunks = 0; + + /** Default Constructor */ + public DBInputSplit() { + } + + /** + * Convenience Constructor + * + * @param start the index of the first row to select + * @param end the index of the last row to select + */ + public DBInputSplit(long start, long end, long chunks) { + this.start = start; + this.end = end; + this.chunks = chunks; + LOG.info("creating DB input split with start: " + start + ", end: " + end + ", chunks: " + chunks); + } + + /** {@inheritDoc} */ + public String[] getLocations() throws IOException { + // TODO Add a layer to enable SQL "sharding" and support locality + return new String[]{}; + } + + /** @return The index of the first row to select */ + public long getStart() { + return start; + } + + /** @return The index of the last row to select */ + public long getEnd() { + return end; + } + + /** @return The total row count in this split */ + public long getLength() throws IOException { + return end - start; + } + + /** @return The total number of chucks accross all splits */ + public long getChunks() { + return chunks; + } + + /** {@inheritDoc} */ + public void readFields(DataInput input) throws IOException { + start = input.readLong(); + end = input.readLong(); + chunks = input.readLong(); + } + + /** {@inheritDoc} */ + public void write(DataOutput output) throws IOException { + output.writeLong(start); + output.writeLong(end); + output.writeLong(chunks); + } + } + + protected DBConfiguration dbConf; + protected Connection connection; + + protected String tableName; + protected String[] fieldNames; + protected String conditions; + protected long limit; + protected int maxConcurrentReads; + + + /** {@inheritDoc} */ + public void configure(JobConf job) { + dbConf = new DBConfiguration(job); + + tableName = dbConf.getInputTableName(); + fieldNames = dbConf.getInputFieldNames(); + conditions = dbConf.getInputConditions(); + limit = dbConf.getInputLimit(); + maxConcurrentReads = dbConf.getMaxConcurrentReadsNum(); + + try { + connection = dbConf.getConnection(); + } catch (IOException exception) { + throw new RuntimeException("unable to create connection", exception.getCause()); + } + + configureConnection(connection); + } + + protected void configureConnection(Connection connection) { + setTransactionIsolationLevel(connection); + setAutoCommit(connection); + } + + protected void setAutoCommit(Connection connection) { + try { + connection.setAutoCommit(false); + } catch (Exception exception) { + throw new RuntimeException("unable to set auto commit", exception); + } + } + + protected void setTransactionIsolationLevel(Connection connection) { + try { + connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + } catch (SQLException exception) { + throw new RuntimeException("unable to configure transaction isolation level", exception); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + public RecordReader<LongWritable, T> getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + Class inputClass = dbConf.getInputClass(); + try { + return new DBRecordReader((DBInputSplit) split, inputClass, job); + } catch (SQLException exception) { + throw new IOException(exception.getMessage(), exception); + } + } + + /** {@inheritDoc} */ + public InputSplit[] getSplits(JobConf job, int chunks) throws IOException { + // use the configured value if avail + chunks = maxConcurrentReads == 0 ? chunks : maxConcurrentReads; + + try { + Statement statement = connection.createStatement(); + + ResultSet results = statement.executeQuery(getCountQuery()); + + long count = 0; + + while (results.next()) + count += results.getLong(1); + + if (limit != -1) + count = Math.min(limit, count); + + long chunkSize = (count / chunks); + + results.close(); + statement.close(); + + InputSplit[] splits = new InputSplit[chunks]; + + // Split the rows into n-number of chunks and adjust the last chunk + // accordingly + for (int i = 0; i < chunks; i++) { + DBInputSplit split; + + if (i + 1 == chunks) + split = new DBInputSplit(i * chunkSize, count, chunks); + else + split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize, chunks); + + splits[i] = split; + } + + return splits; + } catch (SQLException e) { + throw new IOException(e.getMessage()); + } + } + + /** + * Returns the query for getting the total number of rows, subclasses can override this for + * custom behaviour. + */ + protected String getCountQuery() { + + if (dbConf.getInputCountQuery() != null) { return dbConf.getInputCountQuery(); } + + StringBuilder query = new StringBuilder(); + + query.append("SELECT COUNT(*) FROM " + tableName); + + if (conditions != null && conditions.length() > 0) + query.append(" WHERE " + conditions); + + return query.toString(); + } + + /** + * Initializes the map-part of the job with the appropriate input settings. + * + * @param job The job + * @param inputClass the class object implementing DBWritable, which is the Java object + * holding tuple fields. + * @param tableName The table to read data from + * @param conditions The condition which to select data with, eg. '(updated > 20070101 AND + * length > 0)' + * @param orderBy the fieldNames in the orderBy clause. + * @param limit + * @param fieldNames The field names in the table + * @param concurrentReads + */ + public static void setInput(JobConf job, Class<? extends DBWritable> inputClass, + String tableName, String conditions, String orderBy, long limit, int concurrentReads, + String... fieldNames) { + job.setInputFormat(DBInputFormat.class); + + DBConfiguration dbConf = new DBConfiguration(job); + + dbConf.setInputClass(inputClass); + dbConf.setInputTableName(tableName); + dbConf.setInputFieldNames(fieldNames); + dbConf.setInputConditions(conditions); + dbConf.setInputOrderBy(orderBy); + + if (limit != -1) + dbConf.setInputLimit(limit); + + dbConf.setMaxConcurrentReadsNum(concurrentReads); + } + + /** + * Initializes the map-part of the job with the appropriate input settings. + * + * @param job The job + * @param inputClass the class object implementing DBWritable, which is the Java object + * holding tuple fields. + * @param selectQuery the input query to select fields. Example : "SELECT f1, f2, f3 FROM + * Mytable ORDER BY f1" + * @param countQuery the input query that returns the number of records in the table. + * Example : "SELECT COUNT(f1) FROM Mytable" + * @param concurrentReads + */ + public static void setInput(JobConf job, Class<? extends DBWritable> inputClass, + String selectQuery, String countQuery, long limit, int concurrentReads) { + job.setInputFormat(DBInputFormat.class); + + DBConfiguration dbConf = new DBConfiguration(job); + + dbConf.setInputClass(inputClass); + dbConf.setInputQuery(selectQuery); + dbConf.setInputCountQuery(countQuery); + + if (limit != -1) + dbConf.setInputLimit(limit); + + dbConf.setMaxConcurrentReadsNum(concurrentReads); + } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java new file mode 100644 index 0000000..1166970 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java @@ -0,0 +1,391 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; + +import com.jcraft.jsch.Logger; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts + * <key,value> pairs, where key has a type extending DBWritable. Returned {@link RecordWriter} + * writes <b>only the key</b> to the database with a batch SQL query. + */ +public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> { + private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); + + /** A RecordWriter that writes the reduce output to a SQL table */ + protected class DBRecordWriter implements RecordWriter<K, V> { + private Connection connection; + private PreparedStatement insertStatement; + private PreparedStatement updateStatement; + private final int statementsBeforeExecute; + + private long statementsAdded = 0; + private long insertStatementsCurrent = 0; + private long updateStatementsCurrent = 0; + + protected DBRecordWriter(Connection connection, PreparedStatement insertStatement, + PreparedStatement updateStatement, int statementsBeforeExecute) { + this.connection = connection; + this.insertStatement = insertStatement; + this.updateStatement = updateStatement; + this.statementsBeforeExecute = statementsBeforeExecute; + } + + /** {@inheritDoc} */ + public void close(Reporter reporter) throws IOException { + executeBatch(); + + try { + if (insertStatement != null) { insertStatement.close(); } + + if (updateStatement != null) { updateStatement.close(); } + + connection.commit(); + } catch (SQLException exception) { + rollBack(); + + createThrowMessage("unable to commit batch", 0, exception); + } finally { + try { + connection.close(); + } catch (SQLException exception) { + throw new IOException("unable to close connection", exception); + } + } + } + + private void executeBatch() throws IOException { + try { + if (insertStatementsCurrent != 0) { + LOG.info( + "executing insert batch " + createBatchMessage(insertStatementsCurrent)); + + insertStatement.executeBatch(); + } + + insertStatementsCurrent = 0; + } catch (SQLException exception) { + rollBack(); + + createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception); + } + + try { + if (updateStatementsCurrent != 0) { + LOG.info( + "executing update batch " + createBatchMessage(updateStatementsCurrent)); + + int[] result = updateStatement.executeBatch(); + + int count = 0; + + for (int value : result) { count += value; } + + if (count != updateStatementsCurrent) { + throw new IOException( + "update did not update same number of statements executed in batch, batch: " + + updateStatementsCurrent + " updated: " + count); + } + } + + updateStatementsCurrent = 0; + } catch (SQLException exception) { + + String message = exception.getMessage(); + if (message.indexOf("Duplicate Key") >= 0) { + LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); + } else { + rollBack(); + + createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception); + } + } + } + + private void rollBack() { + try { + connection.rollback(); + } catch (SQLException sqlException) { + LOG.warn(StringUtils.stringifyException(sqlException)); + } + } + + private String createBatchMessage(long currentStatements) { + return String + .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute); + } + + private void createThrowMessage(String stateMessage, long currentStatements, + SQLException exception) throws IOException { + String message = exception.getMessage(); + + message = message.substring(0, Math.min(75, message.length())); + + int messageLength = exception.getMessage().length(); + String batchMessage = createBatchMessage(currentStatements); + String template = "%s [msglength: %d]%s %s"; + String errorMessage = + String.format(template, stateMessage, messageLength, batchMessage, message); + + LOG.error(errorMessage, exception.getNextException()); + + throw new IOException(errorMessage, exception.getNextException()); + } + + /** {@inheritDoc} */ + public synchronized void write(K key, V value) throws IOException { + try { + if (value == null) { + key.write(insertStatement); + insertStatement.addBatch(); + insertStatementsCurrent++; + } else { + key.write(updateStatement); + updateStatement.addBatch(); + updateStatementsCurrent++; + } + } catch (SQLException exception) { + throw new IOException("unable to add batch statement", exception); + } + + statementsAdded++; + + if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); } + } + } + + /** + * Constructs the query used as the prepared statement to insert data. + * + * @param table the table to insert into + * @param fieldNames the fields to insert into. If field names are unknown, supply an array of + * nulls. + */ + protected String constructInsertQuery(String table, String[] fieldNames) { + if (fieldNames == null) { + throw new IllegalArgumentException("Field names may not be null"); + } + + StringBuilder query = new StringBuilder(); + + query.append("INSERT INTO ").append(table); + + if (fieldNames.length > 0 && fieldNames[0] != null) { + query.append(" ("); + + for (int i = 0; i < fieldNames.length; i++) { + query.append(fieldNames[i]); + + if (i != fieldNames.length - 1) { query.append(","); } + } + + query.append(")"); + + } + + query.append(" VALUES ("); + + for (int i = 0; i < fieldNames.length; i++) { + query.append("?"); + + if (i != fieldNames.length - 1) { query.append(","); } + } + + query.append(")"); + + boolean test = true; + if (test) { + query.append(" ON DUPLICATE KEY UPDATE "); + + + for (int i = 1; i < fieldNames.length; i++) { + + + if ( (i != 1) ) { query.append(","); } + //if (i != fieldNames.length - 1) { query.append(","); } + //&& (i != fieldNames.length - 1) + query.append(fieldNames[i]); + query.append(" = ?"); + + + } + } + + query.append(";"); + + LOG.info(" ===================== " + query.toString()); + return query.toString(); + } + + protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) { + if (fieldNames == null) { + throw new IllegalArgumentException("field names may not be null"); + } + + Set<String> updateNamesSet = new HashSet<String>(); + Collections.addAll(updateNamesSet, updateNames); + + StringBuilder query = new StringBuilder(); + + query.append("UPDATE ").append(table); + + query.append(" SET "); + + if (fieldNames.length > 0 && fieldNames[0] != null) { + int count = 0; + + for (int i = 0; i < fieldNames.length; i++) { + if (updateNamesSet.contains(fieldNames[i])) { continue; } + + if (count != 0) { query.append(","); } + + query.append(fieldNames[i]); + query.append(" = ?"); + + count++; + } + } + + query.append(" WHERE "); + + if (updateNames.length > 0 && updateNames[0] != null) { + for (int i = 0; i < updateNames.length; i++) { + query.append(updateNames[i]); + query.append(" = ?"); + + if (i != updateNames.length - 1) { query.append(" and "); } + } + } + + query.append(";"); + System.out.println("Update Query => " + query.toString()); + return query.toString(); + } + + /** {@inheritDoc} */ + public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException { + } + + /** {@inheritDoc} */ + public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name, + Progressable progress) throws IOException { + DBConfiguration dbConf = new DBConfiguration(job); + + String tableName = dbConf.getOutputTableName(); + String[] fieldNames = dbConf.getOutputFieldNames(); + String[] updateNames = dbConf.getOutputUpdateFieldNames(); + int batchStatements = dbConf.getBatchStatementsNum(); + + Connection connection = dbConf.getConnection(); + + configureConnection(connection); + + String sqlInsert = constructInsertQuery(tableName, fieldNames); + PreparedStatement insertPreparedStatement; + + try { + insertPreparedStatement = connection.prepareStatement(sqlInsert); + insertPreparedStatement.setEscapeProcessing(true); // should be on by default + } catch (SQLException exception) { + throw new IOException("unable to create statement for: " + sqlInsert, exception); + } + + String sqlUpdate = + updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null; + PreparedStatement updatePreparedStatement = null; + + try { + updatePreparedStatement = + sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null; + } catch (SQLException exception) { + throw new IOException("unable to create statement for: " + sqlUpdate, exception); + } + + return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements); + } + + protected void configureConnection(Connection connection) { + setAutoCommit(connection); + } + + protected void setAutoCommit(Connection connection) { + try { + connection.setAutoCommit(false); + } catch (Exception exception) { + throw new RuntimeException("unable to set auto commit", exception); + } + } + + /** + * Initializes the reduce-part of the job with the appropriate output settings + * + * @param job The job + * @param dbOutputFormatClass + * @param tableName The table to insert data into + * @param fieldNames The field names in the table. If unknown, supply the appropriate + */ + public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass, + String tableName, String[] fieldNames, String[] updateFields, int batchSize) { + if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else { + job.setOutputFormat(dbOutputFormatClass); + } + + // writing doesn't always happen in reduce + job.setReduceSpeculativeExecution(false); + job.setMapSpeculativeExecution(false); + + DBConfiguration dbConf = new DBConfiguration(job); + + dbConf.setOutputTableName(tableName); + dbConf.setOutputFieldNames(fieldNames); + + if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); } + + if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); } + } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java new file mode 100644 index 0000000..1369c74 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.io.Writable; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * Objects that are read from/written to a database should implement <code>DBWritable</code>. + * DBWritable, is similar to {@link Writable} except that the {@link #write(PreparedStatement)} + * method takes a {@link PreparedStatement}, and {@link #readFields(ResultSet)} takes a {@link + * ResultSet}. <p> Implementations are responsible for writing the fields of the object to + * PreparedStatement, and reading the fields of the object from the ResultSet. <p/> <p>Example:</p> + * If we have the following table in the database : + * <pre> + * CREATE TABLE MyTable ( + * counter INTEGER NOT NULL, + * timestamp BIGINT NOT NULL, + * ); + * </pre> + * then we can read/write the tuples from/to the table with : + * <p><pre> + * public class MyWritable implements Writable, DBWritable { + * // Some data + * private int counter; + * private long timestamp; + * <p/> + * //Writable#write() implementation + * public void write(DataOutput out) throws IOException { + * out.writeInt(counter); + * out.writeLong(timestamp); + * } + * <p/> + * //Writable#readFields() implementation + * public void readFields(DataInput in) throws IOException { + * counter = in.readInt(); + * timestamp = in.readLong(); + * } + * <p/> + * public void write(PreparedStatement statement) throws SQLException { + * statement.setInt(1, counter); + * statement.setLong(2, timestamp); + * } + * <p/> + * public void readFields(ResultSet resultSet) throws SQLException { + * counter = resultSet.getInt(1); + * timestamp = resultSet.getLong(2); + * } + * } + * </pre></p> + */ +public interface DBWritable { + + /** + * Sets the fields of the object in the {@link PreparedStatement}. + * + * @param statement the statement that the fields are put into. + * @throws SQLException + */ + public void write(PreparedStatement statement) throws SQLException; + + /** + * Reads the fields of the object from the {@link ResultSet}. + * + * @param resultSet the {@link ResultSet} to get the fields from. + * @throws SQLException + */ + public void readFields(ResultSet resultSet) throws SQLException; + +} |