diff options
16 files changed, 3471 insertions, 0 deletions
| diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java new file mode 100644 index 0000000..6788624 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java @@ -0,0 +1,25 @@ +package parallelai.spyglass.jdbc; + +import org.apache.hadoop.conf.Configuration; + +public class JDBCConstants { +   +  public enum JdbcSourceMode { +    SELECT, +    SELECT_WITH_PARTITIONS, +    SELECT_WITH_BUCKETS; +  } + +  public enum JdbcSinkMode { +	INSERT, +	UPDATE, +	UPSERT; +  } +   +  public static final String START_KEY = "jdbc.%s.startkey"; +  public static final String STOP_KEY = "jdbc.%s.stopkey";  +  public static final String SOURCE_MODE = "jdbc.%s.source.mode"; +  public static final String KEY_LIST = "jdbc.%s.key.list"; +  public static final String VERSIONS = "jdbc.%s.versions"; + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java new file mode 100644 index 0000000..5007895 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java @@ -0,0 +1,670 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import parallelai.spyglass.jdbc.db.DBInputFormat; +import parallelai.spyglass.jdbc.db.DBOutputFormat; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Class JDBCScheme defines what its parent Tap will select and insert/update into the sql database. + * <p/> + * If updateBy column names are given, a SQL UPDATE statement will be generated if the values in those columns + * for the given Tuple are all not {@code null}. Otherwise an INSERT statement will be generated. + * <p/> + * Some constructors take columnFields and updateByFields. These values will be used during field name resolution + * to bind this Scheme to the source and sink branches in a give assembly. These fields 'alias' the column names + * in the respective arrays. In other words, if your DB TABLE has different column names than your assembly exepects, + * use the Fields arguments to bind the assembly to the table. Both Fields and array must be the same size. + * <p/> + * Override this class, {@link DBInputFormat}, and {@link DBOutputFormat} to specialize for a given vendor database. + */ +public class JDBCScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> +{ +    private Class<? extends DBInputFormat> inputFormatClass; +    private Class<? extends DBOutputFormat> outputFormatClass; +    private String[] columns; +    private String[] orderBy; +    private String conditions; +    private String[] updateBy; +    private Fields updateValueFields; +    private Fields updateByFields; +    private Fields columnFields; +    private Tuple updateIfTuple; +    private String selectQuery; +    private String countQuery; +    private long limit = -1; + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param inputFormatClass  of type Class<? extends DBInputFormat> +     * @param outputFormatClass of type Class<? extends DBOutputFormat> +     * @param columns           of type String[] +     * @param orderBy           of type String[] +     * @param conditions        of type String +     * @param limit             of type long +     * @param updateBy          of type String[] +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, long limit, String[] updateBy ) +    { +        this( inputFormatClass, outputFormatClass, new Fields( columns ), columns, orderBy, conditions, limit, updateBy != null ? new Fields( updateBy ) : null, updateBy ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param inputFormatClass  of type Class<? extends DBInputFormat> +     * @param outputFormatClass of type Class<? extends DBOutputFormat> +     * @param columnFields      of type Fields +     * @param columns           of type String[] +     * @param orderBy           of type String[] +     * @param conditions        of type String +     * @param limit             of type long +     * @param updateByFields    of type Fields +     * @param updateBy          of type String[] +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy ) +    { +        this.columnFields = columnFields; + +        verifyColumns( columnFields, columns ); + +        setSinkFields( columnFields ); +        setSourceFields( columnFields ); + +        if( updateBy != null && updateBy.length != 0 ) +        { +            this.updateBy = updateBy; +            this.updateByFields = updateByFields; + +            if( updateByFields.size() != updateBy.length ) +                throw new IllegalArgumentException( "updateByFields and updateBy must be the same size" ); + +            if( !this.columnFields.contains( this.updateByFields ) ) +                throw new IllegalArgumentException( "columnFields must contain updateByFields column names" ); + +            this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields ); +            this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls +        } + +        this.columns = columns; +        this.orderBy = orderBy; +        this.conditions = conditions; +        this.limit = limit; + +        this.inputFormatClass = inputFormatClass; +        this.outputFormatClass = outputFormatClass; +    } + +    private void verifyColumns( Fields columnFields, String[] columns ) +    { +        if( columnFields.size() != columns.length ) +            throw new IllegalArgumentException( "columnFields and columns must be the same size" ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param inputFormatClass  of type Class<? extends DBInputFormat> +     * @param outputFormatClass of type Class<? extends DBOutputFormat> +     * @param columns           of type String[] +     * @param orderBy           of type String[] +     * @param conditions        of type String +     * @param updateBy          of type String[] +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, String[] updateBy ) +    { +        this( inputFormatClass, outputFormatClass, columns, orderBy, conditions, -1, updateBy ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param inputFormatClass  of type Class<? extends DBInputFormat> +     * @param outputFormatClass of type Class<? extends DBOutputFormat> +     * @param columnFields      of type Fields +     * @param columns           of type String[] +     * @param orderBy           of type String[] +     * @param conditions        of type String +     * @param updateByFields    of type Fields +     * @param updateBy          of type String[] +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, Fields updateByFields, String[] updateBy ) +    { +        this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, -1, updateByFields, updateBy ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param inputFormatClass  of type Class<? extends DBInputFormat> +     * @param outputFormatClass of type Class<? extends DBOutputFormat> +     * @param columns           of type String[] +     * @param orderBy           of type String[] +     * @param updateBy          of type String[] +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy ) +    { +        this( inputFormatClass, outputFormatClass, columns, orderBy, null, updateBy ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param inputFormatClass  of type Class<? extends DBInputFormat> +     * @param outputFormatClass of type Class<? extends DBOutputFormat> +     * @param columnFields      of type Fields +     * @param columns           of type String[] +     * @param orderBy           of type String[] +     * @param updateByFields    of type Fields +     * @param updateBy          of type String[] +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy ) +    { +        this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, null, -1, updateByFields, updateBy ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns  of type String[] +     * @param orderBy  of type String[] +     * @param updateBy of type String[] +     */ +    public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy ) +    { +        this( null, null, columns, orderBy, updateBy ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields   of type Fields +     * @param columns        of type String[] +     * @param orderBy        of type String[] +     * @param updateByFields of type Fields +     * @param updateBy       of type String[] +     */ +    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy ) +    { +        this( null, null, columnFields, columns, orderBy, updateByFields, updateBy ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns    of type String[] +     * @param orderBy    of type String[] +     * @param conditions of type String +     * @param limit      of type long +     */ +    public JDBCScheme( String[] columns, String[] orderBy, String conditions, long limit ) +    { +        this( null, null, columns, orderBy, conditions, limit, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     * @param orderBy      of type String[] +     * @param conditions   of type String +     * @param limit        of type long +     */ +    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit ) +    { +        this( null, null, columnFields, columns, orderBy, conditions, limit, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns    of type String[] +     * @param orderBy    of type String[] +     * @param conditions of type String +     */ +    public JDBCScheme( String[] columns, String[] orderBy, String conditions ) +    { +        this( null, null, columns, orderBy, conditions, null ); +    } + +    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions ) +    { +        this( null, null, columnFields, columns, orderBy, conditions, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns of type String[] +     * @param orderBy of type String[] +     * @param limit   of type long +     */ +    public JDBCScheme( String[] columns, String[] orderBy, long limit ) +    { +        this( null, null, columns, orderBy, null, limit, null ); +    } + +    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, long limit ) +    { +        this( null, null, columnFields, columns, orderBy, null, limit, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns of type String[] +     * @param orderBy of type String[] +     */ +    public JDBCScheme( String[] columns, String[] orderBy ) +    { +        this( null, null, columns, orderBy, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     * @param orderBy      of type String[] +     */ +    public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy ) +    { +        this( null, null, columnFields, columns, orderBy, null, -1, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns    of type String[] +     * @param conditions of type String +     * @param limit      of type long +     */ +    public JDBCScheme( String[] columns, String conditions, long limit ) +    { +        this( null, null, columns, null, conditions, limit, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     * @param conditions   of type String +     * @param limit        of type long +     */ +    public JDBCScheme( Fields columnFields, String[] columns, String conditions, long limit ) +    { +        this( null, null, columnFields, columns, null, conditions, limit, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns    of type String[] +     * @param conditions of type String +     */ +    public JDBCScheme( String[] columns, String conditions ) +    { +        this( null, null, columns, null, conditions, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     * @param conditions   of type String +     */ +    public JDBCScheme( Fields columnFields, String[] columns, String conditions ) +    { +        this( null, null, columnFields, columns, null, conditions, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns of type String[] +     * @param limit   of type long +     */ +    public JDBCScheme( String[] columns, long limit ) +    { +        this( null, null, columns, null, null, limit, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     * @param limit        of type long +     */ +    public JDBCScheme( Fields columnFields, String[] columns, long limit ) +    { +        this( null, null, columnFields, columns, null, null, limit, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columns of type String[] +     */ +    public JDBCScheme( String[] columns ) +    { +        this( null, null, new Fields( columns ), columns, null, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     */ +    public JDBCScheme( Fields columnFields, String[] columns ) +    { +        this( null, null, columnFields, columns, null, null, null ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * <p/> +     * Use this constructor if the data source may only be used as a source. +     * +     * @param inputFormatClass of type Class<? extends DBInputFormat> +     * @param columns          of type String[] +     * @param selectQuery      of type String +     * @param countQuery       of type String +     * @param limit            of type long +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, String[] columns, String selectQuery, String countQuery, long limit ) +    { +        this( inputFormatClass, new Fields( columns ), columns, selectQuery, countQuery, limit ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param inputFormatClass of type Class<? extends DBInputFormat> +     * @param columnFields     of type Fields +     * @param columns          of type String[] +     * @param selectQuery      of type String +     * @param countQuery       of type String +     * @param limit            of type long +     */ +    public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit ) +    { +        this.columnFields = columnFields; + +        verifyColumns( columnFields, columns ); + +        setSourceFields( columnFields ); + +        this.columns = columns; +        this.selectQuery = selectQuery.trim().replaceAll( ";$", "" ); +        this.countQuery = countQuery.trim().replaceAll( ";$", "" ); +        this.limit = limit; + +        this.inputFormatClass = inputFormatClass; +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * <p/> +     * Use this constructor if the data source may only be used as a source. +     * +     * @param columns     of type String[] +     * @param selectQuery of type String +     * @param countQuery  of type String +     * @param limit       of type long +     */ +    public JDBCScheme( String[] columns, String selectQuery, String countQuery, long limit ) +    { +        this( null, new Fields( columns ), columns, selectQuery, countQuery, limit ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     * @param selectQuery  of type String +     * @param countQuery   of type String +     * @param limit        of type long +     */ +    public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit ) +    { +        this( null, columnFields, columns, selectQuery, countQuery, limit ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * <p/> +     * Use this constructor if the data source may only be used as a source. +     * +     * @param columns     of type String[] +     * @param selectQuery of type String +     * @param countQuery  of type String +     */ +    public JDBCScheme( String[] columns, String selectQuery, String countQuery ) +    { +        this( null, new Fields( columns ), columns, selectQuery, countQuery, -1 ); +    } + +    /** +     * Constructor JDBCScheme creates a new JDBCScheme instance. +     * +     * @param columnFields of type Fields +     * @param columns      of type String[] +     * @param selectQuery  of type String +     * @param countQuery   of type String +     */ +    public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery ) +    { +        this( null, columnFields, columns, selectQuery, countQuery, -1 ); +    } + +    /** +     * Method getColumns returns the columns of this JDBCScheme object. +     * +     * @return the columns (type String[]) of this JDBCScheme object. +     */ +    public String[] getColumns() { +        return columns; +    } + +    /** +     * Method getOrderBy returns the orderBy of this JDBCScheme object. +     * +     * @return the orderBy (type String[]) of this JDBCScheme object. +     */ +    public String[] getOrderBy() { +        return orderBy; +    } + +    @Override +    public void sourceConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, +        JobConf conf ) { +        int concurrentReads = ( (JDBCTap) tap ).concurrentReads; + +        if( selectQuery != null ) +            DBInputFormat.setInput( conf, TupleRecord.class, selectQuery, countQuery, limit, concurrentReads ); +        else { +            String tableName = ( (JDBCTap) tap ).getTableName(); +            String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null; +            DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, columns ); +        } + +        if( inputFormatClass != null ) +            conf.setInputFormat( inputFormatClass ); +    } + +    @Override +    public void sinkConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, +        JobConf conf ) { +        if( selectQuery != null ) +            throw new TapException( "cannot sink to this Scheme" ); + +        String tableName = ( (JDBCTap) tap ).getTableName(); +        int batchSize = ( (JDBCTap) tap ).getBatchSize(); +        DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy, batchSize ); + +        if( outputFormatClass != null ) +            conf.setOutputFormat( outputFormatClass ); +    } + +    @Override +    public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) +    { +        Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; + +        sourceCall.setContext( pair ); +    } + +    @Override +    public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException +    { +        Object key = sourceCall.getContext()[ 0 ]; +        Object value = sourceCall.getContext()[ 1 ]; +        boolean result = sourceCall.getInput().next( key, value ); + +        if( !result ) +            return false; + +        Tuple newTuple = ( (TupleRecord) value ).getTuple(); +        sourceCall.getIncomingEntry().setTuple( newTuple ); + +        return true; +    } + +    @Override +    public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) { +        sourceCall.setContext( null ); +    } + +    @Override +    public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException { +        // it's ok to use NULL here so the collector does not write anything +        TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +        OutputCollector outputCollector = sinkCall.getOutput(); +        if( updateBy != null ) +        { +            Tuple allValues = tupleEntry.selectTuple( updateValueFields ); +            Tuple updateValues = tupleEntry.selectTuple( updateByFields ); + +            allValues = cleanTuple( allValues ); + +            TupleRecord key = new TupleRecord( allValues ); + +            if( updateValues.equals( updateIfTuple ) ) +                outputCollector.collect( key, null ); +            else +                outputCollector.collect( key, key ); + +            return; +        } + +        Tuple result = tupleEntry.selectTuple( getSinkFields() ); + +        result = cleanTuple( result ); + +        outputCollector.collect( new TupleRecord( result ), null ); +    } + +    /** +     * Provides a hook for subclasses to escape or modify any values before creating the final SQL statement. +     * +     * @param result +     * @return +     */ +    protected Tuple cleanTuple( Tuple result ) { +        return result; +    } + +    @Override +    public boolean equals( Object object ) { +        if( this == object ) +            return true; +        if( !( object instanceof JDBCScheme ) ) +            return false; +        if( !super.equals( object ) ) +            return false; + +        JDBCScheme that = (JDBCScheme) object; + +        if( limit != that.limit ) +            return false; +        if( columnFields != null ? !columnFields.equals( that.columnFields ) : that.columnFields != null ) +            return false; +        if( !Arrays.equals( columns, that.columns ) ) +            return false; +        if( conditions != null ? !conditions.equals( that.conditions ) : that.conditions != null ) +            return false; +        if( countQuery != null ? !countQuery.equals( that.countQuery ) : that.countQuery != null ) +            return false; +        if( inputFormatClass != null ? !inputFormatClass.equals( that.inputFormatClass ) : that.inputFormatClass != null ) +            return false; +        if( !Arrays.equals( orderBy, that.orderBy ) ) +            return false; +        if( outputFormatClass != null ? !outputFormatClass.equals( that.outputFormatClass ) : that.outputFormatClass != null ) +            return false; +        if( selectQuery != null ? !selectQuery.equals( that.selectQuery ) : that.selectQuery != null ) +            return false; +        if( !Arrays.equals( updateBy, that.updateBy ) ) +            return false; +        if( updateByFields != null ? !updateByFields.equals( that.updateByFields ) : that.updateByFields != null ) +            return false; +        if( updateIfTuple != null ? !updateIfTuple.equals( that.updateIfTuple ) : that.updateIfTuple != null ) +            return false; +        if( updateValueFields != null ? !updateValueFields.equals( that.updateValueFields ) : that.updateValueFields != null ) +            return false; + +        return true; +    } + +    @Override +    public int hashCode() { +        int result = super.hashCode(); +        result = 31 * result + ( inputFormatClass != null ? inputFormatClass.hashCode() : 0 ); +        result = 31 * result + ( outputFormatClass != null ? outputFormatClass.hashCode() : 0 ); +        result = 31 * result + ( columns != null ? Arrays.hashCode( columns ) : 0 ); +        result = 31 * result + ( orderBy != null ? Arrays.hashCode( orderBy ) : 0 ); +        result = 31 * result + ( conditions != null ? conditions.hashCode() : 0 ); +        result = 31 * result + ( updateBy != null ? Arrays.hashCode( updateBy ) : 0 ); +        result = 31 * result + ( updateValueFields != null ? updateValueFields.hashCode() : 0 ); +        result = 31 * result + ( updateByFields != null ? updateByFields.hashCode() : 0 ); +        result = 31 * result + ( columnFields != null ? columnFields.hashCode() : 0 ); +        result = 31 * result + ( updateIfTuple != null ? updateIfTuple.hashCode() : 0 ); +        result = 31 * result + ( selectQuery != null ? selectQuery.hashCode() : 0 ); +        result = 31 * result + ( countQuery != null ? countQuery.hashCode() : 0 ); +        result = 31 * result + (int) ( limit ^ ( limit >>> 32 ) ); +        return result; +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java b/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java new file mode 100644 index 0000000..1c10a05 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java @@ -0,0 +1,621 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.util.HadoopUtil; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import parallelai.spyglass.jdbc.db.DBConfiguration; + +import java.io.IOException; +import java.sql.*; +import java.util.*; + +/** + * Class JDBCTap is a {@link Tap} sub-class that provides read and write access to a RDBMS via JDBC drivers. + * <p/> + * This Tap fully supports TABLE DROP and CREATE when given a {@link TableDesc} instance. + * <p/> + * When using {@link SinkMode#UPDATE}, Cascading is instructed to not delete the resource (drop the Table) + * and assumes its safe to begin sinking data into it. The {@link JDBCScheme} is responsible for + * deciding if/when to perform an UPDATE instead of an INSERT. + * <p/> + * Both INSERT and UPDATE are supported through the JDBCScheme. + * <p/> + * By sub-classing JDBCScheme, {@link com.twitter.maple.jdbc.db.DBInputFormat}, and {@link com.twitter.maple.jdbc.db.DBOutputFormat}, + * specific vendor features can be supported. + * <p/> + * Use {@link #setBatchSize(int)} to set the number of INSERT/UPDATES should be grouped together before being + * executed. The default vaue is 1,000. + * <p/> + * Use {@link #executeQuery(String, int)} or {@link #executeUpdate(String)} to invoke SQL statements against + * the underlying Table. + * <p/> + * Note that all classes under the {@link com.twitter.maple.jdbc.db} package originated from the Hadoop project and + * retain their Apache 2.0 license though they have been heavily modified to support INSERT/UPDATE and + * vendor specialization, and a number of other features like 'limit'. + * + * @see JDBCScheme + * @see com.twitter.maple.jdbc.db.DBInputFormat + * @see com.twitter.maple.jdbc.db.DBOutputFormat + */ +public class JDBCTap extends Tap<JobConf, RecordReader, OutputCollector> { +    /** Field LOG */ +    private static final Logger LOG = LoggerFactory.getLogger(JDBCTap.class); + +    private final String id = UUID.randomUUID().toString(); + +    /** Field connectionUrl */ +    String connectionUrl; +    /** Field username */ +    String username; +    /** Field password */ +    String password; +    /** Field driverClassName */ +    String driverClassName; +    /** Field tableDesc */ +    TableDesc tableDesc; +    /** Field batchSize */ +    int batchSize = 1000; +    /** Field concurrentReads */ +    int concurrentReads = 0; + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * <p/> +     * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated +     * into. By default it uses {@link SinkMode#UPDATE}. +     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param tableName       of type String +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, String tableName, JDBCScheme scheme ) { +        this( connectionUrl, username, password, driverClassName, new TableDesc( tableName ), scheme, SinkMode.UPDATE ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * +     * @param connectionUrl   of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     * @param sinkMode        of type SinkMode +     */ +    public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) { +        this( connectionUrl, null, null, driverClassName, tableDesc, scheme, sinkMode ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * <p/> +     * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated +     * into. By default it uses {@link SinkMode#UPDATE}. +     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) { +        this( connectionUrl, username, password, driverClassName, tableDesc, scheme, SinkMode.UPDATE ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     * @param sinkMode        of type SinkMode +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) { +        super( scheme, sinkMode ); +        this.connectionUrl = connectionUrl; +        this.username = username; +        this.password = password; +        this.driverClassName = driverClassName; +        this.tableDesc = tableDesc; + +        if( tableDesc.getColumnDefs() == null && sinkMode != SinkMode.UPDATE ) +            throw new IllegalArgumentException( "cannot have sink mode REPLACE or KEEP without TableDesc column defs, use UPDATE mode" ); + +        if( sinkMode != SinkMode.UPDATE ) +            LOG.warn( "using sink mode: {}, consider UPDATE to prevent DROP TABLE from being called during Flow or Cascade setup", sinkMode ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * <p/> +     * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated +     * into. By default it uses {@link SinkMode#UPDATE}. +     * +     * @param connectionUrl   of type String +     * @param driverClassName of type String +     * @param tableDesc       of type TableDesc +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) { +        this( connectionUrl, driverClassName, tableDesc, scheme, SinkMode.UPDATE ); +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance that may only used as a data source. +     * +     * @param connectionUrl   of type String +     * @param username        of type String +     * @param password        of type String +     * @param driverClassName of type String +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String username, String password, String driverClassName, JDBCScheme scheme ) { +        super( scheme ); +        this.connectionUrl = connectionUrl; +        this.username = username; +        this.password = password; +        this.driverClassName = driverClassName; +    } + +    /** +     * Constructor JDBCTap creates a new JDBCTap instance. +     * +     * @param connectionUrl   of type String +     * @param driverClassName of type String +     * @param scheme          of type JDBCScheme +     */ +    public JDBCTap( String connectionUrl, String driverClassName, JDBCScheme scheme ) { +        this( connectionUrl, null, null, driverClassName, scheme ); +    } + +    /** +     * Method getTableName returns the tableName of this JDBCTap object. +     * +     * @return the tableName (type String) of this JDBCTap object. +     */ +    public String getTableName() { +        return tableDesc.tableName; +    } + +    /** +     * Method setBatchSize sets the batchSize of this JDBCTap object. +     * +     * @param batchSize the batchSize of this JDBCTap object. +     */ +    public void setBatchSize( int batchSize ) { +        this.batchSize = batchSize; +    } + +    /** +     * Method getBatchSize returns the batchSize of this JDBCTap object. +     * +     * @return the batchSize (type int) of this JDBCTap object. +     */ +    public int getBatchSize() { +        return batchSize; +    } + +    /** +     * Method getConcurrentReads returns the concurrentReads of this JDBCTap object. +     * <p/> +     * This value specifies the number of concurrent selects and thus the number of mappers +     * that may be used. A value of -1 uses the job default. +     * +     * @return the concurrentReads (type int) of this JDBCTap object. +     */ +    public int getConcurrentReads() { +        return concurrentReads; +    } + +    /** +     * Method setConcurrentReads sets the concurrentReads of this JDBCTap object. +     * <p/> +     * This value specifies the number of concurrent selects and thus the number of mappers +     * that may be used. A value of -1 uses the job default. +     * +     * @param concurrentReads the concurrentReads of this JDBCTap object. +     */ +    public void setConcurrentReads( int concurrentReads ) { +        this.concurrentReads = concurrentReads; +    } + +    /** +     * Method getPath returns the path of this JDBCTap object. +     * +     * @return the path (type Path) of this JDBCTap object. +     */ +    public Path getPath() { +        return new Path( getJDBCPath() ); +    } + +    @Override +    public String getIdentifier() { +        return getJDBCPath() + this.id; +    } + + +    public String getJDBCPath() { +        return "jdbc:/" + connectionUrl.replaceAll( ":", "_" ); +    } + +    public boolean isWriteDirect() { +        return true; +    } + +    private JobConf getSourceConf( FlowProcess<JobConf> flowProcess, JobConf conf, String property ) +        throws IOException { +      //  Map<String, String> priorConf = HadoopUtil.deserializeBase64( property, conf, true ); +      //  return flowProcess.mergeMapIntoConfig( conf, priorConf ); +    	 +    	return null; +    } + +    @Override +    public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException { +        // input may be null when this method is called on the client side or cluster side when accumulating +        // for a HashJoin +        return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); +    } + +    @Override +    public TupleEntryCollector openForWrite( FlowProcess<JobConf> flowProcess, OutputCollector output ) throws IOException { +        if( !isSink() ) +            throw new TapException( "this tap may not be used as a sink, no TableDesc defined" ); + +        LOG.info("Creating JDBCTapCollector output instance"); +        JDBCTapCollector jdbcCollector = new JDBCTapCollector( flowProcess, this ); + +        jdbcCollector.prepare(); + +        return jdbcCollector; +    } + +    @Override +    public boolean isSink() +    { +        return tableDesc != null; +    } + +    @Override +    public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf ) +    { +        // a hack for MultiInputFormat to see that there is a child format +        FileInputFormat.setInputPaths( conf, getPath() ); + +        if( username == null ) +            DBConfiguration.configureDB(conf, driverClassName, connectionUrl); +        else +            DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password ); + +        super.sourceConfInit( process, conf ); +    } + +    @Override +    public void sinkConfInit( FlowProcess<JobConf> process, JobConf conf ) +    { +        if( !isSink() ) +            return; + +        // do not delete if initialized from within a task +        try { +            if( isReplace() && conf.get( "mapred.task.partition" ) == null && !deleteResource( conf ) ) +                throw new TapException( "unable to drop table: " + tableDesc.getTableName() ); + +            if( !createResource( conf ) ) +                throw new TapException( "unable to create table: " + tableDesc.getTableName() ); +        } catch(IOException e) { +            throw new TapException( "error while trying to modify table: " + tableDesc.getTableName() ); +        } + +        if( username == null ) +            DBConfiguration.configureDB( conf, driverClassName, connectionUrl ); +        else +            DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password ); + +        super.sinkConfInit( process, conf ); +    } + +    private Connection createConnection() +    { +        try +        { +            LOG.info( "creating connection: {}", connectionUrl ); + +            Class.forName( driverClassName ); + +            Connection connection = null; + +            if( username == null ) +                connection = DriverManager.getConnection( connectionUrl ); +            else +                connection = DriverManager.getConnection( connectionUrl, username, password ); + +            connection.setAutoCommit( false ); + +            return connection; +        } +        catch( ClassNotFoundException exception ) +        { +            throw new TapException( "unable to load driver class: " + driverClassName, exception ); +        } +        catch( SQLException exception ) +        { +            throw new TapException( "unable to open connection: " + connectionUrl, exception ); +        } +    } + +    /** +     * Method executeUpdate allows for ad-hoc update statements to be sent to the remote RDBMS. The number of +     * rows updated will be returned, if applicable. +     * +     * @param updateString of type String +     * @return int +     */ +    public int executeUpdate( String updateString ) +    { +        Connection connection = null; +        int result; + +        try +        { +            connection = createConnection(); + +            try +            { +                LOG.info( "executing update: {}", updateString ); + +                Statement statement = connection.createStatement(); + +                result = statement.executeUpdate( updateString ); + +                connection.commit(); +                statement.close(); +            } +            catch( SQLException exception ) +            { +                throw new TapException( "unable to execute update statement: " + updateString, exception ); +            } +        } +        finally +        { +            try +            { +                if( connection != null ) +                    connection.close(); +            } +            catch( SQLException exception ) +            { +                // ignore +                LOG.warn( "ignoring connection close exception", exception ); +            } +        } + +        return result; +    } + +    /** +     * Method executeQuery allows for ad-hoc queries to be sent to the remove RDBMS. A value +     * of -1 for returnResults will return a List of all results from the query, a value of 0 will return an empty List. +     * +     * @param queryString   of type String +     * @param returnResults of type int +     * @return List +     */ +    public List<Object[]> executeQuery( String queryString, int returnResults ) +    { +        Connection connection = null; +        List<Object[]> result = Collections.emptyList(); + +        try +        { +            connection = createConnection(); + +            try +            { +                LOG.info( "executing query: {}", queryString ); + +                Statement statement = connection.createStatement(); + +                ResultSet resultSet = statement.executeQuery( queryString ); // we don't care about results + +                if( returnResults != 0 ) +                    result = copyResultSet( resultSet, returnResults == -1 ? Integer.MAX_VALUE : returnResults ); + +                connection.commit(); +                statement.close(); +            } +            catch( SQLException exception ) +            { +                throw new TapException( "unable to execute query statement: " + queryString, exception ); +            } +        } +        finally +        { +            try +            { +                if( connection != null ) +                    connection.close(); +            } +            catch( SQLException exception ) +            { +                // ignore +                LOG.warn( "ignoring connection close exception", exception ); +            } +        } + +        return result; +    } + +    private List<Object[]> copyResultSet( ResultSet resultSet, int length ) throws SQLException +    { +        List<Object[]> results = new ArrayList<Object[]>( length ); +        int size = resultSet.getMetaData().getColumnCount(); + +        int count = 0; + +        while( resultSet.next() && count < length ) +        { +            count++; + +            Object[] row = new Object[size]; + +            for( int i = 0; i < row.length; i++ ) +                row[ i ] = resultSet.getObject( i + 1 ); + +            results.add( row ); +        } + +        return results; +    } + +    @Override +    public boolean createResource( JobConf conf ) throws IOException +    { +        if( resourceExists( conf ) ) +            return true; + +        try +        { +            LOG.info( "creating table: {}", tableDesc.tableName ); + +            executeUpdate( tableDesc.getCreateTableStatement() ); +        } +        catch( TapException exception ) +        { +            LOG.warn( "unable to create table: {}", tableDesc.tableName ); +            LOG.warn( "sql failure", exception.getCause() ); + +            return false; +        } + +        return resourceExists( conf ); +    } + +    @Override +    public boolean deleteResource( JobConf conf ) throws IOException +    { +        if( !isSink() ) +            return false; + +        if( !resourceExists( conf ) ) +            return true; + +        try +        { +            LOG.info( "deleting table: {}", tableDesc.tableName ); + +            executeUpdate( tableDesc.getTableDropStatement() ); +        } +        catch( TapException exception ) +        { +            LOG.warn( "unable to drop table: {}", tableDesc.tableName ); +            LOG.warn( "sql failure", exception.getCause() ); + +            return false; +        } + +        return !resourceExists( conf ); +    } + +    @Override +    public boolean resourceExists( JobConf conf ) throws IOException +    { +        if( !isSink() ) +            return true; + +        try +        { +            LOG.info( "test table exists: {}", tableDesc.tableName ); + +            executeQuery( tableDesc.getTableExistsQuery(), 0 ); +        } +        catch( TapException exception ) +        { +            return false; +        } + +        return true; +    } + +    @Override +    public long getModifiedTime( JobConf conf ) throws IOException +    { +        return System.currentTimeMillis(); +    } + +    @Override +    public String toString() +    { +        return "JDBCTap{" + "connectionUrl='" + connectionUrl + '\'' + ", driverClassName='" + driverClassName + '\'' + ", tableDesc=" + tableDesc + '}'; +    } + +    @Override +    public boolean equals( Object object ) +    { +        if( this == object ) +            return true; +        if( !( object instanceof JDBCTap ) ) +            return false; +        if( !super.equals( object ) ) +            return false; + +        JDBCTap jdbcTap = (JDBCTap) object; + +        if( connectionUrl != null ? !connectionUrl.equals( jdbcTap.connectionUrl ) : jdbcTap.connectionUrl != null ) +            return false; +        if( driverClassName != null ? !driverClassName.equals( jdbcTap.driverClassName ) : jdbcTap.driverClassName != null ) +            return false; +        if( password != null ? !password.equals( jdbcTap.password ) : jdbcTap.password != null ) +            return false; +        if( tableDesc != null ? !tableDesc.equals( jdbcTap.tableDesc ) : jdbcTap.tableDesc != null ) +            return false; +        if( username != null ? !username.equals( jdbcTap.username ) : jdbcTap.username != null ) +            return false; + +        return true; +    } + +    @Override +    public int hashCode() +    { +        int result = super.hashCode(); +        result = 31 * result + ( connectionUrl != null ? connectionUrl.hashCode() : 0 ); +        result = 31 * result + ( username != null ? username.hashCode() : 0 ); +        result = 31 * result + ( password != null ? password.hashCode() : 0 ); +        result = 31 * result + ( driverClassName != null ? driverClassName.hashCode() : 0 ); +        result = 31 * result + ( tableDesc != null ? tableDesc.hashCode() : 0 ); +        result = 31 * result + batchSize; +        return result; +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java new file mode 100644 index 0000000..e8b7deb --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowProcess; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.TupleEntrySchemeCollector; +import org.apache.hadoop.mapred.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class JDBCTapCollector is a kind of {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the resource managed by + * a particular {@link JDBCTap} instance. + */ +public class JDBCTapCollector extends TupleEntrySchemeCollector implements OutputCollector +{ +    /** Field LOG */ +    private static final Logger LOG = LoggerFactory.getLogger( JDBCTapCollector.class ); + +    /** Field conf */ +    private final JobConf conf; +    /** Field writer */ +    private RecordWriter writer; +    /** Field flowProcess */ +    private final FlowProcess<JobConf> hadoopFlowProcess; +    /** Field tap */ +    private final Tap<JobConf, RecordReader, OutputCollector> tap; +    /** Field reporter */ +    private final Reporter reporter = Reporter.NULL; + +    /** +     * Constructor TapCollector creates a new TapCollector instance. +     * +     * @param flowProcess +     * @param tap               of type Tap +     * @throws IOException when fails to initialize +     */ +    public JDBCTapCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap ) throws IOException { +        super( flowProcess, tap.getScheme() ); +        this.hadoopFlowProcess = flowProcess; + +        this.tap = tap; +        this.conf = new JobConf( flowProcess.getConfigCopy() ); + +        this.setOutput( this ); +    } + +    @Override +    public void prepare() { +        try { +            initialize(); +        } catch (IOException e) { +            throw new RuntimeException(e); +        } + +        super.prepare(); +    } + +    private void initialize() throws IOException { +        tap.sinkConfInit( hadoopFlowProcess, conf ); + +        OutputFormat outputFormat = conf.getOutputFormat(); + +        LOG.info("Output format class is: " + outputFormat.getClass().toString()); + +        writer = outputFormat.getRecordWriter( null, conf, tap.getIdentifier(), Reporter.NULL ); + +        sinkCall.setOutput( this ); +    } + +    @Override +    public void close() { +        try { +            LOG.info( "closing tap collector for: {}", tap ); +            writer.close( reporter ); +        } catch( IOException exception ) { +            LOG.warn( "exception closing: {}", exception ); +            throw new TapException( "exception closing JDBCTapCollector", exception ); +        } finally { +            super.close(); +        } +    } + +    /** +     * Method collect writes the given values to the {@link Tap} this instance encapsulates. +     * +     * @param writableComparable of type WritableComparable +     * @param writable           of type Writable +     * @throws IOException when +     */ +    public void collect( Object writableComparable, Object writable ) throws IOException { +        if (hadoopFlowProcess instanceof HadoopFlowProcess) +            ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress(); + +        writer.write( writableComparable, writable ); +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/TableDesc.java b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java new file mode 100644 index 0000000..b0aaea7 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.util.Util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Class TableDesc describes a SQL based table, this description is used by the {@link JDBCTap} when + * creating a missing table. + * + * @see JDBCTap + * @see JDBCScheme + */ +public class TableDesc implements Serializable { +    /** Field tableName */ +    String tableName; +    /** Field columnNames */ +    String[] columnNames; +    /** Field columnDefs */ +    String[] columnDefs; +    /** Field primaryKeys */ +    String[] primaryKeys; + +    /** +     * Constructor TableDesc creates a new TableDesc instance. +     * +     * @param tableName of type String +     */ +    public TableDesc( String tableName ) { +        this.tableName = tableName; +    } + +    /** +     * Constructor TableDesc creates a new TableDesc instance. +     * +     * @param tableName   of type String +     * @param columnNames of type String[] +     * @param columnDefs  of type String[] +     * @param primaryKeys of type String +     */ +    public TableDesc( String tableName, String[] columnNames, String[] columnDefs, String[] primaryKeys ) { +        this.tableName = tableName; +        this.columnNames = columnNames; +        this.columnDefs = columnDefs; +        this.primaryKeys = primaryKeys; +    } + +    public String getTableName() { +        return tableName; +    } + +    public String[] getColumnNames() { +        return columnNames; +    } + +    public String[] getColumnDefs() { +        return columnDefs; +    } + +    public String[] getPrimaryKeys() { +        return primaryKeys; +    } + +    /** +     * Method getTableCreateStatement returns the tableCreateStatement of this TableDesc object. +     * +     * @return the tableCreateStatement (type String) of this TableDesc object. +     */ +    public String getCreateTableStatement() { +        List<String> createTableStatement = new ArrayList<String>(); + +        createTableStatement = addCreateTableBodyTo( createTableStatement ); + +        return String.format( getCreateTableFormat(), tableName, Util.join( createTableStatement, ", " ) ); +    } + +    protected List<String> addCreateTableBodyTo( List<String> createTableStatement ) { +        createTableStatement = addDefinitionsTo( createTableStatement ); +        createTableStatement = addPrimaryKeyTo( createTableStatement ); + +        return createTableStatement; +    } + +    protected String getCreateTableFormat() { +        return "CREATE TABLE %s ( %s )"; +    } + +    protected List<String> addDefinitionsTo( List<String> createTableStatement ) { +        for( int i = 0; i < columnNames.length; i++ ) { +            String columnName = columnNames[ i ]; +            String columnDef = columnDefs[ i ]; + +            createTableStatement.add( columnName + " " + columnDef ); +        } + +        return createTableStatement; +    } + +    protected List<String> addPrimaryKeyTo( List<String> createTableStatement ) { +        if( hasPrimaryKey() ) +            createTableStatement.add( String.format( "PRIMARY KEY( %s )", Util.join( primaryKeys, ", " ) ) ); + +        return createTableStatement; +    } + +    /** +     * Method getTableDropStatement returns the tableDropStatement of this TableDesc object. +     * +     * @return the tableDropStatement (type String) of this TableDesc object. +     */ +    public String getTableDropStatement() { +        return String.format( getDropTableFormat(), tableName ); +    } + +    protected String getDropTableFormat() { +        return "DROP TABLE %s"; +    } + +    /** +     * Method getTableExistsQuery returns the tableExistsQuery of this TableDesc object. +     * +     * @return the tableExistsQuery (type String) of this TableDesc object. +     */ +    public String getTableExistsQuery() { +        return String.format( "select 1 from %s where 1 = 0", tableName ); +    } + +    private boolean hasPrimaryKey() { +        return primaryKeys != null && primaryKeys.length != 0; +    } + +    @Override +    public String toString() { +        return "TableDesc{" + "tableName='" + tableName + '\'' + ", columnNames=" + ( columnNames == null ? null : Arrays.asList( columnNames ) ) + ", columnDefs=" + ( columnDefs == null ? null : Arrays.asList( columnDefs ) ) + ", primaryKeys=" + ( primaryKeys == null ? null : Arrays.asList( primaryKeys ) ) + '}'; +    } + +    @Override +    public boolean equals( Object object ) { +        if( this == object ) +            return true; +        if( !( object instanceof TableDesc ) ) +            return false; + +        TableDesc tableDesc = (TableDesc) object; + +        if( !Arrays.equals( columnDefs, tableDesc.columnDefs ) ) +            return false; +        if( !Arrays.equals( columnNames, tableDesc.columnNames ) ) +            return false; +        if( !Arrays.equals( primaryKeys, tableDesc.primaryKeys ) ) +            return false; +        if( tableName != null ? !tableName.equals( tableDesc.tableName ) : tableDesc.tableName != null ) +            return false; + +        return true; +    } + +    @Override +    public int hashCode() { +        int result = tableName != null ? tableName.hashCode() : 0; +        result = 31 * result + ( columnNames != null ? Arrays.hashCode( columnNames ) : 0 ); +        result = 31 * result + ( columnDefs != null ? Arrays.hashCode( columnDefs ) : 0 ); +        result = 31 * result + ( primaryKeys != null ? Arrays.hashCode( primaryKeys ) : 0 ); +        return result; +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java new file mode 100644 index 0000000..4191e79 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc; + +import cascading.tuple.Tuple; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import parallelai.spyglass.jdbc.db.DBWritable; + +public class TupleRecord implements DBWritable { +    private Tuple tuple; + +    public TupleRecord() { +    } + +    public TupleRecord( Tuple tuple ) { +        this.tuple = tuple; +    } + +    public void setTuple( Tuple tuple ) { +        this.tuple = tuple; +    } + +    public Tuple getTuple() { +        return tuple; +    } + +    public void write( PreparedStatement statement ) throws SQLException { +        for( int i = 0; i < tuple.size(); i++ ) { +        	//System.out.println("Insert Tuple => " + " statement.setObject( " + (i + 1) + "," + tuple.get( i )); +            statement.setObject( i + 1, tuple.get( i ) ); +        } +        boolean test = true; +        if (test) { +	        for( int i = 1; i < tuple.size(); i++ ) { +	        	//System.out.println("Update Tuple => " + " statement.setObject( " + (i + tuple.size()) + "," + tuple.get( i )); +	            statement.setObject( i + tuple.size(), tuple.get( i ) ); +	        } +        } +         +    } + +    public void readFields( ResultSet resultSet ) throws SQLException { +        tuple = new Tuple(); + +        for( int i = 0; i < resultSet.getMetaData().getColumnCount(); i++ ) +            tuple.add( (Comparable) resultSet.getObject( i + 1 ) ); +    } + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java new file mode 100644 index 0000000..1bb0786 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * A container for configuration property names for jobs with DB input/output. <br> The job can be + * configured using the static methods in this class, {@link DBInputFormat}, and {@link + * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper + * values. + */ +public class DBConfiguration { + +    /** The JDBC Driver class name */ +    public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class"; + +    /** JDBC Database access URL */ +    public static final String URL_PROPERTY = "mapred.jdbc.url"; + +    /** User name to access the database */ +    public static final String USERNAME_PROPERTY = "mapred.jdbc.username"; + +    /** Password to access the database */ +    public static final String PASSWORD_PROPERTY = "mapred.jdbc.password"; + +    /** Input table name */ +    public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name"; + +    /** Field names in the Input table */ +    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names"; + +    /** WHERE clause in the input SELECT statement */ +    public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions"; + +    /** ORDER BY clause in the input SELECT statement */ +    public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby"; + +    /** Whole input query, exluding LIMIT...OFFSET */ +    public static final String INPUT_QUERY = "mapred.jdbc.input.query"; + +    /** The number of records to LIMIT, useful for testing */ +    public static final String INPUT_LIMIT = "mapred.jdbc.input.limit"; + +    /** Input query to get the count of records */ +    public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query"; + +    /** Class name implementing DBWritable which will hold input tuples */ +    public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class"; + +    /** Output table name */ +    public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name"; + +    /** Field names in the Output table */ +    public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names"; + +    /** Field names in the Output table */ +    public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY = +        "mapred.jdbc.output.update.field.names"; + +    /** The number of statements to batch before executing */ +    public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num"; + +    /** The number of splits allowed, becomes max concurrent reads. */ +    public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num"; + +    /** +     * Sets the DB access related fields in the Configuration. +     * +     * @param job         the job +     * @param driverClass JDBC Driver class name +     * @param dbUrl       JDBC DB access URL. +     * @param userName    DB access username +     * @param passwd      DB access passwd +     */ +    public static void configureDB(Configuration job, String driverClass, String dbUrl, +        String userName, String passwd) { +        job.set(DRIVER_CLASS_PROPERTY, driverClass); +        job.set(URL_PROPERTY, dbUrl); + +        if (userName != null) { job.set(USERNAME_PROPERTY, userName); } + +        if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); } +    } + +    /** +     * Sets the DB access related fields in the Configuration. +     * +     * @param job         the job +     * @param driverClass JDBC Driver class name +     * @param dbUrl       JDBC DB access URL. +     */ +    public static void configureDB(Configuration job, String driverClass, String dbUrl) { +        configureDB(job, driverClass, dbUrl, null, null); +    } + +    private Configuration job; + +    DBConfiguration(Configuration job) { +        this.job = job; +    } + +    /** +     * Returns a connection object to the DB +     * +     * @throws ClassNotFoundException +     * @throws SQLException +     */ +    Connection getConnection() throws IOException { +        try { +            Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); +        } catch (ClassNotFoundException exception) { +            throw new IOException("unable to load conection driver", exception); +        } + +        try { +            if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) { +                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY)); +            } else { +                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job +                    .get(DBConfiguration.USERNAME_PROPERTY), job +                    .get(DBConfiguration.PASSWORD_PROPERTY)); +            } +        } catch (SQLException exception) { +            throw new IOException("unable to create connection", exception); +        } +    } + +    String getInputTableName() { +        return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY); +    } + +    void setInputTableName(String tableName) { +        job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); +    } + +    String[] getInputFieldNames() { +        return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY); +    } + +    void setInputFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    String getInputConditions() { +        return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY); +    } + +    void setInputConditions(String conditions) { +        if (conditions != null && conditions.length() > 0) { +            job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions); +        } +    } + +    String getInputOrderBy() { +        return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY); +    } + +    void setInputOrderBy(String orderby) { +        if (orderby != null && orderby.length() > 0) { +            job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby); +        } +    } + +    String getInputQuery() { +        return job.get(DBConfiguration.INPUT_QUERY); +    } + +    void setInputQuery(String query) { +        if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); } +    } + +    long getInputLimit() { +        return job.getLong(DBConfiguration.INPUT_LIMIT, -1); +    } + +    void setInputLimit(long limit) { +        job.setLong(DBConfiguration.INPUT_LIMIT, limit); +    } + +    String getInputCountQuery() { +        return job.get(DBConfiguration.INPUT_COUNT_QUERY); +    } + +    void setInputCountQuery(String query) { +        if (query != null && query.length() > 0) { +            job.set(DBConfiguration.INPUT_COUNT_QUERY, query); +        } +    } + +    Class<?> getInputClass() { +        return job +            .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class); +    } + +    void setInputClass(Class<? extends DBWritable> inputClass) { +        job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class); +    } + +    String getOutputTableName() { +        return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY); +    } + +    void setOutputTableName(String tableName) { +        job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); +    } + +    String[] getOutputFieldNames() { +        return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY); +    } + +    void setOutputFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    String[] getOutputUpdateFieldNames() { +        return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY); +    } + +    void setOutputUpdateFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    int getBatchStatementsNum() { +        return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000); +    } + +    void setBatchStatementsNum(int batchStatementsNum) { +        job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum); +    } + +    int getMaxConcurrentReadsNum() { +        return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0); +    } + +    void setMaxConcurrentReadsNum(int maxConcurrentReads) { +        if (maxConcurrentReads < 0) { +            throw new IllegalArgumentException("maxConcurrentReads must be a positive value"); +        } + +        job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads); +    } + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java new file mode 100644 index 0000000..115d9bd --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java @@ -0,0 +1,452 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.*; + +/** + * A InputFormat that reads input data from an SQL table. <p/> DBInputFormat emits LongWritables + * containing the record number as key and DBWritables as value. <p/> The SQL query, and input class + * can be using one of the two setInput methods. + */ +public class DBInputFormat<T extends DBWritable> +    implements InputFormat<LongWritable, T>, JobConfigurable { +    /** Field LOG */ +    private static final Logger LOG = LoggerFactory.getLogger(DBInputFormat.class); + +    /** +     * A RecordReader that reads records from a SQL table. Emits LongWritables containing the record +     * number as key and DBWritables as value. +     */ +    protected class DBRecordReader implements RecordReader<LongWritable, T> { +        private ResultSet results; +        private Statement statement; +        private Class<T> inputClass; +        private JobConf job; +        private DBInputSplit split; +        private long pos = 0; + +        /** +         * @param split The InputSplit to read data for +         * @throws SQLException +         */ +        protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) +            throws SQLException, IOException { +            this.inputClass = inputClass; +            this.split = split; +            this.job = job; + +            statement = +                connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + +            //statement.setFetchSize(Integer.MIN_VALUE); +            String query = getSelectQuery(); +            try { +                LOG.info(query); +                results = statement.executeQuery(query); +                LOG.info("done executing select query"); +            } catch (SQLException exception) { +                LOG.error("unable to execute select query: " + query, exception); +                throw new IOException("unable to execute select query: " + query, exception); +            } +        } + +        /** +         * Returns the query for selecting the records, subclasses can override this for custom +         * behaviour. +         */ +        protected String getSelectQuery() { +            LOG.info("Executing select query"); +            StringBuilder query = new StringBuilder(); + +            if (dbConf.getInputQuery() == null) { +                query.append("SELECT "); + +                for (int i = 0; i < fieldNames.length; i++) { +                    query.append(fieldNames[i]); + +                    if (i != fieldNames.length - 1) +                        query.append(", "); +                } + +                query.append(" FROM ").append(tableName); +                query.append(" AS ").append(tableName); //in hsqldb this is necessary + +                if (conditions != null && conditions.length() > 0) +                    query.append(" WHERE (").append(conditions).append(")"); + +                String orderBy = dbConf.getInputOrderBy(); + +                if (orderBy != null && orderBy.length() > 0) +                    query.append(" ORDER BY ").append(orderBy); + +            } +            else +                query.append(dbConf.getInputQuery()); + +            try { +                // Only add limit and offset if you have multiple chunks +                if(split.getChunks() > 1) { +                    query.append(" LIMIT ").append(split.getLength()); +                    query.append(" OFFSET ").append(split.getStart()); +                } +            } catch (IOException ex) { +                //ignore, will not throw +            } + +            return query.toString(); +        } + +        /** {@inheritDoc} */ +        public void close() throws IOException { +            try { +                connection.commit(); +                results.close(); +                statement.close(); +            } catch (SQLException exception) { +                throw new IOException("unable to commit and close", exception); +            } +        } + +        /** {@inheritDoc} */ +        public LongWritable createKey() { +            return new LongWritable(); +        } + +        /** {@inheritDoc} */ +        public T createValue() { +            return ReflectionUtils.newInstance(inputClass, job); +        } + +        /** {@inheritDoc} */ +        public long getPos() throws IOException { +            return pos; +        } + +        /** {@inheritDoc} */ +        public float getProgress() throws IOException { +            return pos / (float) split.getLength(); +        } + +        /** {@inheritDoc} */ +        public boolean next(LongWritable key, T value) throws IOException { +            try { +                if (!results.next()) +                    return false; + +                // Set the key field value as the output key value +                key.set(pos + split.getStart()); + +                value.readFields(results); + +                pos++; +            } catch (SQLException exception) { +                throw new IOException("unable to get next value", exception); +            } + +            return true; +        } +    } + +    /** A Class that does nothing, implementing DBWritable */ +    public static class NullDBWritable implements DBWritable, Writable { + +        public void readFields(DataInput in) throws IOException { +        } + +        public void readFields(ResultSet arg0) throws SQLException { +        } + +        public void write(DataOutput out) throws IOException { +        } + +        public void write(PreparedStatement arg0) throws SQLException { +        } +    } + +    /** A InputSplit that spans a set of rows */ +    protected static class DBInputSplit implements InputSplit { +        private long end = 0; +        private long start = 0; +        private long chunks = 0; + +        /** Default Constructor */ +        public DBInputSplit() { +        } + +        /** +         * Convenience Constructor +         * +         * @param start the index of the first row to select +         * @param end   the index of the last row to select +         */ +        public DBInputSplit(long start, long end, long chunks) { +            this.start = start; +            this.end = end; +            this.chunks = chunks; +            LOG.info("creating DB input split with start: " + start + ", end: " + end + ", chunks: " + chunks); +        } + +        /** {@inheritDoc} */ +        public String[] getLocations() throws IOException { +            // TODO Add a layer to enable SQL "sharding" and support locality +            return new String[]{}; +        } + +        /** @return The index of the first row to select */ +        public long getStart() { +            return start; +        } + +        /** @return The index of the last row to select */ +        public long getEnd() { +            return end; +        } + +        /** @return The total row count in this split */ +        public long getLength() throws IOException { +            return end - start; +        } + +        /** @return The total number of chucks accross all splits */ +        public long getChunks() { +            return chunks; +        } + +        /** {@inheritDoc} */ +        public void readFields(DataInput input) throws IOException { +            start = input.readLong(); +            end = input.readLong(); +            chunks = input.readLong(); +        } + +        /** {@inheritDoc} */ +        public void write(DataOutput output) throws IOException { +            output.writeLong(start); +            output.writeLong(end); +            output.writeLong(chunks); +        } +    } + +    protected DBConfiguration dbConf; +    protected Connection connection; + +    protected String tableName; +    protected String[] fieldNames; +    protected String conditions; +    protected long limit; +    protected int maxConcurrentReads; + + +    /** {@inheritDoc} */ +    public void configure(JobConf job) { +        dbConf = new DBConfiguration(job); + +        tableName = dbConf.getInputTableName(); +        fieldNames = dbConf.getInputFieldNames(); +        conditions = dbConf.getInputConditions(); +        limit = dbConf.getInputLimit(); +        maxConcurrentReads = dbConf.getMaxConcurrentReadsNum(); + +        try { +            connection = dbConf.getConnection(); +        } catch (IOException exception) { +            throw new RuntimeException("unable to create connection", exception.getCause()); +        } + +        configureConnection(connection); +    } + +    protected void configureConnection(Connection connection) { +        setTransactionIsolationLevel(connection); +        setAutoCommit(connection); +    } + +    protected void setAutoCommit(Connection connection) { +        try { +            connection.setAutoCommit(false); +        } catch (Exception exception) { +            throw new RuntimeException("unable to set auto commit", exception); +        } +    } + +    protected void setTransactionIsolationLevel(Connection connection) { +        try { +            connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); +        } catch (SQLException exception) { +            throw new RuntimeException("unable to configure transaction isolation level", exception); +        } +    } + +    /** {@inheritDoc} */ +    @SuppressWarnings("unchecked") +    public RecordReader<LongWritable, T> getRecordReader(InputSplit split, JobConf job, +        Reporter reporter) throws IOException { +        Class inputClass = dbConf.getInputClass(); +        try { +            return new DBRecordReader((DBInputSplit) split, inputClass, job); +        } catch (SQLException exception) { +            throw new IOException(exception.getMessage(), exception); +        } +    } + +    /** {@inheritDoc} */ +    public InputSplit[] getSplits(JobConf job, int chunks) throws IOException { +        // use the configured value if avail +        chunks = maxConcurrentReads == 0 ? chunks : maxConcurrentReads; + +        try { +            Statement statement = connection.createStatement(); + +            ResultSet results = statement.executeQuery(getCountQuery()); + +            long count = 0; + +            while (results.next()) +                count += results.getLong(1); + +            if (limit != -1) +                count = Math.min(limit, count); + +            long chunkSize = (count / chunks); + +            results.close(); +            statement.close(); + +            InputSplit[] splits = new InputSplit[chunks]; + +            // Split the rows into n-number of chunks and adjust the last chunk +            // accordingly +            for (int i = 0; i < chunks; i++) { +                DBInputSplit split; + +                if (i + 1 == chunks) +                    split = new DBInputSplit(i * chunkSize, count, chunks); +                else +                    split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize, chunks); + +                splits[i] = split; +            } + +            return splits; +        } catch (SQLException e) { +            throw new IOException(e.getMessage()); +        } +    } + +    /** +     * Returns the query for getting the total number of rows, subclasses can override this for +     * custom behaviour. +     */ +    protected String getCountQuery() { + +        if (dbConf.getInputCountQuery() != null) { return dbConf.getInputCountQuery(); } + +        StringBuilder query = new StringBuilder(); + +        query.append("SELECT COUNT(*) FROM " + tableName); + +        if (conditions != null && conditions.length() > 0) +            query.append(" WHERE " + conditions); + +        return query.toString(); +    } + +    /** +     * Initializes the map-part of the job with the appropriate input settings. +     * +     * @param job             The job +     * @param inputClass      the class object implementing DBWritable, which is the Java object +     *                        holding tuple fields. +     * @param tableName       The table to read data from +     * @param conditions      The condition which to select data with, eg. '(updated > 20070101 AND +     *                        length > 0)' +     * @param orderBy         the fieldNames in the orderBy clause. +     * @param limit +     * @param fieldNames      The field names in the table +     * @param concurrentReads +     */ +    public static void setInput(JobConf job, Class<? extends DBWritable> inputClass, +        String tableName, String conditions, String orderBy, long limit, int concurrentReads, +        String... fieldNames) { +        job.setInputFormat(DBInputFormat.class); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setInputClass(inputClass); +        dbConf.setInputTableName(tableName); +        dbConf.setInputFieldNames(fieldNames); +        dbConf.setInputConditions(conditions); +        dbConf.setInputOrderBy(orderBy); + +        if (limit != -1) +            dbConf.setInputLimit(limit); + +        dbConf.setMaxConcurrentReadsNum(concurrentReads); +    } + +    /** +     * Initializes the map-part of the job with the appropriate input settings. +     * +     * @param job             The job +     * @param inputClass      the class object implementing DBWritable, which is the Java object +     *                        holding tuple fields. +     * @param selectQuery     the input query to select fields. Example : "SELECT f1, f2, f3 FROM +     *                        Mytable ORDER BY f1" +     * @param countQuery      the input query that returns the number of records in the table. +     *                        Example : "SELECT COUNT(f1) FROM Mytable" +     * @param concurrentReads +     */ +    public static void setInput(JobConf job, Class<? extends DBWritable> inputClass, +        String selectQuery, String countQuery, long limit, int concurrentReads) { +        job.setInputFormat(DBInputFormat.class); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setInputClass(inputClass); +        dbConf.setInputQuery(selectQuery); +        dbConf.setInputCountQuery(countQuery); + +        if (limit != -1) +            dbConf.setInputLimit(limit); + +        dbConf.setMaxConcurrentReadsNum(concurrentReads); +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java new file mode 100644 index 0000000..1166970 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java @@ -0,0 +1,391 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; + +import com.jcraft.jsch.Logger; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts + * <key,value> pairs, where key has a type extending DBWritable. Returned {@link RecordWriter} + * writes <b>only the key</b> to the database with a batch SQL query. + */ +public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> { +    private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); + +    /** A RecordWriter that writes the reduce output to a SQL table */ +    protected class DBRecordWriter implements RecordWriter<K, V> { +        private Connection connection; +        private PreparedStatement insertStatement; +        private PreparedStatement updateStatement; +        private final int statementsBeforeExecute; + +        private long statementsAdded = 0; +        private long insertStatementsCurrent = 0; +        private long updateStatementsCurrent = 0; + +        protected DBRecordWriter(Connection connection, PreparedStatement insertStatement, +            PreparedStatement updateStatement, int statementsBeforeExecute) { +            this.connection = connection; +            this.insertStatement = insertStatement; +            this.updateStatement = updateStatement; +            this.statementsBeforeExecute = statementsBeforeExecute; +        } + +        /** {@inheritDoc} */ +        public void close(Reporter reporter) throws IOException { +            executeBatch(); + +            try { +                if (insertStatement != null) { insertStatement.close(); } + +                if (updateStatement != null) { updateStatement.close(); } + +                connection.commit(); +            } catch (SQLException exception) { +                rollBack(); + +                createThrowMessage("unable to commit batch", 0, exception); +            } finally { +                try { +                    connection.close(); +                } catch (SQLException exception) { +                    throw new IOException("unable to close connection", exception); +                } +            } +        } + +        private void executeBatch() throws IOException { +            try { +                if (insertStatementsCurrent != 0) { +                    LOG.info( +                        "executing insert batch " + createBatchMessage(insertStatementsCurrent)); + +                    insertStatement.executeBatch(); +                } + +                insertStatementsCurrent = 0; +            } catch (SQLException exception) { +                rollBack(); + +                createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception); +            } + +            try { +                if (updateStatementsCurrent != 0) { +                    LOG.info( +                        "executing update batch " + createBatchMessage(updateStatementsCurrent)); + +                    int[] result = updateStatement.executeBatch(); + +                    int count = 0; + +                    for (int value : result) { count += value; } + +                    if (count != updateStatementsCurrent) { +                        throw new IOException( +                            "update did not update same number of statements executed in batch, batch: " +                            + updateStatementsCurrent + " updated: " + count); +                    } +                } + +                updateStatementsCurrent = 0; +            } catch (SQLException exception) { +            	 +            	String message = exception.getMessage(); +            	if (message.indexOf("Duplicate Key") >= 0) { +            		LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); +            	} else { +                    rollBack(); + +                    createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception); +            	} +            } +        } + +        private void rollBack() { +            try { +                connection.rollback(); +            } catch (SQLException sqlException) { +                LOG.warn(StringUtils.stringifyException(sqlException)); +            } +        } + +        private String createBatchMessage(long currentStatements) { +            return String +                .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute); +        } + +        private void createThrowMessage(String stateMessage, long currentStatements, +            SQLException exception) throws IOException { +            String message = exception.getMessage(); + +            message = message.substring(0, Math.min(75, message.length())); + +            int messageLength = exception.getMessage().length(); +            String batchMessage = createBatchMessage(currentStatements); +            String template = "%s [msglength: %d]%s %s"; +            String errorMessage = +                String.format(template, stateMessage, messageLength, batchMessage, message); + +            LOG.error(errorMessage, exception.getNextException()); + +            throw new IOException(errorMessage, exception.getNextException()); +        } + +        /** {@inheritDoc} */ +        public synchronized void write(K key, V value) throws IOException { +            try { +                if (value == null) { +                    key.write(insertStatement); +                    insertStatement.addBatch(); +                    insertStatementsCurrent++; +                } else { +                    key.write(updateStatement); +                    updateStatement.addBatch(); +                    updateStatementsCurrent++; +                } +            } catch (SQLException exception) { +                throw new IOException("unable to add batch statement", exception); +            } + +            statementsAdded++; + +            if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); } +        } +    } + +    /** +     * Constructs the query used as the prepared statement to insert data. +     * +     * @param table      the table to insert into +     * @param fieldNames the fields to insert into. If field names are unknown, supply an array of +     *                   nulls. +     */ +    protected String constructInsertQuery(String table, String[] fieldNames) { +        if (fieldNames == null) { +            throw new IllegalArgumentException("Field names may not be null"); +        } + +        StringBuilder query = new StringBuilder(); + +        query.append("INSERT INTO ").append(table); + +        if (fieldNames.length > 0 && fieldNames[0] != null) { +            query.append(" ("); + +            for (int i = 0; i < fieldNames.length; i++) { +                query.append(fieldNames[i]); + +                if (i != fieldNames.length - 1) { query.append(","); } +            } + +            query.append(")"); + +        } + +        query.append(" VALUES ("); + +        for (int i = 0; i < fieldNames.length; i++) { +            query.append("?"); + +            if (i != fieldNames.length - 1) { query.append(","); } +        } + +        query.append(")"); +         +        boolean test = true; +        if (test) { +        	query.append(" ON DUPLICATE KEY UPDATE "); +        	 +        	 +            for (int i = 1; i < fieldNames.length; i++) { + +                 +                if ( (i != 1) ) { query.append(","); } +                //if (i != fieldNames.length - 1) { query.append(","); } +                //&& (i != fieldNames.length - 1) +                query.append(fieldNames[i]); +                query.append(" = ?"); +                 +                 +            } +        } +         +        query.append(";"); +         +        LOG.info(" ===================== " + query.toString()); +        return query.toString(); +    } + +    protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) { +        if (fieldNames == null) { +            throw new IllegalArgumentException("field names may not be null"); +        } + +        Set<String> updateNamesSet = new HashSet<String>(); +        Collections.addAll(updateNamesSet, updateNames); + +        StringBuilder query = new StringBuilder(); + +        query.append("UPDATE ").append(table); + +        query.append(" SET "); + +        if (fieldNames.length > 0 && fieldNames[0] != null) { +            int count = 0; + +            for (int i = 0; i < fieldNames.length; i++) { +                if (updateNamesSet.contains(fieldNames[i])) { continue; } + +                if (count != 0) { query.append(","); } + +                query.append(fieldNames[i]); +                query.append(" = ?"); + +                count++; +            } +        } + +        query.append(" WHERE "); + +        if (updateNames.length > 0 && updateNames[0] != null) { +            for (int i = 0; i < updateNames.length; i++) { +                query.append(updateNames[i]); +                query.append(" = ?"); + +                if (i != updateNames.length - 1) { query.append(" and "); } +            } +        } + +        query.append(";"); +        System.out.println("Update Query => " + query.toString()); +        return query.toString(); +    } + +    /** {@inheritDoc} */ +    public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException { +    } + +    /** {@inheritDoc} */ +    public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name, +        Progressable progress) throws IOException { +        DBConfiguration dbConf = new DBConfiguration(job); + +        String tableName = dbConf.getOutputTableName(); +        String[] fieldNames = dbConf.getOutputFieldNames(); +        String[] updateNames = dbConf.getOutputUpdateFieldNames(); +        int batchStatements = dbConf.getBatchStatementsNum(); + +        Connection connection = dbConf.getConnection(); + +        configureConnection(connection); + +        String sqlInsert = constructInsertQuery(tableName, fieldNames); +        PreparedStatement insertPreparedStatement; + +        try { +            insertPreparedStatement = connection.prepareStatement(sqlInsert); +            insertPreparedStatement.setEscapeProcessing(true); // should be on by default +        } catch (SQLException exception) { +            throw new IOException("unable to create statement for: " + sqlInsert, exception); +        } + +        String sqlUpdate = +            updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null; +        PreparedStatement updatePreparedStatement = null; + +        try { +            updatePreparedStatement = +                sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null; +        } catch (SQLException exception) { +            throw new IOException("unable to create statement for: " + sqlUpdate, exception); +        } + +        return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements); +    } + +    protected void configureConnection(Connection connection) { +        setAutoCommit(connection); +    } + +    protected void setAutoCommit(Connection connection) { +        try { +            connection.setAutoCommit(false); +        } catch (Exception exception) { +            throw new RuntimeException("unable to set auto commit", exception); +        } +    } + +    /** +     * Initializes the reduce-part of the job with the appropriate output settings +     * +     * @param job                 The job +     * @param dbOutputFormatClass +     * @param tableName           The table to insert data into +     * @param fieldNames          The field names in the table. If unknown, supply the appropriate +     */ +    public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass, +        String tableName, String[] fieldNames, String[] updateFields, int batchSize) { +        if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else { +            job.setOutputFormat(dbOutputFormatClass); +        } + +        // writing doesn't always happen in reduce +        job.setReduceSpeculativeExecution(false); +        job.setMapSpeculativeExecution(false); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setOutputTableName(tableName); +        dbConf.setOutputFieldNames(fieldNames); + +        if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); } + +        if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); } +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java new file mode 100644 index 0000000..1369c74 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.io.Writable; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * Objects that are read from/written to a database should implement <code>DBWritable</code>. + * DBWritable, is similar to {@link Writable} except that the {@link #write(PreparedStatement)} + * method takes a {@link PreparedStatement}, and {@link #readFields(ResultSet)} takes a {@link + * ResultSet}. <p> Implementations are responsible for writing the fields of the object to + * PreparedStatement, and reading the fields of the object from the ResultSet. <p/> <p>Example:</p> + * If we have the following table in the database : + * <pre> + * CREATE TABLE MyTable ( + *   counter        INTEGER NOT NULL, + *   timestamp      BIGINT  NOT NULL, + * ); + * </pre> + * then we can read/write the tuples from/to the table with : + * <p><pre> + * public class MyWritable implements Writable, DBWritable { + *   // Some data + *   private int counter; + *   private long timestamp; + * <p/> + *   //Writable#write() implementation + *   public void write(DataOutput out) throws IOException { + *     out.writeInt(counter); + *     out.writeLong(timestamp); + *   } + * <p/> + *   //Writable#readFields() implementation + *   public void readFields(DataInput in) throws IOException { + *     counter = in.readInt(); + *     timestamp = in.readLong(); + *   } + * <p/> + *   public void write(PreparedStatement statement) throws SQLException { + *     statement.setInt(1, counter); + *     statement.setLong(2, timestamp); + *   } + * <p/> + *   public void readFields(ResultSet resultSet) throws SQLException { + *     counter = resultSet.getInt(1); + *     timestamp = resultSet.getLong(2); + *   } + * } + * </pre></p> + */ +public interface DBWritable { + +    /** +     * Sets the fields of the object in the {@link PreparedStatement}. +     * +     * @param statement the statement that the fields are put into. +     * @throws SQLException +     */ +    public void write(PreparedStatement statement) throws SQLException; + +    /** +     * Reads the fields of the object from the {@link ResultSet}. +     * +     * @param resultSet the {@link ResultSet} to get the fields from. +     * @throws SQLException +     */ +    public void readFields(ResultSet resultSet) throws SQLException; + +} diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala new file mode 100644 index 0000000..2a08b7d --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -0,0 +1,56 @@ +package parallelai.spyglass.jdbc + +import com.twitter.scalding.AccessMode +import com.twitter.scalding.Hdfs +import com.twitter.scalding.Mode +import com.twitter.scalding.Read +import com.twitter.scalding.Source +import com.twitter.scalding.Write +import cascading.scheme.Scheme +import cascading.tap.Tap +import cascading.tuple.Fields +import org.apache.hadoop.mapred.RecordReader +import org.apache.hadoop.mapred.OutputCollector +import org.apache.hadoop.mapred.JobConf + +class JDBCSource( +    tableName: String = "tableName", +    driverName: String = "com.mysql.jdbc.Driver", +    connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>", +    userId: String = "user", +    password: String = "password", +    columnNames: Array[String] = Array[String]("col1", "col2", "col3"), +    columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"), +    primaryKeys: Array[String] = Array[String]("primary_key"), +    fields: Fields = new Fields("fld1", "fld2", "fld3"), +    orderBy: Array[String] = null, +    updateBy: Array[String] = null, +    updateByFields: Fields = null +  ) extends Source { + +  override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy) +    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + +  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +    val jdbcScheme = hdfsScheme match { +      case jdbc: JDBCScheme => jdbc +      case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme") +    } +    mode match { +      case hdfsMode @ Hdfs(_, _) => readOrWrite match { +        case Read => { +          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) +          val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme) +          jdbcTap.asInstanceOf[Tap[_,_,_]] +        } +        case Write => { + +          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) +          val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme) +          jdbcTap.asInstanceOf[Tap[_,_,_]] +        } +      } +      case _ => super.createTap(readOrWrite)(mode) +    } +  } +} diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala new file mode 100644 index 0000000..1544f47 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala @@ -0,0 +1,57 @@ +package parallelai.spyglass.jdbc.testing + +import com.twitter.scalding.TextLine +import com.twitter.scalding.Args +import com.twitter.scalding.Tsv +import com.twitter.scalding.mathematics.Matrix._ +import scala.math._ +import scala.math.BigDecimal.javaBigDecimal2bigDecimal +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding.Osv +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +class HdfsToJdbc (args: Args) extends JobBase(args) {  + +	implicit val implicitArgs: Args = args +   +    val scaldingInputPath = getString("input.scalding")   +    log.info("Scalding sample input path => [" + scaldingInputPath + "]") +   +    val S_output = scaldingInputPath +    val fileType = getString("fileType") +    log.info("Input file type => " + fileType) +   +    val S_SCHEMA = List( +		  'key_id,    'col1, 'col2, 'col3 +    )   +    	 +	val url = "mysql01.prod.bigdata.bskyb.com" +	val dbName = "skybet_db"  +	val tableName = "skybet_hbase_betdetail_jdbc_test" +	 +	   +	val jdbcSource2 = new JDBCSource( +		"db_name", +		"com.mysql.jdbc.Driver", +		"jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull", +		"user", +		"password", +		Array[String]("KEY_ID", "COL1", "COL2", "COL3"), +		Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,  "bigint(20)"), +		Array[String]("key_id"), +		new Fields("key_id",    "col1",    "col2",    "col3") +	) +   +    var piper:Pipe = null +    if (fileType equals("Tsv")) +    	piper = Tsv(S_output, S_SCHEMA).read +    else +    	piper = Osv(S_output, S_SCHEMA).read +	    +	val S_FLOW =  +    	Tsv(S_output, S_SCHEMA).read +    	.write(jdbcSource2) + +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala new file mode 100644 index 0000000..30c03a2 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala @@ -0,0 +1,200 @@ +package parallelai.spyglass.jdbc.testing + +import org.apache.log4j.Level +import org.apache.log4j.LogManager +import org.apache.log4j.Logger +import com.twitter.scalding.Args +import com.twitter.scalding.IterableSource +import com.twitter.scalding.Tsv +import cascading.pipe.Pipe +import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +/** + * This integration-test expects some Jdbc table to exist + * with specific data - see GenerateTestingHTables.java + */ + +// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala +class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) { +   +  // Initiate logger +  private val LOG: Logger = LogManager.getLogger(this.getClass) +   +  // Set to Level.DEBUG if --debug is passed in +  val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean +  if (isDebug) {  +    LOG.setLevel(Level.DEBUG) +    LOG.info("Setting logging to Level.DEBUG") +  } +   +  val url = "mysql01.prod.bigdata.bskyb.com" +  val dbName = "skybet_db"  +  val tableName = "skybet_hbase_betdetail_jdbc_test" + +  val jdbcSourceRead = new JDBCSource( +	"TABLE_01", +	"com.mysql.jdbc.Driver", +	"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +	"root", +	"password", +	Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +	Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,    "bigint(20)"), +	Array[String]("id"), +	new Fields("key",    "column1",    "column2",    "column3"), +	null, null, null +  ) +   +  val jdbcSourceWrite = new JDBCSource( +	"TABLE_01", +	"com.mysql.jdbc.Driver", +	"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +	"root", +	"password", +	Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +	Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,    "bigint(20)"), +	Array[String]("id"), +	new Fields("key",    "column1",    "column2",    "column3"), +	null, null, null +  )   +     +  // ----------------------------- +  // ----- Tests for TABLE_01 ---- +  // ----------------------------- +  val TABLE_01_SCHEMA = List('key,'column1, 'column2, 'column3) +  val tableName1 = "TABLE_01" + +  // -------------------- Test 01 -------------------- +  var testName01 = "Select_Test_Read_Count" +  println("---- Running : " + testName01) +  // Get everything from HBase testing table into a Pipe +  val jdbc01 = jdbcSourceRead +  		  	.read  	     +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  	   group.toList[String]('column2 -> 'column2) +  	   group.toList[String]('column3 -> 'column3) +  } +  .mapTo(('key, 'column1,  'column2,  'column3) -> 'jdbcdata) { x:(String,String,String,String) => +	     x._1 + " " + x._2 + " " + x._3 + " " + x._4 +  } +   +  // Calculate expected result for Test 01 +  var list01 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"),  ("3", "C", "Z", "345")) +  +  // -------------------- Test 02 -------------------- +  val testName02 = "Select_Test_Read_Insert_Updated_Count" +  println("---- Running : " + testName02) +   +  // Get everything from JDBC testing table into a Pipe +   +  val jdbcSourceReadUpdated = new JDBCSource( +	"TABLE_02", +	"com.mysql.jdbc.Driver", +	"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +	"root", +	"password", +	Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +	Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,    "bigint(20)"), +	Array[String]("id"), +	new Fields("key",    "column1",    "column2",    "column3"), +	null, null, null +  )   +   +  val jdbc02 = jdbcSourceReadUpdated +  .read  	     +  .groupAll { group =>  +  	   group.toList[String]('key -> 'key) +  	   group.toList[String]('column1 -> 'column1) +  	   group.toList[String]('column2 -> 'column2) +  	   group.toList[String]('column3 -> 'column3) +  } +  .mapTo(('key, 'column1,  'column2,  'column3) -> 'jdbcdata) { x:(String,String,String,String) => +	     x._1 + " " + x._2 + " " + x._3 + " " + x._4 +  } +   +  // Calculate expected result for Test 02 +  var list02 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"),  ("3", "C", "Z", "345")) +   +  // Store results of Scan Test 01 +  ( +    getTestResultPipe(getExpectedPipe(list01), jdbc01, testName01) ++ +    getTestResultPipe(getExpectedPipe(list02), jdbc02, testName02) +  ).groupAll { group => +    group.sortBy('testName) +  } +  .write(Tsv("JdbcShouldRead")) +   +   +  /** +   * We assume the pipe is empty +   *  +   * We concatenate with a header - if the resulting size is 1  +   * then the original size was 0 - then the pipe was empty :) +   *  +   * The result is then returned in a Pipe +   */ +  def assertPipeIsEmpty ( jdbcPipe : Pipe , testName:String) : Pipe = {  +    val headerPipe = IterableSource(List(testName), 'jdbcdata) +    val concatenation = ( jdbcPipe ++ headerPipe ).groupAll{ group => +      group.size('size) +    } +    .project('size) +     +    val result =  +      concatenation +      .mapTo('size -> ('testName, 'result, 'expecteddata, 'jdbcdata)) { x:String => {  +	      if (x == "1") {  +	        (testName, "Success", "", "") +	      } else { +	        (testName, "Test Failed", "", "") +	      }   +    	} +      } +     +    result +  } +   +  /** +   * Methods receives 2 pipes - and projects the results of testing +   *  +   * expectedPipe  should have a column 'expecteddata +   * realJdbcPipe should have a column 'jdbcdata +   */ +  def getTestResultPipe ( expectedPipe:Pipe , realJdbcPipe:Pipe, testName: String ): Pipe = { +	val results = expectedPipe.insert('testName , testName) +                 .joinWithTiny('testName -> 'testName, realJdbcPipe.insert('testName , testName)) +    .map(('expecteddata, 'jdbcdata)->'result) { x:(String,String) => +      	//println(x._1 + " === " + x._2) +        if (x._1.equals(x._2))  +           "Success" +        else +           "Test Failed" +    } +    .project('testName, 'result, 'expecteddata, 'jdbcdata) +    results +  } + +  /**  +   *   +   */ +  def getExpectedPipe ( expectedList: List[(String,String,String,String)]) : Pipe = { +     +    val expectedPipe =  +      IterableSource(expectedList, TABLE_01_SCHEMA) +      .groupAll { group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +        group.toList[String]('column2 -> 'column2) +        group.toList[String]('column3 -> 'column3) +         +      } +      .mapTo(('*) -> 'expecteddata) { x:(String,String,String,String) => +         x._1 + " " + x._2 + " " + x._3 + " " + x._4 +      } +   expectedPipe +  } +   +} diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala new file mode 100644 index 0000000..f317834 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala @@ -0,0 +1,10 @@ +package parallelai.spyglass.jdbc.testing + +import parallelai.spyglass.base.JobRunner + +object JdbcSourceShouldReadWriteRunner extends App { +  val appConfig = "/projects/applications.conf" +  val libPath = "/*.jar" + +  JobRunner.main(Array(classOf[JdbcSourceShouldReadWrite].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath)) +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala new file mode 100644 index 0000000..9fc09e4 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala @@ -0,0 +1,67 @@ +package parallelai.spyglass.jdbc.testing + +import com.twitter.scalding._ +import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +/** + * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match. + * Now hardcoded for Skybet betdetail summation sample. + * To run it: + * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \ + *    --app.conf.path /projects/application-hadoop.conf --hdfs  \ + *    --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation + * @param args + */ +class TablesComparison(args: Args) extends JobBase(args) { + +  implicit val implicitArgs: Args = args +  val conf = appConfig + +  val jdbcSink = new JDBCSource( +    "table_name", +    "com.mysql.jdbc.Driver", +    "jdbc:mysql://<hostname>:<port>/<db_name>", +    "skybet_user", +    "zkb4Uo{C8", +    Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"), +    Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"), +    Array[String]("betleg_id"), +    new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"), +    Array[String]("BETLEG_ID"), +    Array[String]("BETLEG_ID"), +    new Fields("betleg_id") +  ) +    .read +    .insert('name, "betdetail") +    .project('bet_id, 'part_no, 'leg_id) + +  // +  // .write(new TextLine("testJDBCComparator/compare1")) + +  val jdbcSource2 = new JDBCSource( +    "skybet_midas_bet_detail", +    "com.mysql.jdbc.Driver", +    "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull", +    "skybet_user", +    "zkb4Uo{C8", +    Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"), +    Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"), +    Array[String]("bet_id"), +    new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no") + +  ) +    .read +    .insert('name, "sample") +    .project('bet_id, 'part_no, 'leg_id) + +  val uPipe = jdbcSink ++ jdbcSource2 +  uPipe +    .groupBy('bet_id, 'part_no, 'leg_id) { +    _.size +  }.filter('size) { +    x: Int => x != 2 +  } +    .write(new TextLine("testJDBCComparator/result")) +}
\ No newline at end of file diff --git a/src/test/java/parallelai/spyglass/jdbc/GenerateTestingTables.java b/src/test/java/parallelai/spyglass/jdbc/GenerateTestingTables.java new file mode 100644 index 0000000..54ec8fc --- /dev/null +++ b/src/test/java/parallelai/spyglass/jdbc/GenerateTestingTables.java @@ -0,0 +1,201 @@ +package parallelai.spyglass.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; + +/** + * Class generates TWO tables in database 'TABLE_01' and 'TABLE_02' + *  + * Those tables are used by the 'integration-testing' of JDBCSource in file + * JdbcSourceShouldReadWrite.scala + *  + * Run with: mvn -Dtestparallelai.spyglass.jdbc.GenerateTestingTables test + *  + */ +public class GenerateTestingTables { + +	// JDBC driver name and database URL +	static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; +	static final String DB_PORT = "3306"; +	static final String DB_NAME = "database_name"; + +	 +	static final String DB_URL = "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull"; + +	// Database credentials +	static final String USER = "user"; +	static final String PASS = "password"; + +	public static enum TestingTable { +		TABLE_01, TABLE_02 +	} + +	private static final Log LOG = LogFactory +			.getLog(GenerateTestingTables.class); + +	@Test +	public void fakeTest() { + +		// Connect to Quorum +		LOG.info("Connecting to " + DB_URL + ":" + DB_PORT); + +		Connection conn = null; +		Statement stmt = null; +		try { +			// STEP 2: Register JDBC driver +			Class.forName("com.mysql.jdbc.Driver"); + +			// STEP 3: Open a connection +			LOG.info("Connecting to a selected database..."); +			conn = DriverManager.getConnection(DB_URL, USER, PASS); +			LOG.info("Connected database successfully..."); + +			 +			// Delete test tables +			deleteTestTable(conn, TestingTable.TABLE_01.name()); +			deleteTestTable(conn, TestingTable.TABLE_02.name()); + +			// Generate test tables +			createTestTable(conn, TestingTable.TABLE_01); +			createTestTable(conn, TestingTable.TABLE_02); + +			// Populate test tables +			populateTestTable(conn, TestingTable.TABLE_01); +			//populateTestTable(conn, TestingTable.TABLE_02); + +			// Print content of test table +			printHTable(conn, TestingTable.TABLE_01); + +			// If we've reached here - the testing data are in +			Assert.assertEquals("true", "true");			 +			 +			 +		} catch (SQLException se) { +			// Handle errors for JDBC +			se.printStackTrace(); +			LOG.error(se.toString()); +		} catch (Exception e) { +			// Handle errors for Class.forName +			e.printStackTrace(); +			LOG.error(e.toString()); +		} finally { +			// finally block used to close resources +			try { +				if (stmt != null) +					conn.close(); +			} catch (SQLException se) { +			}// do nothing +			try { +				if (conn != null) +					conn.close(); +			} catch (SQLException se) { +				se.printStackTrace(); +				LOG.error(se.toString()); +			}// end finally try +		}// end try + +	} + +	private static void populateTestTable(Connection connection, TestingTable testingTable) +			throws SQLException { +		 +		 +		// Load up table +		LOG.info("Populating table in given database..."); +		Statement stmt = connection.createStatement(); +		 +		 +		String [] queries = { +			    "insert into " + testingTable.name() + " values (1, 'A', 'X', 123)", +			    "insert into " + testingTable.name() + " values (2, 'B', 'Y', 234)", +			    "insert into " + testingTable.name() + " values (3, 'C', 'Z', 345)", +			}; +			              +		Statement statement = connection.createStatement(); +		              +		for (String query : queries) { +			statement.addBatch(query); +		} +		statement.executeBatch(); +		LOG.info("Populated table in given database..."); + +		statement.close(); +		 +	} + +	private static void createTestTable(Connection connection, TestingTable testingTable) +			throws SQLException { + +		LOG.info("Creating table in given database..."); +		Statement stmt = connection.createStatement(); + +		String sql = "CREATE TABLE " + testingTable.name() + " " +				+ "(id INTEGER not NULL, " + " test_column1 VARCHAR(255), " +				+ " test_column2 VARCHAR(255), " + " test_column3 INTEGER, " +				+ " PRIMARY KEY ( id ))"; + +		stmt.executeUpdate(sql); +		LOG.info("Created table in given database..."); + +		stmt.close(); +	} + +	/** +	 * Method to disable and delete HBase Tables i.e. "int-test-01" +	 */ +	private static void deleteTestTable(Connection connection, String tableName) throws SQLException { + +		 +		// Execute a query +		LOG.info("Deleting table in given database..."); +		Statement stmt = connection.createStatement(); + +		String sql = "DROP TABLE IF EXISTS " + tableName; + +		int result = stmt.executeUpdate(sql); +		LOG.info("Deleted table in given database... " + result);		 + + +		stmt.close(); +	} + +	/** +	 * Method to print-out an HTable +	 */ +	private static void printHTable(Connection connection, TestingTable testingTable) +			throws SQLException { + +		// Execute a query +		LOG.info("Printing table in given database..."); +		Statement stmt = connection.createStatement(); +		 +		String sql = "SELECT * FROM " + testingTable.name(); + +		ResultSet resultSet = stmt.executeQuery(sql); +		LOG.info("Get data from table in given database...");			 + +		while (resultSet.next()) { +			Integer key = resultSet.getInt("id"); +			String testColumn1 = resultSet.getString("test_column1"); +			String testColumn2 = resultSet.getString("test_column2"); +			Integer testColumn3 = resultSet.getInt("test_column3"); +			 +			LOG.info(key + " : " + testColumn1 + " : " + testColumn2 + " : " + testColumn3); +		} +		 +	} + +	public static void main(String[] args) { +		GenerateTestingTables test = new GenerateTestingTables(); +		test.fakeTest(); +	} +}
\ No newline at end of file | 
