diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass')
10 files changed, 2880 insertions, 0 deletions
/**
 * Shared constants for configuring JDBC taps and schemes through the Hadoop
 * job configuration.
 * <p/>
 * The {@code String} constants are {@link String#format(String, Object...)}
 * templates: the single {@code %s} placeholder is substituted with a
 * tap/table identifier to produce a per-tap configuration key
 * (e.g. {@code String.format(START_KEY, "mytable")} → {@code "jdbc.mytable.startkey"}).
 */
public class JDBCConstants {

  /** How a JDBC source reads rows from the underlying table. */
  public enum JdbcSourceMode {
    SELECT,
    SELECT_WITH_PARTITIONS,
    SELECT_WITH_BUCKETS;
  }

  /** How a JDBC sink writes rows to the underlying table. */
  public enum JdbcSinkMode {
    INSERT,
    UPDATE,
    UPSERT;
  }

  // Configuration-key templates; %s is the tap/table identifier.
  public static final String START_KEY = "jdbc.%s.startkey";
  public static final String STOP_KEY = "jdbc.%s.stopkey";
  public static final String SOURCE_MODE = "jdbc.%s.source.mode";
  public static final String KEY_LIST = "jdbc.%s.key.list";
  public static final String VERSIONS = "jdbc.%s.versions";

  /** Constants holder — not instantiable. */
  private JDBCConstants() {
  }
}
/*
 * Copyright (c) 2009 Concurrent, Inc.
 *
 * This work has been released into the public domain
 * by the copyright holder. This applies worldwide.
 *
 * In case this is not legally possible:
 * The copyright holder grants any entity the right
 * to use this work for any purpose, without any
 * conditions, unless such conditions are required by law.
 */

package parallelai.spyglass.jdbc;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

import parallelai.spyglass.jdbc.db.DBInputFormat;
import parallelai.spyglass.jdbc.db.DBOutputFormat;

import java.io.IOException;
import java.util.Arrays;

/**
 * Class JDBCScheme defines what its parent Tap will select and insert/update into the sql database.
 * <p/>
 * If updateBy column names are given, a SQL UPDATE statement will be generated if the values in those columns
 * for the given Tuple are all not {@code null}. Otherwise an INSERT statement will be generated.
 * <p/>
 * Some constructors take columnFields and updateByFields. These values will be used during field name resolution
 * to bind this Scheme to the source and sink branches in a given assembly. These fields 'alias' the column names
 * in the respective arrays. In other words, if your DB TABLE has different column names than your assembly expects,
 * use the Fields arguments to bind the assembly to the table. Both Fields and array must be the same size.
 * <p/>
 * Override this class, {@link DBInputFormat}, and {@link DBOutputFormat} to specialize for a given vendor database.
 */
