diff options
author | Saad Rashid <saad373@gmail.com> | 2013-06-24 17:13:19 +0100 |
---|---|---|
committer | Saad Rashid <saad373@gmail.com> | 2013-06-24 17:13:19 +0100 |
commit | 3e4a74de169104679f2459947c27d5cfb8cc430b (patch) | |
tree | 28e5ba58790e6964f5228382edacc6d459976926 /src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java | |
parent | 056fe03a21f48d57f31a0c11f159874b410bc4e9 (diff) | |
download | SpyGlass-3e4a74de169104679f2459947c27d5cfb8cc430b.tar.gz SpyGlass-3e4a74de169104679f2459947c27d5cfb8cc430b.zip |
Added JDBCTap support.
Diffstat (limited to 'src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java | 670 |
1 files changed, 670 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java new file mode 100644 index 0000000..5007895 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java @@ -0,0 +1,670 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import parallelai.spyglass.jdbc.db.DBInputFormat; +import parallelai.spyglass.jdbc.db.DBOutputFormat; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Class JDBCScheme defines what its parent Tap will select and insert/update into the sql database. + * <p/> + * If updateBy column names are given, a SQL UPDATE statement will be generated if the values in those columns + * for the given Tuple are all not {@code null}. Otherwise an INSERT statement will be generated. + * <p/> + * Some constructors take columnFields and updateByFields. These values will be used during field name resolution + * to bind this Scheme to the source and sink branches in a give assembly. These fields 'alias' the column names + * in the respective arrays. In other words, if your DB TABLE has different column names than your assembly exepects, + * use the Fields arguments to bind the assembly to the table. Both Fields and array must be the same size. + * <p/> + * Override this class, {@link DBInputFormat}, and {@link DBOutputFormat} to specialize for a given vendor database. + */ +public class JDBCScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> +{ + private Class<? extends DBInputFormat> inputFormatClass; + private Class<? extends DBOutputFormat> outputFormatClass; + private String[] columns; + private String[] orderBy; + private String conditions; + private String[] updateBy; + private Fields updateValueFields; + private Fields updateByFields; + private Fields columnFields; + private Tuple updateIfTuple; + private String selectQuery; + private String countQuery; + private long limit = -1; + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, long limit, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, new Fields( columns ), columns, orderBy, conditions, limit, updateBy != null ? new Fields( updateBy ) : null, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy ) + { + this.columnFields = columnFields; + + verifyColumns( columnFields, columns ); + + setSinkFields( columnFields ); + setSourceFields( columnFields ); + + if( updateBy != null && updateBy.length != 0 ) + { + this.updateBy = updateBy; + this.updateByFields = updateByFields; + + if( updateByFields.size() != updateBy.length ) + throw new IllegalArgumentException( "updateByFields and updateBy must be the same size" ); + + if( !this.columnFields.contains( this.updateByFields ) ) + throw new IllegalArgumentException( "columnFields must contain updateByFields column names" ); + + this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields ); + this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls + } + + this.columns = columns; + this.orderBy = orderBy; + this.conditions = conditions; + this.limit = limit; + + this.inputFormatClass = inputFormatClass; + this.outputFormatClass = outputFormatClass; + } + + private void verifyColumns( Fields columnFields, String[] columns ) + { + if( columnFields.size() != columns.length ) + throw new IllegalArgumentException( "columnFields and columns must be the same size" ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columns, orderBy, conditions, -1, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, Fields updateByFields, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, -1, updateByFields, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columns, orderBy, null, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param outputFormatClass of type Class<? extends DBOutputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy ) + { + this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, null, -1, updateByFields, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateBy of type String[] + */ + public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy ) + { + this( null, null, columns, orderBy, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param updateByFields of type Fields + * @param updateBy of type String[] + */ + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy ) + { + this( null, null, columnFields, columns, orderBy, updateByFields, updateBy ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( String[] columns, String[] orderBy, String conditions, long limit ) + { + this( null, null, columns, orderBy, conditions, limit, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit ) + { + this( null, null, columnFields, columns, orderBy, conditions, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param conditions of type String + */ + public JDBCScheme( String[] columns, String[] orderBy, String conditions ) + { + this( null, null, columns, orderBy, conditions, null ); + } + + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions ) + { + this( null, null, columnFields, columns, orderBy, conditions, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + * @param limit of type long + */ + public JDBCScheme( String[] columns, String[] orderBy, long limit ) + { + this( null, null, columns, orderBy, null, limit, null ); + } + + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, long limit ) + { + this( null, null, columnFields, columns, orderBy, null, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param orderBy of type String[] + */ + public JDBCScheme( String[] columns, String[] orderBy ) + { + this( null, null, columns, orderBy, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param orderBy of type String[] + */ + public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy ) + { + this( null, null, columnFields, columns, orderBy, null, -1, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( String[] columns, String conditions, long limit ) + { + this( null, null, columns, null, conditions, limit, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param conditions of type String + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, String conditions, long limit ) + { + this( null, null, columnFields, columns, null, conditions, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param conditions of type String + */ + public JDBCScheme( String[] columns, String conditions ) + { + this( null, null, columns, null, conditions, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param conditions of type String + */ + public JDBCScheme( Fields columnFields, String[] columns, String conditions ) + { + this( null, null, columnFields, columns, null, conditions, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + * @param limit of type long + */ + public JDBCScheme( String[] columns, long limit ) + { + this( null, null, columns, null, null, limit, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, long limit ) + { + this( null, null, columnFields, columns, null, null, limit, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columns of type String[] + */ + public JDBCScheme( String[] columns ) + { + this( null, null, new Fields( columns ), columns, null, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + */ + public JDBCScheme( Fields columnFields, String[] columns ) + { + this( null, null, columnFields, columns, null, null, null ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * <p/> + * Use this constructor if the data source may only be used as a source. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, String[] columns, String selectQuery, String countQuery, long limit ) + { + this( inputFormatClass, new Fields( columns ), columns, selectQuery, countQuery, limit ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param inputFormatClass of type Class<? extends DBInputFormat> + * @param columnFields of type Fields + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit ) + { + this.columnFields = columnFields; + + verifyColumns( columnFields, columns ); + + setSourceFields( columnFields ); + + this.columns = columns; + this.selectQuery = selectQuery.trim().replaceAll( ";$", "" ); + this.countQuery = countQuery.trim().replaceAll( ";$", "" ); + this.limit = limit; + + this.inputFormatClass = inputFormatClass; + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * <p/> + * Use this constructor if the data source may only be used as a source. + * + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( String[] columns, String selectQuery, String countQuery, long limit ) + { + this( null, new Fields( columns ), columns, selectQuery, countQuery, limit ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + * @param limit of type long + */ + public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit ) + { + this( null, columnFields, columns, selectQuery, countQuery, limit ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * <p/> + * Use this constructor if the data source may only be used as a source. + * + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + */ + public JDBCScheme( String[] columns, String selectQuery, String countQuery ) + { + this( null, new Fields( columns ), columns, selectQuery, countQuery, -1 ); + } + + /** + * Constructor JDBCScheme creates a new JDBCScheme instance. + * + * @param columnFields of type Fields + * @param columns of type String[] + * @param selectQuery of type String + * @param countQuery of type String + */ + public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery ) + { + this( null, columnFields, columns, selectQuery, countQuery, -1 ); + } + + /** + * Method getColumns returns the columns of this JDBCScheme object. + * + * @return the columns (type String[]) of this JDBCScheme object. + */ + public String[] getColumns() { + return columns; + } + + /** + * Method getOrderBy returns the orderBy of this JDBCScheme object. + * + * @return the orderBy (type String[]) of this JDBCScheme object. + */ + public String[] getOrderBy() { + return orderBy; + } + + @Override + public void sourceConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, + JobConf conf ) { + int concurrentReads = ( (JDBCTap) tap ).concurrentReads; + + if( selectQuery != null ) + DBInputFormat.setInput( conf, TupleRecord.class, selectQuery, countQuery, limit, concurrentReads ); + else { + String tableName = ( (JDBCTap) tap ).getTableName(); + String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null; + DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, columns ); + } + + if( inputFormatClass != null ) + conf.setInputFormat( inputFormatClass ); + } + + @Override + public void sinkConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, + JobConf conf ) { + if( selectQuery != null ) + throw new TapException( "cannot sink to this Scheme" ); + + String tableName = ( (JDBCTap) tap ).getTableName(); + int batchSize = ( (JDBCTap) tap ).getBatchSize(); + DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy, batchSize ); + + if( outputFormatClass != null ) + conf.setOutputFormat( outputFormatClass ); + } + + @Override + public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) + { + Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; + + sourceCall.setContext( pair ); + } + + @Override + public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException + { + Object key = sourceCall.getContext()[ 0 ]; + Object value = sourceCall.getContext()[ 1 ]; + boolean result = sourceCall.getInput().next( key, value ); + + if( !result ) + return false; + + Tuple newTuple = ( (TupleRecord) value ).getTuple(); + sourceCall.getIncomingEntry().setTuple( newTuple ); + + return true; + } + + @Override + public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) { + sourceCall.setContext( null ); + } + + @Override + public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException { + // it's ok to use NULL here so the collector does not write anything + TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); + OutputCollector outputCollector = sinkCall.getOutput(); + if( updateBy != null ) + { + Tuple allValues = tupleEntry.selectTuple( updateValueFields ); + Tuple updateValues = tupleEntry.selectTuple( updateByFields ); + + allValues = cleanTuple( allValues ); + + TupleRecord key = new TupleRecord( allValues ); + + if( updateValues.equals( updateIfTuple ) ) + outputCollector.collect( key, null ); + else + outputCollector.collect( key, key ); + + return; + } + + Tuple result = tupleEntry.selectTuple( getSinkFields() ); + + result = cleanTuple( result ); + + outputCollector.collect( new TupleRecord( result ), null ); + } + + /** + * Provides a hook for subclasses to escape or modify any values before creating the final SQL statement. + * + * @param result + * @return + */ + protected Tuple cleanTuple( Tuple result ) { + return result; + } + + @Override + public boolean equals( Object object ) { + if( this == object ) + return true; + if( !( object instanceof JDBCScheme ) ) + return false; + if( !super.equals( object ) ) + return false; + + JDBCScheme that = (JDBCScheme) object; + + if( limit != that.limit ) + return false; + if( columnFields != null ? !columnFields.equals( that.columnFields ) : that.columnFields != null ) + return false; + if( !Arrays.equals( columns, that.columns ) ) + return false; + if( conditions != null ? !conditions.equals( that.conditions ) : that.conditions != null ) + return false; + if( countQuery != null ? !countQuery.equals( that.countQuery ) : that.countQuery != null ) + return false; + if( inputFormatClass != null ? !inputFormatClass.equals( that.inputFormatClass ) : that.inputFormatClass != null ) + return false; + if( !Arrays.equals( orderBy, that.orderBy ) ) + return false; + if( outputFormatClass != null ? !outputFormatClass.equals( that.outputFormatClass ) : that.outputFormatClass != null ) + return false; + if( selectQuery != null ? !selectQuery.equals( that.selectQuery ) : that.selectQuery != null ) + return false; + if( !Arrays.equals( updateBy, that.updateBy ) ) + return false; + if( updateByFields != null ? !updateByFields.equals( that.updateByFields ) : that.updateByFields != null ) + return false; + if( updateIfTuple != null ? !updateIfTuple.equals( that.updateIfTuple ) : that.updateIfTuple != null ) + return false; + if( updateValueFields != null ? !updateValueFields.equals( that.updateValueFields ) : that.updateValueFields != null ) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + ( inputFormatClass != null ? inputFormatClass.hashCode() : 0 ); + result = 31 * result + ( outputFormatClass != null ? outputFormatClass.hashCode() : 0 ); + result = 31 * result + ( columns != null ? Arrays.hashCode( columns ) : 0 ); + result = 31 * result + ( orderBy != null ? Arrays.hashCode( orderBy ) : 0 ); + result = 31 * result + ( conditions != null ? conditions.hashCode() : 0 ); + result = 31 * result + ( updateBy != null ? Arrays.hashCode( updateBy ) : 0 ); + result = 31 * result + ( updateValueFields != null ? updateValueFields.hashCode() : 0 ); + result = 31 * result + ( updateByFields != null ? updateByFields.hashCode() : 0 ); + result = 31 * result + ( columnFields != null ? columnFields.hashCode() : 0 ); + result = 31 * result + ( updateIfTuple != null ? updateIfTuple.hashCode() : 0 ); + result = 31 * result + ( selectQuery != null ? selectQuery.hashCode() : 0 ); + result = 31 * result + ( countQuery != null ? countQuery.hashCode() : 0 ); + result = 31 * result + (int) ( limit ^ ( limit >>> 32 ) ); + return result; + } +} |