/*
* Copyright (c) 2009 Concurrent, Inc.
*
* This work has been released into the public domain
* by the copyright holder. This applies worldwide.
*
* In case this is not legally possible:
* The copyright holder grants any entity the right
* to use this work for any purpose, without any
* conditions, unless such conditions are required by law.
*/
package parallelai.spyglass.jdbc;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import parallelai.spyglass.jdbc.db.DBInputFormat;
import parallelai.spyglass.jdbc.db.DBOutputFormat;
import java.io.IOException;
import java.util.Arrays;
/**
* Class JDBCScheme defines what its parent Tap will select and insert/update into the sql database.
*
* If updateBy column names are given, a SQL UPDATE statement will be generated if the values in those columns
* for the given Tuple are all not {@code null}. Otherwise an INSERT statement will be generated.
*
* Some constructors take columnFields and updateByFields. These values will be used during field name resolution
* to bind this Scheme to the source and sink branches in a give assembly. These fields 'alias' the column names
* in the respective arrays. In other words, if your DB TABLE has different column names than your assembly exepects,
* use the Fields arguments to bind the assembly to the table. Both Fields and array must be the same size.
*
* Override this class, {@link DBInputFormat}, and {@link DBOutputFormat} to specialize for a given vendor database.
*/
public class JDBCScheme extends Scheme
{
private Class extends DBInputFormat> inputFormatClass;
private Class extends DBOutputFormat> outputFormatClass;
private String[] columns;
private String[] orderBy;
private String conditions;
private String[] updateBy;
private Fields updateValueFields;
private Fields updateByFields;
private Fields columnFields;
private Tuple updateIfTuple;
private String selectQuery;
private String countQuery;
private long limit = -1;
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param outputFormatClass of type Class extends DBOutputFormat>
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
* @param limit of type long
* @param updateBy of type String[]
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, long limit, String[] updateBy )
{
this( inputFormatClass, outputFormatClass, new Fields( columns ), columns, orderBy, conditions, limit, updateBy != null ? new Fields( updateBy ) : null, updateBy );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param outputFormatClass of type Class extends DBOutputFormat>
* @param columnFields of type Fields
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
* @param limit of type long
* @param updateByFields of type Fields
* @param updateBy of type String[]
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy )
{
this.columnFields = columnFields;
verifyColumns( columnFields, columns );
setSinkFields( columnFields );
setSourceFields( columnFields );
if( updateBy != null && updateBy.length != 0 )
{
this.updateBy = updateBy;
this.updateByFields = updateByFields;
if( updateByFields.size() != updateBy.length )
throw new IllegalArgumentException( "updateByFields and updateBy must be the same size" );
if( !this.columnFields.contains( this.updateByFields ) )
throw new IllegalArgumentException( "columnFields must contain updateByFields column names" );
this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields );
this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls
}
this.columns = columns;
this.orderBy = orderBy;
this.conditions = conditions;
this.limit = limit;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
}
private void verifyColumns( Fields columnFields, String[] columns )
{
if( columnFields.size() != columns.length )
throw new IllegalArgumentException( "columnFields and columns must be the same size" );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param outputFormatClass of type Class extends DBOutputFormat>
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
* @param updateBy of type String[]
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, String[] updateBy )
{
this( inputFormatClass, outputFormatClass, columns, orderBy, conditions, -1, updateBy );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param outputFormatClass of type Class extends DBOutputFormat>
* @param columnFields of type Fields
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
* @param updateByFields of type Fields
* @param updateBy of type String[]
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, Fields updateByFields, String[] updateBy )
{
this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, -1, updateByFields, updateBy );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param outputFormatClass of type Class extends DBOutputFormat>
* @param columns of type String[]
* @param orderBy of type String[]
* @param updateBy of type String[]
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy )
{
this( inputFormatClass, outputFormatClass, columns, orderBy, null, updateBy );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param outputFormatClass of type Class extends DBOutputFormat>
* @param columnFields of type Fields
* @param columns of type String[]
* @param orderBy of type String[]
* @param updateByFields of type Fields
* @param updateBy of type String[]
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
{
this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, null, -1, updateByFields, updateBy );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param orderBy of type String[]
* @param updateBy of type String[]
*/
public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy )
{
this( null, null, columns, orderBy, updateBy );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param orderBy of type String[]
* @param updateByFields of type Fields
* @param updateBy of type String[]
*/
public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
{
this( null, null, columnFields, columns, orderBy, updateByFields, updateBy );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
* @param limit of type long
*/
public JDBCScheme( String[] columns, String[] orderBy, String conditions, long limit )
{
this( null, null, columns, orderBy, conditions, limit, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
* @param limit of type long
*/
public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit )
{
this( null, null, columnFields, columns, orderBy, conditions, limit, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
*/
public JDBCScheme( String[] columns, String[] orderBy, String conditions )
{
this( null, null, columns, orderBy, conditions, null );
}
public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions )
{
this( null, null, columnFields, columns, orderBy, conditions, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param orderBy of type String[]
* @param limit of type long
*/
public JDBCScheme( String[] columns, String[] orderBy, long limit )
{
this( null, null, columns, orderBy, null, limit, null );
}
public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, long limit )
{
this( null, null, columnFields, columns, orderBy, null, limit, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param orderBy of type String[]
*/
public JDBCScheme( String[] columns, String[] orderBy )
{
this( null, null, columns, orderBy, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param orderBy of type String[]
*/
public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy )
{
this( null, null, columnFields, columns, orderBy, null, -1, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param conditions of type String
* @param limit of type long
*/
public JDBCScheme( String[] columns, String conditions, long limit )
{
this( null, null, columns, null, conditions, limit, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param conditions of type String
* @param limit of type long
*/
public JDBCScheme( Fields columnFields, String[] columns, String conditions, long limit )
{
this( null, null, columnFields, columns, null, conditions, limit, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param conditions of type String
*/
public JDBCScheme( String[] columns, String conditions )
{
this( null, null, columns, null, conditions, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param conditions of type String
*/
public JDBCScheme( Fields columnFields, String[] columns, String conditions )
{
this( null, null, columnFields, columns, null, conditions, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param limit of type long
*/
public JDBCScheme( String[] columns, long limit )
{
this( null, null, columns, null, null, limit, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param limit of type long
*/
public JDBCScheme( Fields columnFields, String[] columns, long limit )
{
this( null, null, columnFields, columns, null, null, limit, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
*/
public JDBCScheme( String[] columns )
{
this( null, null, new Fields( columns ), columns, null, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
*/
public JDBCScheme( Fields columnFields, String[] columns )
{
this( null, null, columnFields, columns, null, null, null );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* Use this constructor if the data source may only be used as a source.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param columns of type String[]
* @param selectQuery of type String
* @param countQuery of type String
* @param limit of type long
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, String[] columns, String selectQuery, String countQuery, long limit )
{
this( inputFormatClass, new Fields( columns ), columns, selectQuery, countQuery, limit );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class extends DBInputFormat>
* @param columnFields of type Fields
* @param columns of type String[]
* @param selectQuery of type String
* @param countQuery of type String
* @param limit of type long
*/
public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
{
this.columnFields = columnFields;
verifyColumns( columnFields, columns );
setSourceFields( columnFields );
this.columns = columns;
this.selectQuery = selectQuery.trim().replaceAll( ";$", "" );
this.countQuery = countQuery.trim().replaceAll( ";$", "" );
this.limit = limit;
this.inputFormatClass = inputFormatClass;
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* Use this constructor if the data source may only be used as a source.
*
* @param columns of type String[]
* @param selectQuery of type String
* @param countQuery of type String
* @param limit of type long
*/
public JDBCScheme( String[] columns, String selectQuery, String countQuery, long limit )
{
this( null, new Fields( columns ), columns, selectQuery, countQuery, limit );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param selectQuery of type String
* @param countQuery of type String
* @param limit of type long
*/
public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
{
this( null, columnFields, columns, selectQuery, countQuery, limit );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* Use this constructor if the data source may only be used as a source.
*
* @param columns of type String[]
* @param selectQuery of type String
* @param countQuery of type String
*/
public JDBCScheme( String[] columns, String selectQuery, String countQuery )
{
this( null, new Fields( columns ), columns, selectQuery, countQuery, -1 );
}
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columnFields of type Fields
* @param columns of type String[]
* @param selectQuery of type String
* @param countQuery of type String
*/
public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery )
{
this( null, columnFields, columns, selectQuery, countQuery, -1 );
}
/**
* Method getColumns returns the columns of this JDBCScheme object.
*
* @return the columns (type String[]) of this JDBCScheme object.
*/
public String[] getColumns() {
return columns;
}
/**
* Method getOrderBy returns the orderBy of this JDBCScheme object.
*
* @return the orderBy (type String[]) of this JDBCScheme object.
*/
public String[] getOrderBy() {
return orderBy;
}
@Override
public void sourceConfInit( FlowProcess process, Tap tap,
JobConf conf ) {
int concurrentReads = ( (JDBCTap) tap ).concurrentReads;
if( selectQuery != null )
DBInputFormat.setInput( conf, TupleRecord.class, selectQuery, countQuery, limit, concurrentReads );
else {
String tableName = ( (JDBCTap) tap ).getTableName();
String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null;
DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, columns );
}
if( inputFormatClass != null )
conf.setInputFormat( inputFormatClass );
}
@Override
public void sinkConfInit( FlowProcess process, Tap tap,
JobConf conf ) {
if( selectQuery != null )
throw new TapException( "cannot sink to this Scheme" );
String tableName = ( (JDBCTap) tap ).getTableName();
int batchSize = ( (JDBCTap) tap ).getBatchSize();
DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy, batchSize );
if( outputFormatClass != null )
conf.setOutputFormat( outputFormatClass );
}
@Override
public void sourcePrepare( FlowProcess flowProcess, SourceCall