public class JDBCScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>
{
    private Class<? extends DBInputFormat> inputFormatClass;
    private Class<? extends DBOutputFormat> outputFormatClass;
    // Table column names; aligned one-to-one with columnFields.
    private String[] columns;
    private String[] orderBy;
    private String conditions;
    // Column names used to decide UPDATE vs INSERT; null means INSERT-only sink.
    private String[] updateBy;
    private Fields updateValueFields;
    private Fields updateByFields;
    private Fields columnFields;
    // Tuple of all nulls, same arity as updateByFields; compared against the
    // updateBy values in sink() to choose INSERT over UPDATE.
    private Tuple updateIfTuple;
    private String selectQuery;
    private String countQuery;
    // -1 means "no limit".
    private long limit = -1;

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param inputFormatClass  of type Class<? extends DBInputFormat>
     * @param outputFormatClass of type Class<? extends DBOutputFormat>
     * @param columns           of type String[]
     * @param orderBy           of type String[]
     * @param conditions        of type String
     * @param limit             of type long
     * @param updateBy          of type String[]
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, long limit, String[] updateBy )
    {
        this( inputFormatClass, outputFormatClass, new Fields( columns ), columns, orderBy, conditions, limit, updateBy != null ? new Fields( updateBy ) : null, updateBy );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance. This is the
     * canonical table-based constructor that all table-oriented overloads
     * delegate to.
     *
     * @param inputFormatClass  of type Class<? extends DBInputFormat>
     * @param outputFormatClass of type Class<? extends DBOutputFormat>
     * @param columnFields      of type Fields
     * @param columns           of type String[]
     * @param orderBy           of type String[]
     * @param conditions        of type String
     * @param limit             of type long
     * @param updateByFields    of type Fields
     * @param updateBy          of type String[]
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy )
    {
        this.columnFields = columnFields;

        verifyColumns( columnFields, columns );

        setSinkFields( columnFields );
        setSourceFields( columnFields );

        if( updateBy != null && updateBy.length != 0 )
        {
            this.updateBy = updateBy;
            this.updateByFields = updateByFields;

            if( updateByFields.size() != updateBy.length )
                throw new IllegalArgumentException( "updateByFields and updateBy must be the same size" );

            if( !this.columnFields.contains( this.updateByFields ) )
                throw new IllegalArgumentException( "columnFields must contain updateByFields column names" );

            // Reorder so the updateBy columns come last — the order the
            // generated UPDATE statement binds its parameters in.
            this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields );
            this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls
        }

        this.columns = columns;
        this.orderBy = orderBy;
        this.conditions = conditions;
        this.limit = limit;

        this.inputFormatClass = inputFormatClass;
        this.outputFormatClass = outputFormatClass;
    }

    // Sanity check: the Fields alias list and the column-name array must pair up.
    private void verifyColumns( Fields columnFields, String[] columns )
    {
        if( columnFields.size() != columns.length )
            throw new IllegalArgumentException( "columnFields and columns must be the same size" );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param inputFormatClass  of type Class<? extends DBInputFormat>
     * @param outputFormatClass of type Class<? extends DBOutputFormat>
     * @param columns           of type String[]
     * @param orderBy           of type String[]
     * @param conditions        of type String
     * @param updateBy          of type String[]
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, String[] updateBy )
    {
        this( inputFormatClass, outputFormatClass, columns, orderBy, conditions, -1, updateBy );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param inputFormatClass  of type Class<? extends DBInputFormat>
     * @param outputFormatClass of type Class<? extends DBOutputFormat>
     * @param columnFields      of type Fields
     * @param columns           of type String[]
     * @param orderBy           of type String[]
     * @param conditions        of type String
     * @param updateByFields    of type Fields
     * @param updateBy          of type String[]
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, Fields updateByFields, String[] updateBy )
    {
        this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, -1, updateByFields, updateBy );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param inputFormatClass  of type Class<? extends DBInputFormat>
     * @param outputFormatClass of type Class<? extends DBOutputFormat>
     * @param columns           of type String[]
     * @param orderBy           of type String[]
     * @param updateBy          of type String[]
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy )
    {
        this( inputFormatClass, outputFormatClass, columns, orderBy, null, updateBy );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param inputFormatClass  of type Class<? extends DBInputFormat>
     * @param outputFormatClass of type Class<? extends DBOutputFormat>
     * @param columnFields      of type Fields
     * @param columns           of type String[]
     * @param orderBy           of type String[]
     * @param updateByFields    of type Fields
     * @param updateBy          of type String[]
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
    {
        this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, null, -1, updateByFields, updateBy );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns  of type String[]
     * @param orderBy  of type String[]
     * @param updateBy of type String[]
     */
    public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy )
    {
        this( null, null, columns, orderBy, updateBy );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields   of type Fields
     * @param columns        of type String[]
     * @param orderBy        of type String[]
     * @param updateByFields of type Fields
     * @param updateBy       of type String[]
     */
    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
    {
        this( null, null, columnFields, columns, orderBy, updateByFields, updateBy );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns    of type String[]
     * @param orderBy    of type String[]
     * @param conditions of type String
     * @param limit      of type long
     */
    public JDBCScheme( String[] columns, String[] orderBy, String conditions, long limit )
    {
        this( null, null, columns, orderBy, conditions, limit, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param orderBy      of type String[]
     * @param conditions   of type String
     * @param limit        of type long
     */
    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit )
    {
        this( null, null, columnFields, columns, orderBy, conditions, limit, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns    of type String[]
     * @param orderBy    of type String[]
     * @param conditions of type String
     */
    public JDBCScheme( String[] columns, String[] orderBy, String conditions )
    {
        this( null, null, columns, orderBy, conditions, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param orderBy      of type String[]
     * @param conditions   of type String
     */
    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions )
    {
        this( null, null, columnFields, columns, orderBy, conditions, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns of type String[]
     * @param orderBy of type String[]
     * @param limit   of type long
     */
    public JDBCScheme( String[] columns, String[] orderBy, long limit )
    {
        this( null, null, columns, orderBy, null, limit, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param orderBy      of type String[]
     * @param limit        of type long
     */
    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, long limit )
    {
        this( null, null, columnFields, columns, orderBy, null, limit, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns of type String[]
     * @param orderBy of type String[]
     */
    public JDBCScheme( String[] columns, String[] orderBy )
    {
        this( null, null, columns, orderBy, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param orderBy      of type String[]
     */
    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy )
    {
        this( null, null, columnFields, columns, orderBy, null, -1, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns    of type String[]
     * @param conditions of type String
     * @param limit      of type long
     */
    public JDBCScheme( String[] columns, String conditions, long limit )
    {
        this( null, null, columns, null, conditions, limit, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param conditions   of type String
     * @param limit        of type long
     */
    public JDBCScheme( Fields columnFields, String[] columns, String conditions, long limit )
    {
        this( null, null, columnFields, columns, null, conditions, limit, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns    of type String[]
     * @param conditions of type String
     */
    public JDBCScheme( String[] columns, String conditions )
    {
        this( null, null, columns, null, conditions, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param conditions   of type String
     */
    public JDBCScheme( Fields columnFields, String[] columns, String conditions )
    {
        this( null, null, columnFields, columns, null, conditions, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns of type String[]
     * @param limit   of type long
     */
    public JDBCScheme( String[] columns, long limit )
    {
        this( null, null, columns, null, null, limit, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param limit        of type long
     */
    public JDBCScheme( Fields columnFields, String[] columns, long limit )
    {
        this( null, null, columnFields, columns, null, null, limit, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columns of type String[]
     */
    public JDBCScheme( String[] columns )
    {
        this( null, null, new Fields( columns ), columns, null, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     */
    public JDBCScheme( Fields columnFields, String[] columns )
    {
        this( null, null, columnFields, columns, null, null, null );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     * <p/>
     * Use this constructor if the data source may only be used as a source.
     *
     * @param inputFormatClass of type Class<? extends DBInputFormat>
     * @param columns          of type String[]
     * @param selectQuery      of type String
     * @param countQuery       of type String
     * @param limit            of type long
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, String[] columns, String selectQuery, String countQuery, long limit )
    {
        this( inputFormatClass, new Fields( columns ), columns, selectQuery, countQuery, limit );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance. This is the
     * canonical query-based (source-only) constructor; no sink fields are set,
     * so a scheme built this way cannot be used as a sink.
     *
     * @param inputFormatClass of type Class<? extends DBInputFormat>
     * @param columnFields     of type Fields
     * @param columns          of type String[]
     * @param selectQuery      of type String
     * @param countQuery       of type String
     * @param limit            of type long
     */
    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
    {
        this.columnFields = columnFields;

        verifyColumns( columnFields, columns );

        setSourceFields( columnFields );

        this.columns = columns;
        // Strip a trailing semicolon so the queries can be wrapped/extended later.
        this.selectQuery = selectQuery.trim().replaceAll( ";$", "" );
        this.countQuery = countQuery.trim().replaceAll( ";$", "" );
        this.limit = limit;

        this.inputFormatClass = inputFormatClass;
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     * <p/>
     * Use this constructor if the data source may only be used as a source.
     *
     * @param columns     of type String[]
     * @param selectQuery of type String
     * @param countQuery  of type String
     * @param limit       of type long
     */
    public JDBCScheme( String[] columns, String selectQuery, String countQuery, long limit )
    {
        this( null, new Fields( columns ), columns, selectQuery, countQuery, limit );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param selectQuery  of type String
     * @param countQuery   of type String
     * @param limit        of type long
     */
    public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
    {
        this( null, columnFields, columns, selectQuery, countQuery, limit );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     * <p/>
     * Use this constructor if the data source may only be used as a source.
     *
     * @param columns     of type String[]
     * @param selectQuery of type String
     * @param countQuery  of type String
     */
    public JDBCScheme( String[] columns, String selectQuery, String countQuery )
    {
        this( null, new Fields( columns ), columns, selectQuery, countQuery, -1 );
    }

    /**
     * Constructor JDBCScheme creates a new JDBCScheme instance.
     *
     * @param columnFields of type Fields
     * @param columns      of type String[]
     * @param selectQuery  of type String
     * @param countQuery   of type String
     */
    public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery )
    {
        this( null, columnFields, columns, selectQuery, countQuery, -1 );
    }

    /**
     * Method getColumns returns the columns of this JDBCScheme object.
     *
     * @return the columns (type String[]) of this JDBCScheme object.
     */
    public String[] getColumns() {
        return columns;
    }

    /**
     * Method getOrderBy returns the orderBy of this JDBCScheme object.
     *
     * @return the orderBy (type String[]) of this JDBCScheme object.
     */
    public String[] getOrderBy() {
        return orderBy;
    }

    // Configures the job for reading: either a raw select/count query pair or a
    // generated table/conditions/orderBy query, then installs the input format.
    @Override
    public void sourceConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
        JobConf conf ) {
        int concurrentReads = ( (JDBCTap) tap ).concurrentReads;

        if( selectQuery != null )
            DBInputFormat.setInput( conf, TupleRecord.class, selectQuery, countQuery, limit, concurrentReads );
        else {
            String tableName = ( (JDBCTap) tap ).getTableName();
            String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null;
            DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, columns );
        }

        if( inputFormatClass != null )
            conf.setInputFormat( inputFormatClass );
    }

    // Configures the job for writing; a query-based scheme is source-only.
    @Override
    public void sinkConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
        JobConf conf ) {
        if( selectQuery != null )
            throw new TapException( "cannot sink to this Scheme" );

        String tableName = ( (JDBCTap) tap ).getTableName();
        int batchSize = ( (JDBCTap) tap ).getBatchSize();
        DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy, batchSize );

        if( outputFormatClass != null )
            conf.setOutputFormat( outputFormatClass );
    }

    // Allocates the reusable key/value pair for the record reader once per task.
    @Override
    public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
    {
        Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()};

        sourceCall.setContext( pair );
    }

    // Reads the next record into the incoming tuple entry; returns false at EOF.
    @Override
    public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
    {
        Object key = sourceCall.getContext()[ 0 ];
        Object value = sourceCall.getContext()[ 1 ];
        boolean result = sourceCall.getInput().next( key, value );

        if( !result )
            return false;

        Tuple newTuple = ( (TupleRecord) value ).getTuple();
        sourceCall.getIncomingEntry().setTuple( newTuple );

        return true;
    }

    @Override
    public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) {
        sourceCall.setContext( null );
    }

    // Emits one record per outgoing tuple. When updateBy columns are configured,
    // a non-null second collect argument signals DBOutputFormat to UPDATE rather
    // than INSERT; all-null updateBy values fall back to INSERT.
    @Override
    public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException {
        // it's ok to use NULL here so the collector does not write anything
        TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
        OutputCollector outputCollector = sinkCall.getOutput();
        if( updateBy != null )
        {
            Tuple allValues = tupleEntry.selectTuple( updateValueFields );
            Tuple updateValues = tupleEntry.selectTuple( updateByFields );

            allValues = cleanTuple( allValues );

            TupleRecord key = new TupleRecord( allValues );

            if( updateValues.equals( updateIfTuple ) )
                outputCollector.collect( key, null );
            else
                outputCollector.collect( key, key );

            return;
        }

        Tuple result = tupleEntry.selectTuple( getSinkFields() );

        result = cleanTuple( result );

        outputCollector.collect( new TupleRecord( result ), null );
    }

    /**
     * Provides a hook for subclasses to escape or modify any values before creating the final SQL statement.
     *
     * @param result the tuple about to be written
     * @return the (possibly modified) tuple to write; default is the identity
     */
    protected Tuple cleanTuple( Tuple result ) {
        return result;
    }

    @Override
    public boolean equals( Object object ) {
        if( this == object )
            return true;
        if( !( object instanceof JDBCScheme ) )
            return false;
        if( !super.equals( object ) )
            return false;

        JDBCScheme that = (JDBCScheme) object;

        if( limit != that.limit )
            return false;
        if( columnFields != null ? !columnFields.equals( that.columnFields ) : that.columnFields != null )
            return false;
        if( !Arrays.equals( columns, that.columns ) )
            return false;
        if( conditions != null ? !conditions.equals( that.conditions ) : that.conditions != null )
            return false;
        if( countQuery != null ? !countQuery.equals( that.countQuery ) : that.countQuery != null )
            return false;
        if( inputFormatClass != null ? !inputFormatClass.equals( that.inputFormatClass ) : that.inputFormatClass != null )
            return false;
        if( !Arrays.equals( orderBy, that.orderBy ) )
            return false;
        if( outputFormatClass != null ? !outputFormatClass.equals( that.outputFormatClass ) : that.outputFormatClass != null )
            return false;
        if( selectQuery != null ? !selectQuery.equals( that.selectQuery ) : that.selectQuery != null )
            return false;
        if( !Arrays.equals( updateBy, that.updateBy ) )
            return false;
        if( updateByFields != null ? !updateByFields.equals( that.updateByFields ) : that.updateByFields != null )
            return false;
        if( updateIfTuple != null ? !updateIfTuple.equals( that.updateIfTuple ) : that.updateIfTuple != null )
            return false;
        if( updateValueFields != null ? !updateValueFields.equals( that.updateValueFields ) : that.updateValueFields != null )
            return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + ( inputFormatClass != null ? inputFormatClass.hashCode() : 0 );
        result = 31 * result + ( outputFormatClass != null ? outputFormatClass.hashCode() : 0 );
        result = 31 * result + ( columns != null ? Arrays.hashCode( columns ) : 0 );
        result = 31 * result + ( orderBy != null ? Arrays.hashCode( orderBy ) : 0 );
        result = 31 * result + ( conditions != null ? conditions.hashCode() : 0 );
        result = 31 * result + ( updateBy != null ? Arrays.hashCode( updateBy ) : 0 );
        result = 31 * result + ( updateValueFields != null ? updateValueFields.hashCode() : 0 );
        result = 31 * result + ( updateByFields != null ? updateByFields.hashCode() : 0 );
        result = 31 * result + ( columnFields != null ? columnFields.hashCode() : 0 );
        result = 31 * result + ( updateIfTuple != null ? updateIfTuple.hashCode() : 0 );
        result = 31 * result + ( selectQuery != null ? selectQuery.hashCode() : 0 );
        result = 31 * result + ( countQuery != null ? countQuery.hashCode() : 0 );
        result = 31 * result + (int) ( limit ^ ( limit >>> 32 ) );
        return result;
    }
}
+ */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.util.HadoopUtil; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import parallelai.spyglass.jdbc.db.DBConfiguration; + +import java.io.IOException; +import java.sql.*; +import java.util.*; + +/** + * Class JDBCTap is a {@link Tap} sub-class that provides read and write access to a RDBMS via JDBC drivers. + * <p/> + * This Tap fully supports TABLE DROP and CREATE when given a {@link TableDesc} instance. + * <p/> + * When using {@link SinkMode#UPDATE}, Cascading is instructed to not delete the resource (drop the Table) + * and assumes its safe to begin sinking data into it. The {@link JDBCScheme} is responsible for + * deciding if/when to perform an UPDATE instead of an INSERT. + * <p/> + * Both INSERT and UPDATE are supported through the JDBCScheme. + * <p/> + * By sub-classing JDBCScheme, {@link com.twitter.maple.jdbc.db.DBInputFormat}, and {@link com.twitter.maple.jdbc.db.DBOutputFormat}, + * specific vendor features can be supported. + * <p/> + * Use {@link #setBatchSize(int)} to set the number of INSERT/UPDATES should be grouped together before being + * executed. The default vaue is 1,000. + * <p/> + * Use {@link #executeQuery(String, int)} or {@link #executeUpdate(String)} to invoke SQL statements against + * the underlying Table. 
+ * <p/> + * Note that all classes under the {@link com.twitter.maple.jdbc.db} package originated from the Hadoop project and + * retain their Apache 2.0 license though they have been heavily modified to support INSERT/UPDATE and + * vendor specialization, and a number of other features like 'limit'. + * + * @see JDBCScheme + * @see com.twitter.maple.jdbc.db.DBInputFormat + * @see com.twitter.maple.jdbc.db.DBOutputFormat + */ +public class JDBCTap extends Tap<JobConf, RecordReader, OutputCollector> { +    /** Field LOG */ +    private static final Logger LOG = LoggerFactory.getLogger(JDBCTap.class); + +    private final String id = UUID.randomUUID().toString(); + +    /** Field connectionUrl */ +    String connectionUrl; +    /** Field username */ +    String username; +    /** Field password */ +    String password; +    /** Field driverClassName */ +    String driverClassName; +    /** Field tableDesc */ +    TableDesc tableDesc; +    /** Field batchSize */ +    int batchSize = 1000; +    /** Field concurrentReads */ +    int concurrentReads = 0; + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * <p/> +     * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated +     * into. By default it uses {@link SinkMode#UPDATE}. +     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param tableName       of type String +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, String tableName, JDBCScheme scheme ) { +        this( connectionUrl, username, password, driverClassName, new TableDesc( tableName ), scheme, SinkMode.UPDATE ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. 
+     * +     * @param connectionUrl   of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     * @param sinkMode        of type SinkMode +     */ +    public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) { +        this( connectionUrl, null, null, driverClassName, tableDesc, scheme, sinkMode ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * <p/> +     * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated +     * into. By default it uses {@link SinkMode#UPDATE}. +     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) { +        this( connectionUrl, username, password, driverClassName, tableDesc, scheme, SinkMode.UPDATE ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. 
+     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     * @param sinkMode        of type SinkMode +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) { +        super( scheme, sinkMode ); +        this.connectionUrl = connectionUrl; +        this.username = username; +        this.password = password; +        this.driverClassName = driverClassName; +        this.tableDesc = tableDesc; + +        if( tableDesc.getColumnDefs() == null && sinkMode != SinkMode.UPDATE ) +            throw new IllegalArgumentException( "cannot have sink mode REPLACE or KEEP without TableDesc column defs, use UPDATE mode" ); + +        if( sinkMode != SinkMode.UPDATE ) +            LOG.warn( "using sink mode: {}, consider UPDATE to prevent DROP TABLE from being called during Flow or Cascade setup", sinkMode ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * <p/> +     * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated +     * into. By default it uses {@link SinkMode#UPDATE}. +     * +     * @param connectionUrl   of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) { +        this( connectionUrl, driverClassName, tableDesc, scheme, SinkMode.UPDATE ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance that may only used as a data source. 
+     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, JDBCScheme scheme ) { +        super( scheme ); +        this.connectionUrl = connectionUrl; +        this.username = username; +        this.password = password; +        this.driverClassName = driverClassName; +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * +     * @param connectionUrl   of type String +     * @param driverClassName of type String +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String driverClassName, JDBCScheme scheme ) { +        this( connectionUrl, null, null, driverClassName, scheme ); +    } + +    /** +     * Method getTableName returns the tableName of this JDBCTap object. +     * +     * @return the tableName (type String) of this JDBCTap object. +     */ +    public String getTableName() { +        return tableDesc.tableName; +    } + +    /** +     * Method setBatchSize sets the batchSize of this JDBCTap object. +     * +     * @param batchSize the batchSize of this JDBCTap object. +     */ +    public void setBatchSize( int batchSize ) { +        this.batchSize = batchSize; +    } + +    /** +     * Method getBatchSize returns the batchSize of this JDBCTap object. +     * +     * @return the batchSize (type int) of this JDBCTap object. +     */ +    public int getBatchSize() { +        return batchSize; +    } + +    /** +     * Method getConcurrentReads returns the concurrentReads of this JDBCTap object. +     * <p/> +     * This value specifies the number of concurrent selects and thus the number of mappers +     * that may be used. A value of -1 uses the job default. 
+     * +     * @return the concurrentReads (type int) of this JDBCTap object. +     */ +    public int getConcurrentReads() { +        return concurrentReads; +    } + +    /** +     * Method setConcurrentReads sets the concurrentReads of this JDBCTap object. +     * <p/> +     * This value specifies the number of concurrent selects and thus the number of mappers +     * that may be used. A value of -1 uses the job default. +     * +     * @param concurrentReads the concurrentReads of this JDBCTap object. +     */ +    public void setConcurrentReads( int concurrentReads ) { +        this.concurrentReads = concurrentReads; +    } + +    /** +     * Method getPath returns the path of this JDBCTap object. +     * +     * @return the path (type Path) of this JDBCTap object. +     */ +    public Path getPath() { +        return new Path( getJDBCPath() ); +    } + +    @Override +    public String getIdentifier() { +        return getJDBCPath() + this.id; +    } + + +    public String getJDBCPath() { +        return "jdbc:/" + connectionUrl.replaceAll( ":", "_" ); +    } + +    public boolean isWriteDirect() { +        return true; +    } + +    private JobConf getSourceConf( FlowProcess<JobConf> flowProcess, JobConf conf, String property ) +        throws IOException { +      //  Map<String, String> priorConf = HadoopUtil.deserializeBase64( property, conf, true ); +      //  return flowProcess.mergeMapIntoConfig( conf, priorConf ); +    	 +    	return null; +    } + +    @Override +    public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException { +        // input may be null when this method is called on the client side or cluster side when accumulating +        // for a HashJoin +        return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); +    } + +    @Override +    public TupleEntryCollector openForWrite( FlowProcess<JobConf> flowProcess, OutputCollector output ) throws IOException { +        
if( !isSink() ) +            throw new TapException( "this tap may not be used as a sink, no TableDesc defined" ); + +        LOG.info("Creating JDBCTapCollector output instance"); +        JDBCTapCollector jdbcCollector = new JDBCTapCollector( flowProcess, this ); + +        jdbcCollector.prepare(); + +        return jdbcCollector; +    } + +    @Override +    public boolean isSink() +    { +        return tableDesc != null; +    } + +    @Override +    public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf ) +    { +        // a hack for MultiInputFormat to see that there is a child format +        FileInputFormat.setInputPaths( conf, getPath() ); + +        if( username == null ) +            DBConfiguration.configureDB(conf, driverClassName, connectionUrl); +        else +            DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password ); + +        super.sourceConfInit( process, conf ); +    } + +    @Override +    public void sinkConfInit( FlowProcess<JobConf> process, JobConf conf ) +    { +        if( !isSink() ) +            return; + +        // do not delete if initialized from within a task +        try { +            if( isReplace() && conf.get( "mapred.task.partition" ) == null && !deleteResource( conf ) ) +                throw new TapException( "unable to drop table: " + tableDesc.getTableName() ); + +            if( !createResource( conf ) ) +                throw new TapException( "unable to create table: " + tableDesc.getTableName() ); +        } catch(IOException e) { +            throw new TapException( "error while trying to modify table: " + tableDesc.getTableName() ); +        } + +        if( username == null ) +            DBConfiguration.configureDB( conf, driverClassName, connectionUrl ); +        else +            DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password ); + +        super.sinkConfInit( process, conf ); +    } + +    private Connection 
createConnection() +    { +        try +        { +            LOG.info( "creating connection: {}", connectionUrl ); + +            Class.forName( driverClassName ); + +            Connection connection = null; + +            if( username == null ) +                connection = DriverManager.getConnection( connectionUrl ); +            else +                connection = DriverManager.getConnection( connectionUrl, username, password ); + +            connection.setAutoCommit( false ); + +            return connection; +        } +        catch( ClassNotFoundException exception ) +        { +            throw new TapException( "unable to load driver class: " + driverClassName, exception ); +        } +        catch( SQLException exception ) +        { +            throw new TapException( "unable to open connection: " + connectionUrl, exception ); +        } +    } + +    /** +     * Method executeUpdate allows for ad-hoc update statements to be sent to the remote RDBMS. The number of +     * rows updated will be returned, if applicable. 
+     * +     * @param updateString of type String +     * @return int +     */ +    public int executeUpdate( String updateString ) +    { +        Connection connection = null; +        int result; + +        try +        { +            connection = createConnection(); + +            try +            { +                LOG.info( "executing update: {}", updateString ); + +                Statement statement = connection.createStatement(); + +                result = statement.executeUpdate( updateString ); + +                connection.commit(); +                statement.close(); +            } +            catch( SQLException exception ) +            { +                throw new TapException( "unable to execute update statement: " + updateString, exception ); +            } +        } +        finally +        { +            try +            { +                if( connection != null ) +                    connection.close(); +            } +            catch( SQLException exception ) +            { +                // ignore +                LOG.warn( "ignoring connection close exception", exception ); +            } +        } + +        return result; +    } + +    /** +     * Method executeQuery allows for ad-hoc queries to be sent to the remove RDBMS. A value +     * of -1 for returnResults will return a List of all results from the query, a value of 0 will return an empty List. 
+     * +     * @param queryString   of type String +     * @param returnResults of type int +     * @return List +     */ +    public List<Object[]> executeQuery( String queryString, int returnResults ) +    { +        Connection connection = null; +        List<Object[]> result = Collections.emptyList(); + +        try +        { +            connection = createConnection(); + +            try +            { +                LOG.info( "executing query: {}", queryString ); + +                Statement statement = connection.createStatement(); + +                ResultSet resultSet = statement.executeQuery( queryString ); // we don't care about results + +                if( returnResults != 0 ) +                    result = copyResultSet( resultSet, returnResults == -1 ? Integer.MAX_VALUE : returnResults ); + +                connection.commit(); +                statement.close(); +            } +            catch( SQLException exception ) +            { +                throw new TapException( "unable to execute query statement: " + queryString, exception ); +            } +        } +        finally +        { +            try +            { +                if( connection != null ) +                    connection.close(); +            } +            catch( SQLException exception ) +            { +                // ignore +                LOG.warn( "ignoring connection close exception", exception ); +            } +        } + +        return result; +    } + +    private List<Object[]> copyResultSet( ResultSet resultSet, int length ) throws SQLException +    { +        List<Object[]> results = new ArrayList<Object[]>( length ); +        int size = resultSet.getMetaData().getColumnCount(); + +        int count = 0; + +        while( resultSet.next() && count < length ) +        { +            count++; + +            Object[] row = new Object[size]; + +            for( int i = 0; i < row.length; i++ ) +                row[ i ] = resultSet.getObject( i + 1 ); + 
+            results.add( row ); +        } + +        return results; +    } + +    @Override +    public boolean createResource( JobConf conf ) throws IOException +    { +        if( resourceExists( conf ) ) +            return true; + +        try +        { +            LOG.info( "creating table: {}", tableDesc.tableName ); + +            executeUpdate( tableDesc.getCreateTableStatement() ); +        } +        catch( TapException exception ) +        { +            LOG.warn( "unable to create table: {}", tableDesc.tableName ); +            LOG.warn( "sql failure", exception.getCause() ); + +            return false; +        } + +        return resourceExists( conf ); +    } + +    @Override +    public boolean deleteResource( JobConf conf ) throws IOException +    { +        if( !isSink() ) +            return false; + +        if( !resourceExists( conf ) ) +            return true; + +        try +        { +            LOG.info( "deleting table: {}", tableDesc.tableName ); + +            executeUpdate( tableDesc.getTableDropStatement() ); +        } +        catch( TapException exception ) +        { +            LOG.warn( "unable to drop table: {}", tableDesc.tableName ); +            LOG.warn( "sql failure", exception.getCause() ); + +            return false; +        } + +        return !resourceExists( conf ); +    } + +    @Override +    public boolean resourceExists( JobConf conf ) throws IOException +    { +        if( !isSink() ) +            return true; + +        try +        { +            LOG.info( "test table exists: {}", tableDesc.tableName ); + +            executeQuery( tableDesc.getTableExistsQuery(), 0 ); +        } +        catch( TapException exception ) +        { +            return false; +        } + +        return true; +    } + +    @Override +    public long getModifiedTime( JobConf conf ) throws IOException +    { +        return System.currentTimeMillis(); +    } + +    @Override +    public String toString() +    { +        
return "JDBCTap{" + "connectionUrl='" + connectionUrl + '\'' + ", driverClassName='" + driverClassName + '\'' + ", tableDesc=" + tableDesc + '}'; +    } + +    @Override +    public boolean equals( Object object ) +    { +        if( this == object ) +            return true; +        if( !( object instanceof JDBCTap ) ) +            return false; +        if( !super.equals( object ) ) +            return false; + +        JDBCTap jdbcTap = (JDBCTap) object; + +        if( connectionUrl != null ? !connectionUrl.equals( jdbcTap.connectionUrl ) : jdbcTap.connectionUrl != null ) +            return false; +        if( driverClassName != null ? !driverClassName.equals( jdbcTap.driverClassName ) : jdbcTap.driverClassName != null ) +            return false; +        if( password != null ? !password.equals( jdbcTap.password ) : jdbcTap.password != null ) +            return false; +        if( tableDesc != null ? !tableDesc.equals( jdbcTap.tableDesc ) : jdbcTap.tableDesc != null ) +            return false; +        if( username != null ? !username.equals( jdbcTap.username ) : jdbcTap.username != null ) +            return false; + +        return true; +    } + +    @Override +    public int hashCode() +    { +        int result = super.hashCode(); +        result = 31 * result + ( connectionUrl != null ? connectionUrl.hashCode() : 0 ); +        result = 31 * result + ( username != null ? username.hashCode() : 0 ); +        result = 31 * result + ( password != null ? password.hashCode() : 0 ); +        result = 31 * result + ( driverClassName != null ? driverClassName.hashCode() : 0 ); +        result = 31 * result + ( tableDesc != null ? 
tableDesc.hashCode() : 0 ); +        result = 31 * result + batchSize; +        return result; +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java new file mode 100644 index 0000000..e8b7deb --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowProcess; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.TupleEntrySchemeCollector; +import org.apache.hadoop.mapred.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class JDBCTapCollector is a kind of {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the resource managed by + * a particular {@link JDBCTap} instance. 
+ */ +public class JDBCTapCollector extends TupleEntrySchemeCollector implements OutputCollector +{ +    /** Field LOG */ +    private static final Logger LOG = LoggerFactory.getLogger( JDBCTapCollector.class ); + +    /** Field conf */ +    private final JobConf conf; +    /** Field writer */ +    private RecordWriter writer; +    /** Field flowProcess */ +    private final FlowProcess<JobConf> hadoopFlowProcess; +    /** Field tap */ +    private final Tap<JobConf, RecordReader, OutputCollector> tap; +    /** Field reporter */ +    private final Reporter reporter = Reporter.NULL; + +    /** +     * Constructor TapCollector creates a new TapCollector instance. +     * +     * @param flowProcess +     * @param tap               of type Tap +     * @throws IOException when fails to initialize +     */ +    public JDBCTapCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap ) throws IOException { +        super( flowProcess, tap.getScheme() ); +        this.hadoopFlowProcess = flowProcess; + +        this.tap = tap; +        this.conf = new JobConf( flowProcess.getConfigCopy() ); + +        this.setOutput( this ); +    } + +    @Override +    public void prepare() { +        try { +            initialize(); +        } catch (IOException e) { +            throw new RuntimeException(e); +        } + +        super.prepare(); +    } + +    private void initialize() throws IOException { +        tap.sinkConfInit( hadoopFlowProcess, conf ); + +        OutputFormat outputFormat = conf.getOutputFormat(); + +        LOG.info("Output format class is: " + outputFormat.getClass().toString()); + +        writer = outputFormat.getRecordWriter( null, conf, tap.getIdentifier(), Reporter.NULL ); + +        sinkCall.setOutput( this ); +    } + +    @Override +    public void close() { +        try { +            LOG.info( "closing tap collector for: {}", tap ); +            writer.close( reporter ); +        } catch( IOException exception ) { +  
          LOG.warn( "exception closing: {}", exception ); +            throw new TapException( "exception closing JDBCTapCollector", exception ); +        } finally { +            super.close(); +        } +    } + +    /** +     * Method collect writes the given values to the {@link Tap} this instance encapsulates. +     * +     * @param writableComparable of type WritableComparable +     * @param writable           of type Writable +     * @throws IOException when +     */ +    public void collect( Object writableComparable, Object writable ) throws IOException { +        if (hadoopFlowProcess instanceof HadoopFlowProcess) +            ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress(); + +        writer.write( writableComparable, writable ); +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/TableDesc.java b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java new file mode 100644 index 0000000..b0aaea7 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.util.Util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Class TableDesc describes a SQL based table, this description is used by the {@link JDBCTap} when + * creating a missing table. 
+ * + * @see JDBCTap + * @see JDBCScheme + */ +public class TableDesc implements Serializable { +    /** Field tableName */ +    String tableName; +    /** Field columnNames */ +    String[] columnNames; +    /** Field columnDefs */ +    String[] columnDefs; +    /** Field primaryKeys */ +    String[] primaryKeys; + +    /** +     * Constructor TableDesc creates a new TableDesc instance. +     * +     * @param tableName of type String +     */ +    public TableDesc( String tableName ) { +        this.tableName = tableName; +    } + +    /** +     * Constructor TableDesc creates a new TableDesc instance. +     * +     * @param tableName   of type String +     * @param columnNames of type String[] +     * @param columnDefs  of type String[] +     * @param primaryKeys of type String +     */ +    public TableDesc( String tableName, String[] columnNames, String[] columnDefs, String[] primaryKeys ) { +        this.tableName = tableName; +        this.columnNames = columnNames; +        this.columnDefs = columnDefs; +        this.primaryKeys = primaryKeys; +    } + +    public String getTableName() { +        return tableName; +    } + +    public String[] getColumnNames() { +        return columnNames; +    } + +    public String[] getColumnDefs() { +        return columnDefs; +    } + +    public String[] getPrimaryKeys() { +        return primaryKeys; +    } + +    /** +     * Method getTableCreateStatement returns the tableCreateStatement of this TableDesc object. +     * +     * @return the tableCreateStatement (type String) of this TableDesc object. 
+     */ +    public String getCreateTableStatement() { +        List<String> createTableStatement = new ArrayList<String>(); + +        createTableStatement = addCreateTableBodyTo( createTableStatement ); + +        return String.format( getCreateTableFormat(), tableName, Util.join( createTableStatement, ", " ) ); +    } + +    protected List<String> addCreateTableBodyTo( List<String> createTableStatement ) { +        createTableStatement = addDefinitionsTo( createTableStatement ); +        createTableStatement = addPrimaryKeyTo( createTableStatement ); + +        return createTableStatement; +    } + +    protected String getCreateTableFormat() { +        return "CREATE TABLE %s ( %s )"; +    } + +    protected List<String> addDefinitionsTo( List<String> createTableStatement ) { +        for( int i = 0; i < columnNames.length; i++ ) { +            String columnName = columnNames[ i ]; +            String columnDef = columnDefs[ i ]; + +            createTableStatement.add( columnName + " " + columnDef ); +        } + +        return createTableStatement; +    } + +    protected List<String> addPrimaryKeyTo( List<String> createTableStatement ) { +        if( hasPrimaryKey() ) +            createTableStatement.add( String.format( "PRIMARY KEY( %s )", Util.join( primaryKeys, ", " ) ) ); + +        return createTableStatement; +    } + +    /** +     * Method getTableDropStatement returns the tableDropStatement of this TableDesc object. +     * +     * @return the tableDropStatement (type String) of this TableDesc object. +     */ +    public String getTableDropStatement() { +        return String.format( getDropTableFormat(), tableName ); +    } + +    protected String getDropTableFormat() { +        return "DROP TABLE %s"; +    } + +    /** +     * Method getTableExistsQuery returns the tableExistsQuery of this TableDesc object. +     * +     * @return the tableExistsQuery (type String) of this TableDesc object. 
+     */ +    public String getTableExistsQuery() { +        return String.format( "select 1 from %s where 1 = 0", tableName ); +    } + +    private boolean hasPrimaryKey() { +        return primaryKeys != null && primaryKeys.length != 0; +    } + +    @Override +    public String toString() { +        return "TableDesc{" + "tableName='" + tableName + '\'' + ", columnNames=" + ( columnNames == null ? null : Arrays.asList( columnNames ) ) + ", columnDefs=" + ( columnDefs == null ? null : Arrays.asList( columnDefs ) ) + ", primaryKeys=" + ( primaryKeys == null ? null : Arrays.asList( primaryKeys ) ) + '}'; +    } + +    @Override +    public boolean equals( Object object ) { +        if( this == object ) +            return true; +        if( !( object instanceof TableDesc ) ) +            return false; + +        TableDesc tableDesc = (TableDesc) object; + +        if( !Arrays.equals( columnDefs, tableDesc.columnDefs ) ) +            return false; +        if( !Arrays.equals( columnNames, tableDesc.columnNames ) ) +            return false; +        if( !Arrays.equals( primaryKeys, tableDesc.primaryKeys ) ) +            return false; +        if( tableName != null ? !tableName.equals( tableDesc.tableName ) : tableDesc.tableName != null ) +            return false; + +        return true; +    } + +    @Override +    public int hashCode() { +        int result = tableName != null ? tableName.hashCode() : 0; +        result = 31 * result + ( columnNames != null ? Arrays.hashCode( columnNames ) : 0 ); +        result = 31 * result + ( columnDefs != null ? Arrays.hashCode( columnDefs ) : 0 ); +        result = 31 * result + ( primaryKeys != null ? 
Arrays.hashCode( primaryKeys ) : 0 ); +        return result; +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java new file mode 100644 index 0000000..4191e79 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.tuple.Tuple; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import parallelai.spyglass.jdbc.db.DBWritable; + +public class TupleRecord implements DBWritable { +    private Tuple tuple; + +    public TupleRecord() { +    } + +    public TupleRecord( Tuple tuple ) { +        this.tuple = tuple; +    } + +    public void setTuple( Tuple tuple ) { +        this.tuple = tuple; +    } + +    public Tuple getTuple() { +        return tuple; +    } + +    public void write( PreparedStatement statement ) throws SQLException { +        for( int i = 0; i < tuple.size(); i++ ) { +        	//System.out.println("Insert Tuple => " + " statement.setObject( " + (i + 1) + "," + tuple.get( i )); +            statement.setObject( i + 1, tuple.get( i ) ); +        } +        boolean test = true; +        if (test) { +	        for( int i = 1; i < tuple.size(); i++ ) { +	        	//System.out.println("Update Tuple => " + " statement.setObject( " + (i + tuple.size()) + "," + tuple.get( i )); +	            statement.setObject( i + tuple.size(), tuple.get( i ) ); +	        } +        } +         +    } + +    public void readFields( ResultSet resultSet ) throws SQLException { +        tuple 
= new Tuple(); + +        for( int i = 0; i < resultSet.getMetaData().getColumnCount(); i++ ) +            tuple.add( (Comparable) resultSet.getObject( i + 1 ) ); +    } + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java new file mode 100644 index 0000000..1bb0786 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * A container for configuration property names for jobs with DB input/output. 
<br> The job can be + * configured using the static methods in this class, {@link DBInputFormat}, and {@link + * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper + * values. + */ +public class DBConfiguration { + +    /** The JDBC Driver class name */ +    public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class"; + +    /** JDBC Database access URL */ +    public static final String URL_PROPERTY = "mapred.jdbc.url"; + +    /** User name to access the database */ +    public static final String USERNAME_PROPERTY = "mapred.jdbc.username"; + +    /** Password to access the database */ +    public static final String PASSWORD_PROPERTY = "mapred.jdbc.password"; + +    /** Input table name */ +    public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name"; + +    /** Field names in the Input table */ +    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names"; + +    /** WHERE clause in the input SELECT statement */ +    public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions"; + +    /** ORDER BY clause in the input SELECT statement */ +    public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby"; + +    /** Whole input query, exluding LIMIT...OFFSET */ +    public static final String INPUT_QUERY = "mapred.jdbc.input.query"; + +    /** The number of records to LIMIT, useful for testing */ +    public static final String INPUT_LIMIT = "mapred.jdbc.input.limit"; + +    /** Input query to get the count of records */ +    public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query"; + +    /** Class name implementing DBWritable which will hold input tuples */ +    public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class"; + +    /** Output table name */ +    public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name"; + +    /** 
Field names in the Output table */ +    public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names"; + +    /** Field names in the Output table */ +    public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY = +        "mapred.jdbc.output.update.field.names"; + +    /** The number of statements to batch before executing */ +    public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num"; + +    /** The number of splits allowed, becomes max concurrent reads. */ +    public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num"; + +    /** +     * Sets the DB access related fields in the Configuration. +     * +     * @param job         the job +     * @param driverClass JDBC Driver class name +     * @param dbUrl       JDBC DB access URL. +     * @param userName    DB access username +     * @param passwd      DB access passwd +     */ +    public static void configureDB(Configuration job, String driverClass, String dbUrl, +        String userName, String passwd) { +        job.set(DRIVER_CLASS_PROPERTY, driverClass); +        job.set(URL_PROPERTY, dbUrl); + +        if (userName != null) { job.set(USERNAME_PROPERTY, userName); } + +        if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); } +    } + +    /** +     * Sets the DB access related fields in the Configuration. +     * +     * @param job         the job +     * @param driverClass JDBC Driver class name +     * @param dbUrl       JDBC DB access URL. 
+     */ +    public static void configureDB(Configuration job, String driverClass, String dbUrl) { +        configureDB(job, driverClass, dbUrl, null, null); +    } + +    private Configuration job; + +    DBConfiguration(Configuration job) { +        this.job = job; +    } + +    /** +     * Returns a connection object to the DB +     * +     * @throws ClassNotFoundException +     * @throws SQLException +     */ +    Connection getConnection() throws IOException { +        try { +            Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); +        } catch (ClassNotFoundException exception) { +            throw new IOException("unable to load conection driver", exception); +        } + +        try { +            if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) { +                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY)); +            } else { +                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job +                    .get(DBConfiguration.USERNAME_PROPERTY), job +                    .get(DBConfiguration.PASSWORD_PROPERTY)); +            } +        } catch (SQLException exception) { +            throw new IOException("unable to create connection", exception); +        } +    } + +    String getInputTableName() { +        return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY); +    } + +    void setInputTableName(String tableName) { +        job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); +    } + +    String[] getInputFieldNames() { +        return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY); +    } + +    void setInputFieldNames(String... 
fieldNames) { +        job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    String getInputConditions() { +        return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY); +    } + +    void setInputConditions(String conditions) { +        if (conditions != null && conditions.length() > 0) { +            job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions); +        } +    } + +    String getInputOrderBy() { +        return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY); +    } + +    void setInputOrderBy(String orderby) { +        if (orderby != null && orderby.length() > 0) { +            job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby); +        } +    } + +    String getInputQuery() { +        return job.get(DBConfiguration.INPUT_QUERY); +    } + +    void setInputQuery(String query) { +        if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); } +    } + +    long getInputLimit() { +        return job.getLong(DBConfiguration.INPUT_LIMIT, -1); +    } + +    void setInputLimit(long limit) { +        job.setLong(DBConfiguration.INPUT_LIMIT, limit); +    } + +    String getInputCountQuery() { +        return job.get(DBConfiguration.INPUT_COUNT_QUERY); +    } + +    void setInputCountQuery(String query) { +        if (query != null && query.length() > 0) { +            job.set(DBConfiguration.INPUT_COUNT_QUERY, query); +        } +    } + +    Class<?> getInputClass() { +        return job +            .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class); +    } + +    void setInputClass(Class<? 
extends DBWritable> inputClass) { +        job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class); +    } + +    String getOutputTableName() { +        return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY); +    } + +    void setOutputTableName(String tableName) { +        job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); +    } + +    String[] getOutputFieldNames() { +        return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY); +    } + +    void setOutputFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    String[] getOutputUpdateFieldNames() { +        return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY); +    } + +    void setOutputUpdateFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    int getBatchStatementsNum() { +        return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000); +    } + +    void setBatchStatementsNum(int batchStatementsNum) { +        job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum); +    } + +    int getMaxConcurrentReadsNum() { +        return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0); +    } + +    void setMaxConcurrentReadsNum(int maxConcurrentReads) { +        if (maxConcurrentReads < 0) { +            throw new IllegalArgumentException("maxConcurrentReads must be a positive value"); +        } + +        job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads); +    } + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java new file mode 100644 index 0000000..115d9bd --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java @@ -0,0 +1,452 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. 
+ * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.*; + +/** + * A InputFormat that reads input data from an SQL table. <p/> DBInputFormat emits LongWritables + * containing the record number as key and DBWritables as value. <p/> The SQL query, and input class + * can be using one of the two setInput methods. 
+ */ +public class DBInputFormat<T extends DBWritable> +    implements InputFormat<LongWritable, T>, JobConfigurable { +    /** Field LOG */ +    private static final Logger LOG = LoggerFactory.getLogger(DBInputFormat.class); + +    /** +     * A RecordReader that reads records from a SQL table. Emits LongWritables containing the record +     * number as key and DBWritables as value. +     */ +    protected class DBRecordReader implements RecordReader<LongWritable, T> { +        private ResultSet results; +        private Statement statement; +        private Class<T> inputClass; +        private JobConf job; +        private DBInputSplit split; +        private long pos = 0; + +        /** +         * @param split The InputSplit to read data for +         * @throws SQLException +         */ +        protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) +            throws SQLException, IOException { +            this.inputClass = inputClass; +            this.split = split; +            this.job = job; + +            statement = +                connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + +            //statement.setFetchSize(Integer.MIN_VALUE); +            String query = getSelectQuery(); +            try { +                LOG.info(query); +                results = statement.executeQuery(query); +                LOG.info("done executing select query"); +            } catch (SQLException exception) { +                LOG.error("unable to execute select query: " + query, exception); +                throw new IOException("unable to execute select query: " + query, exception); +            } +        } + +        /** +         * Returns the query for selecting the records, subclasses can override this for custom +         * behaviour. 
+         */ +        protected String getSelectQuery() { +            LOG.info("Executing select query"); +            StringBuilder query = new StringBuilder(); + +            if (dbConf.getInputQuery() == null) { +                query.append("SELECT "); + +                for (int i = 0; i < fieldNames.length; i++) { +                    query.append(fieldNames[i]); + +                    if (i != fieldNames.length - 1) +                        query.append(", "); +                } + +                query.append(" FROM ").append(tableName); +                query.append(" AS ").append(tableName); //in hsqldb this is necessary + +                if (conditions != null && conditions.length() > 0) +                    query.append(" WHERE (").append(conditions).append(")"); + +                String orderBy = dbConf.getInputOrderBy(); + +                if (orderBy != null && orderBy.length() > 0) +                    query.append(" ORDER BY ").append(orderBy); + +            } +            else +                query.append(dbConf.getInputQuery()); + +            try { +                // Only add limit and offset if you have multiple chunks +                if(split.getChunks() > 1) { +                    query.append(" LIMIT ").append(split.getLength()); +                    query.append(" OFFSET ").append(split.getStart()); +                } +            } catch (IOException ex) { +                //ignore, will not throw +            } + +            return query.toString(); +        } + +        /** {@inheritDoc} */ +        public void close() throws IOException { +            try { +                connection.commit(); +                results.close(); +                statement.close(); +            } catch (SQLException exception) { +                throw new IOException("unable to commit and close", exception); +            } +        } + +        /** {@inheritDoc} */ +        public LongWritable createKey() { +            return new LongWritable(); 
+        } + +        /** {@inheritDoc} */ +        public T createValue() { +            return ReflectionUtils.newInstance(inputClass, job); +        } + +        /** {@inheritDoc} */ +        public long getPos() throws IOException { +            return pos; +        } + +        /** {@inheritDoc} */ +        public float getProgress() throws IOException { +            return pos / (float) split.getLength(); +        } + +        /** {@inheritDoc} */ +        public boolean next(LongWritable key, T value) throws IOException { +            try { +                if (!results.next()) +                    return false; + +                // Set the key field value as the output key value +                key.set(pos + split.getStart()); + +                value.readFields(results); + +                pos++; +            } catch (SQLException exception) { +                throw new IOException("unable to get next value", exception); +            } + +            return true; +        } +    } + +    /** A Class that does nothing, implementing DBWritable */ +    public static class NullDBWritable implements DBWritable, Writable { + +        public void readFields(DataInput in) throws IOException { +        } + +        public void readFields(ResultSet arg0) throws SQLException { +        } + +        public void write(DataOutput out) throws IOException { +        } + +        public void write(PreparedStatement arg0) throws SQLException { +        } +    } + +    /** A InputSplit that spans a set of rows */ +    protected static class DBInputSplit implements InputSplit { +        private long end = 0; +        private long start = 0; +        private long chunks = 0; + +        /** Default Constructor */ +        public DBInputSplit() { +        } + +        /** +         * Convenience Constructor +         * +         * @param start the index of the first row to select +         * @param end   the index of the last row to select +         */ +        public 
DBInputSplit(long start, long end, long chunks) { +            this.start = start; +            this.end = end; +            this.chunks = chunks; +            LOG.info("creating DB input split with start: " + start + ", end: " + end + ", chunks: " + chunks); +        } + +        /** {@inheritDoc} */ +        public String[] getLocations() throws IOException { +            // TODO Add a layer to enable SQL "sharding" and support locality +            return new String[]{}; +        } + +        /** @return The index of the first row to select */ +        public long getStart() { +            return start; +        } + +        /** @return The index of the last row to select */ +        public long getEnd() { +            return end; +        } + +        /** @return The total row count in this split */ +        public long getLength() throws IOException { +            return end - start; +        } + +        /** @return The total number of chucks accross all splits */ +        public long getChunks() { +            return chunks; +        } + +        /** {@inheritDoc} */ +        public void readFields(DataInput input) throws IOException { +            start = input.readLong(); +            end = input.readLong(); +            chunks = input.readLong(); +        } + +        /** {@inheritDoc} */ +        public void write(DataOutput output) throws IOException { +            output.writeLong(start); +            output.writeLong(end); +            output.writeLong(chunks); +        } +    } + +    protected DBConfiguration dbConf; +    protected Connection connection; + +    protected String tableName; +    protected String[] fieldNames; +    protected String conditions; +    protected long limit; +    protected int maxConcurrentReads; + + +    /** {@inheritDoc} */ +    public void configure(JobConf job) { +        dbConf = new DBConfiguration(job); + +        tableName = dbConf.getInputTableName(); +        fieldNames = dbConf.getInputFieldNames(); +        
conditions = dbConf.getInputConditions(); +        limit = dbConf.getInputLimit(); +        maxConcurrentReads = dbConf.getMaxConcurrentReadsNum(); + +        try { +            connection = dbConf.getConnection(); +        } catch (IOException exception) { +            throw new RuntimeException("unable to create connection", exception.getCause()); +        } + +        configureConnection(connection); +    } + +    protected void configureConnection(Connection connection) { +        setTransactionIsolationLevel(connection); +        setAutoCommit(connection); +    } + +    protected void setAutoCommit(Connection connection) { +        try { +            connection.setAutoCommit(false); +        } catch (Exception exception) { +            throw new RuntimeException("unable to set auto commit", exception); +        } +    } + +    protected void setTransactionIsolationLevel(Connection connection) { +        try { +            connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); +        } catch (SQLException exception) { +            throw new RuntimeException("unable to configure transaction isolation level", exception); +        } +    } + +    /** {@inheritDoc} */ +    @SuppressWarnings("unchecked") +    public RecordReader<LongWritable, T> getRecordReader(InputSplit split, JobConf job, +        Reporter reporter) throws IOException { +        Class inputClass = dbConf.getInputClass(); +        try { +            return new DBRecordReader((DBInputSplit) split, inputClass, job); +        } catch (SQLException exception) { +            throw new IOException(exception.getMessage(), exception); +        } +    } + +    /** {@inheritDoc} */ +    public InputSplit[] getSplits(JobConf job, int chunks) throws IOException { +        // use the configured value if avail +        chunks = maxConcurrentReads == 0 ? 
chunks : maxConcurrentReads; + +        try { +            Statement statement = connection.createStatement(); + +            ResultSet results = statement.executeQuery(getCountQuery()); + +            long count = 0; + +            while (results.next()) +                count += results.getLong(1); + +            if (limit != -1) +                count = Math.min(limit, count); + +            long chunkSize = (count / chunks); + +            results.close(); +            statement.close(); + +            InputSplit[] splits = new InputSplit[chunks]; + +            // Split the rows into n-number of chunks and adjust the last chunk +            // accordingly +            for (int i = 0; i < chunks; i++) { +                DBInputSplit split; + +                if (i + 1 == chunks) +                    split = new DBInputSplit(i * chunkSize, count, chunks); +                else +                    split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize, chunks); + +                splits[i] = split; +            } + +            return splits; +        } catch (SQLException e) { +            throw new IOException(e.getMessage()); +        } +    } + +    /** +     * Returns the query for getting the total number of rows, subclasses can override this for +     * custom behaviour. +     */ +    protected String getCountQuery() { + +        if (dbConf.getInputCountQuery() != null) { return dbConf.getInputCountQuery(); } + +        StringBuilder query = new StringBuilder(); + +        query.append("SELECT COUNT(*) FROM " + tableName); + +        if (conditions != null && conditions.length() > 0) +            query.append(" WHERE " + conditions); + +        return query.toString(); +    } + +    /** +     * Initializes the map-part of the job with the appropriate input settings. 
+     * +     * @param job             The job +     * @param inputClass      the class object implementing DBWritable, which is the Java object +     *                        holding tuple fields. +     * @param tableName       The table to read data from +     * @param conditions      The condition which to select data with, eg. '(updated > 20070101 AND +     *                        length > 0)' +     * @param orderBy         the fieldNames in the orderBy clause. +     * @param limit +     * @param fieldNames      The field names in the table +     * @param concurrentReads +     */ +    public static void setInput(JobConf job, Class<? extends DBWritable> inputClass, +        String tableName, String conditions, String orderBy, long limit, int concurrentReads, +        String... fieldNames) { +        job.setInputFormat(DBInputFormat.class); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setInputClass(inputClass); +        dbConf.setInputTableName(tableName); +        dbConf.setInputFieldNames(fieldNames); +        dbConf.setInputConditions(conditions); +        dbConf.setInputOrderBy(orderBy); + +        if (limit != -1) +            dbConf.setInputLimit(limit); + +        dbConf.setMaxConcurrentReadsNum(concurrentReads); +    } + +    /** +     * Initializes the map-part of the job with the appropriate input settings. +     * +     * @param job             The job +     * @param inputClass      the class object implementing DBWritable, which is the Java object +     *                        holding tuple fields. +     * @param selectQuery     the input query to select fields. Example : "SELECT f1, f2, f3 FROM +     *                        Mytable ORDER BY f1" +     * @param countQuery      the input query that returns the number of records in the table. +     *                        Example : "SELECT COUNT(f1) FROM Mytable" +     * @param concurrentReads +     */ +    public static void setInput(JobConf job, Class<? 
extends DBWritable> inputClass, +        String selectQuery, String countQuery, long limit, int concurrentReads) { +        job.setInputFormat(DBInputFormat.class); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setInputClass(inputClass); +        dbConf.setInputQuery(selectQuery); +        dbConf.setInputCountQuery(countQuery); + +        if (limit != -1) +            dbConf.setInputLimit(limit); + +        dbConf.setMaxConcurrentReadsNum(concurrentReads); +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java new file mode 100644 index 0000000..1166970 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java @@ -0,0 +1,391 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; + +import com.jcraft.jsch.Logger; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts + * <key,value> pairs, where key has a type extending DBWritable. Returned {@link RecordWriter} + * writes <b>only the key</b> to the database with a batch SQL query. + */ +public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> { +    private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); + +    /** A RecordWriter that writes the reduce output to a SQL table */ +    protected class DBRecordWriter implements RecordWriter<K, V> { +        private Connection connection; +        private PreparedStatement insertStatement; +        private PreparedStatement updateStatement; +        private final int statementsBeforeExecute; + +        private long statementsAdded = 0; +        private long insertStatementsCurrent = 0; +        private long updateStatementsCurrent = 0; + +        protected DBRecordWriter(Connection connection, PreparedStatement insertStatement, +            PreparedStatement updateStatement, int statementsBeforeExecute) { +            this.connection = connection; +            this.insertStatement = insertStatement; +            this.updateStatement = updateStatement; +            this.statementsBeforeExecute = 
statementsBeforeExecute; +        } + +        /** {@inheritDoc} */ +        public void close(Reporter reporter) throws IOException { +            executeBatch(); + +            try { +                if (insertStatement != null) { insertStatement.close(); } + +                if (updateStatement != null) { updateStatement.close(); } + +                connection.commit(); +            } catch (SQLException exception) { +                rollBack(); + +                createThrowMessage("unable to commit batch", 0, exception); +            } finally { +                try { +                    connection.close(); +                } catch (SQLException exception) { +                    throw new IOException("unable to close connection", exception); +                } +            } +        } + +        private void executeBatch() throws IOException { +            try { +                if (insertStatementsCurrent != 0) { +                    LOG.info( +                        "executing insert batch " + createBatchMessage(insertStatementsCurrent)); + +                    insertStatement.executeBatch(); +                } + +                insertStatementsCurrent = 0; +            } catch (SQLException exception) { +                rollBack(); + +                createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception); +            } + +            try { +                if (updateStatementsCurrent != 0) { +                    LOG.info( +                        "executing update batch " + createBatchMessage(updateStatementsCurrent)); + +                    int[] result = updateStatement.executeBatch(); + +                    int count = 0; + +                    for (int value : result) { count += value; } + +                    if (count != updateStatementsCurrent) { +                        throw new IOException( +                            "update did not update same number of statements executed in batch, batch: " +             
               + updateStatementsCurrent + " updated: " + count); +                    } +                } + +                updateStatementsCurrent = 0; +            } catch (SQLException exception) { +            	 +            	String message = exception.getMessage(); +            	if (message.indexOf("Duplicate Key") >= 0) { +            		LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); +            	} else { +                    rollBack(); + +                    createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception); +            	} +            } +        } + +        private void rollBack() { +            try { +                connection.rollback(); +            } catch (SQLException sqlException) { +                LOG.warn(StringUtils.stringifyException(sqlException)); +            } +        } + +        private String createBatchMessage(long currentStatements) { +            return String +                .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute); +        } + +        private void createThrowMessage(String stateMessage, long currentStatements, +            SQLException exception) throws IOException { +            String message = exception.getMessage(); + +            message = message.substring(0, Math.min(75, message.length())); + +            int messageLength = exception.getMessage().length(); +            String batchMessage = createBatchMessage(currentStatements); +            String template = "%s [msglength: %d]%s %s"; +            String errorMessage = +                String.format(template, stateMessage, messageLength, batchMessage, message); + +            LOG.error(errorMessage, exception.getNextException()); + +            throw new IOException(errorMessage, exception.getNextException()); +        } + +        /** {@inheritDoc} */ +        public synchronized void write(K key, V value) throws IOException 
{ +            try { +                if (value == null) { +                    key.write(insertStatement); +                    insertStatement.addBatch(); +                    insertStatementsCurrent++; +                } else { +                    key.write(updateStatement); +                    updateStatement.addBatch(); +                    updateStatementsCurrent++; +                } +            } catch (SQLException exception) { +                throw new IOException("unable to add batch statement", exception); +            } + +            statementsAdded++; + +            if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); } +        } +    } + +    /** +     * Constructs the query used as the prepared statement to insert data. +     * +     * @param table      the table to insert into +     * @param fieldNames the fields to insert into. If field names are unknown, supply an array of +     *                   nulls. +     */ +    protected String constructInsertQuery(String table, String[] fieldNames) { +        if (fieldNames == null) { +            throw new IllegalArgumentException("Field names may not be null"); +        } + +        StringBuilder query = new StringBuilder(); + +        query.append("INSERT INTO ").append(table); + +        if (fieldNames.length > 0 && fieldNames[0] != null) { +            query.append(" ("); + +            for (int i = 0; i < fieldNames.length; i++) { +                query.append(fieldNames[i]); + +                if (i != fieldNames.length - 1) { query.append(","); } +            } + +            query.append(")"); + +        } + +        query.append(" VALUES ("); + +        for (int i = 0; i < fieldNames.length; i++) { +            query.append("?"); + +            if (i != fieldNames.length - 1) { query.append(","); } +        } + +        query.append(")"); +         +        boolean test = true; +        if (test) { +        	query.append(" ON DUPLICATE KEY UPDATE "); +        	 +        	 
+            for (int i = 1; i < fieldNames.length; i++) { + +                 +                if ( (i != 1) ) { query.append(","); } +                //if (i != fieldNames.length - 1) { query.append(","); } +                //&& (i != fieldNames.length - 1) +                query.append(fieldNames[i]); +                query.append(" = ?"); +                 +                 +            } +        } +         +        query.append(";"); +         +        LOG.info(" ===================== " + query.toString()); +        return query.toString(); +    } + +    protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) { +        if (fieldNames == null) { +            throw new IllegalArgumentException("field names may not be null"); +        } + +        Set<String> updateNamesSet = new HashSet<String>(); +        Collections.addAll(updateNamesSet, updateNames); + +        StringBuilder query = new StringBuilder(); + +        query.append("UPDATE ").append(table); + +        query.append(" SET "); + +        if (fieldNames.length > 0 && fieldNames[0] != null) { +            int count = 0; + +            for (int i = 0; i < fieldNames.length; i++) { +                if (updateNamesSet.contains(fieldNames[i])) { continue; } + +                if (count != 0) { query.append(","); } + +                query.append(fieldNames[i]); +                query.append(" = ?"); + +                count++; +            } +        } + +        query.append(" WHERE "); + +        if (updateNames.length > 0 && updateNames[0] != null) { +            for (int i = 0; i < updateNames.length; i++) { +                query.append(updateNames[i]); +                query.append(" = ?"); + +                if (i != updateNames.length - 1) { query.append(" and "); } +            } +        } + +        query.append(";"); +        System.out.println("Update Query => " + query.toString()); +        return query.toString(); +    } + +    /** {@inheritDoc} */ 
+    public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException { +    } + +    /** {@inheritDoc} */ +    public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name, +        Progressable progress) throws IOException { +        DBConfiguration dbConf = new DBConfiguration(job); + +        String tableName = dbConf.getOutputTableName(); +        String[] fieldNames = dbConf.getOutputFieldNames(); +        String[] updateNames = dbConf.getOutputUpdateFieldNames(); +        int batchStatements = dbConf.getBatchStatementsNum(); + +        Connection connection = dbConf.getConnection(); + +        configureConnection(connection); + +        String sqlInsert = constructInsertQuery(tableName, fieldNames); +        PreparedStatement insertPreparedStatement; + +        try { +            insertPreparedStatement = connection.prepareStatement(sqlInsert); +            insertPreparedStatement.setEscapeProcessing(true); // should be on by default +        } catch (SQLException exception) { +            throw new IOException("unable to create statement for: " + sqlInsert, exception); +        } + +        String sqlUpdate = +            updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null; +        PreparedStatement updatePreparedStatement = null; + +        try { +            updatePreparedStatement = +                sqlUpdate != null ? 
connection.prepareStatement(sqlUpdate) : null; +        } catch (SQLException exception) { +            throw new IOException("unable to create statement for: " + sqlUpdate, exception); +        } + +        return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements); +    } + +    protected void configureConnection(Connection connection) { +        setAutoCommit(connection); +    } + +    protected void setAutoCommit(Connection connection) { +        try { +            connection.setAutoCommit(false); +        } catch (Exception exception) { +            throw new RuntimeException("unable to set auto commit", exception); +        } +    } + +    /** +     * Initializes the reduce-part of the job with the appropriate output settings +     * +     * @param job                 The job +     * @param dbOutputFormatClass +     * @param tableName           The table to insert data into +     * @param fieldNames          The field names in the table. If unknown, supply the appropriate +     */ +    public static void setOutput(JobConf job, Class<? 
 extends DBOutputFormat> dbOutputFormatClass,
        String tableName, String[] fieldNames, String[] updateFields, int batchSize) {
        if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else {
            job.setOutputFormat(dbOutputFormatClass);
        }

        // writing doesn't always happen in reduce
        job.setReduceSpeculativeExecution(false);
        job.setMapSpeculativeExecution(false);

        DBConfiguration dbConf = new DBConfiguration(job);

        dbConf.setOutputTableName(tableName);
        dbConf.setOutputFieldNames(fieldNames);

        if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); }

        // -1 presumably means "keep the configured default batch size" — TODO confirm
        // against DBConfiguration.getBatchStatementsNum().
        if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); }
    }
}
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java
new file mode 100644
index 0000000..1369c74
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java
@@ -0,0 +1,83 @@
/*
 * Copyright (c) 2009 Concurrent, Inc.
 *
 * This work has been released into the public domain
 * by the copyright holder. This applies worldwide.
 *
 * In case this is not legally possible:
 * The copyright holder grants any entity the right
 * to use this work for any purpose, without any
 * conditions, unless such conditions are required by law.
 */

package parallelai.spyglass.jdbc.db;

import org.apache.hadoop.io.Writable;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Objects that are read from/written to a database should implement <code>DBWritable</code>.
 * DBWritable is similar to {@link Writable} except that the {@link #write(PreparedStatement)}
 * method takes a {@link PreparedStatement}, and {@link #readFields(ResultSet)} takes a {@link
 * ResultSet}.
 * <p> Implementations are responsible for writing the fields of the object to
 * PreparedStatement, and reading the fields of the object from the ResultSet.
 * <p>Example:</p>
 * If we have the following table in the database :
 * <pre>
 * CREATE TABLE MyTable (
 *   counter        INTEGER NOT NULL,
 *   timestamp      BIGINT  NOT NULL
 * );
 * </pre>
 * then we can read/write the tuples from/to the table with :
 * <p><pre>
 * public class MyWritable implements Writable, DBWritable {
 *   // Some data
 *   private int counter;
 *   private long timestamp;
 *
 *   //Writable#write() implementation
 *   public void write(DataOutput out) throws IOException {
 *     out.writeInt(counter);
 *     out.writeLong(timestamp);
 *   }
 *
 *   //Writable#readFields() implementation
 *   public void readFields(DataInput in) throws IOException {
 *     counter = in.readInt();
 *     timestamp = in.readLong();
 *   }
 *
 *   public void write(PreparedStatement statement) throws SQLException {
 *     statement.setInt(1, counter);
 *     statement.setLong(2, timestamp);
 *   }
 *
 *   public void readFields(ResultSet resultSet) throws SQLException {
 *     counter = resultSet.getInt(1);
 *     timestamp = resultSet.getLong(2);
 *   }
 * }
 * </pre></p>
 */
public interface DBWritable {

    /**
     * Sets the fields of the object in the {@link PreparedStatement}.
     *
     * @param statement the statement that the fields are put into.
     * @throws SQLException
     */
    public void write(PreparedStatement statement) throws SQLException;

    /**
     * Reads the fields of the object from the {@link ResultSet}.
     *
     * @param resultSet the {@link ResultSet} to get the fields from.
     * @throws SQLException
     */
    public void readFields(ResultSet resultSet) throws SQLException;

}
