path: root/src
author     Saad Rashid <saad373@gmail.com>  2013-06-24 17:13:19 +0100
committer  Saad Rashid <saad373@gmail.com>  2013-06-24 17:13:19 +0100
commit     3e4a74de169104679f2459947c27d5cfb8cc430b (patch)
tree       28e5ba58790e6964f5228382edacc6d459976926 /src
parent     056fe03a21f48d57f31a0c11f159874b410bc4e9 (diff)
download   SpyGlass-3e4a74de169104679f2459947c27d5cfb8cc430b.tar.gz
           SpyGlass-3e4a74de169104679f2459947c27d5cfb8cc430b.zip
Added JDBCTap support.
Diffstat (limited to 'src')
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java  25
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java  670
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/JDBCTap.java  621
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java  118
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/TableDesc.java  181
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/TupleRecord.java  63
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java  276
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java  452
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java  391
-rw-r--r--  src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java  83
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala  56
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala  57
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala  200
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala  10
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala  67
-rw-r--r--  src/test/java/parallelai/spyglass/jdbc/GenerateTestingTables.java  201
16 files changed, 3471 insertions(+), 0 deletions(-)
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java
new file mode 100644
index 0000000..6788624
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java
@@ -0,0 +1,25 @@
+package parallelai.spyglass.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class JDBCConstants {
+
+ public enum JdbcSourceMode {
+ SELECT,
+ SELECT_WITH_PARTITIONS,
+ SELECT_WITH_BUCKETS;
+ }
+
+ public enum JdbcSinkMode {
+ INSERT,
+ UPDATE,
+ UPSERT;
+ }
+
+ public static final String START_KEY = "jdbc.%s.startkey";
+ public static final String STOP_KEY = "jdbc.%s.stopkey";
+ public static final String SOURCE_MODE = "jdbc.%s.source.mode";
+ public static final String KEY_LIST = "jdbc.%s.key.list";
+ public static final String VERSIONS = "jdbc.%s.versions";
+
+}
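The key constants above are format strings with a %s placeholder, which suggests per-table configuration keys. A minimal sketch of how such a key might be derived is shown below; the table name "orders" is purely illustrative and this usage is an assumption, since no caller of these constants appears in this commit:

    // hypothetical: derive the per-table start-key property name
    String startKeyProp = String.format( JDBCConstants.START_KEY, "orders" );  // -> "jdbc.orders.startkey"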
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java
new file mode 100644
index 0000000..5007895
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java
@@ -0,0 +1,670 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.jdbc;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import cascading.util.Util;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import parallelai.spyglass.jdbc.db.DBInputFormat;
+import parallelai.spyglass.jdbc.db.DBOutputFormat;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Class JDBCScheme defines what its parent Tap will select from, and insert/update into, the SQL database.
+ * <p/>
+ * If updateBy column names are given, a SQL UPDATE statement will be generated if the values in those columns
+ * for the given Tuple are all not {@code null}. Otherwise an INSERT statement will be generated.
+ * <p/>
+ * Some constructors take columnFields and updateByFields. These values will be used during field name resolution
+ * to bind this Scheme to the source and sink branches in a given assembly. These fields 'alias' the column names
+ * in the respective arrays. In other words, if your DB TABLE has different column names than your assembly expects,
+ * use the Fields arguments to bind the assembly to the table. Each Fields instance and its corresponding array must be the same size.
+ * <p/>
+ * Override this class, {@link DBInputFormat}, and {@link DBOutputFormat} to specialize for a given vendor database.
+ */
+public class JDBCScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>
+{
+ private Class<? extends DBInputFormat> inputFormatClass;
+ private Class<? extends DBOutputFormat> outputFormatClass;
+ private String[] columns;
+ private String[] orderBy;
+ private String conditions;
+ private String[] updateBy;
+ private Fields updateValueFields;
+ private Fields updateByFields;
+ private Fields columnFields;
+ private Tuple updateIfTuple;
+ private String selectQuery;
+ private String countQuery;
+ private long limit = -1;
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param outputFormatClass of type Class<? extends DBOutputFormat>
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, long limit, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, new Fields( columns ), columns, orderBy, conditions, limit, updateBy != null ? new Fields( updateBy ) : null, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param outputFormatClass of type Class<? extends DBOutputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy )
+ {
+ this.columnFields = columnFields;
+
+ verifyColumns( columnFields, columns );
+
+ setSinkFields( columnFields );
+ setSourceFields( columnFields );
+
+ if( updateBy != null && updateBy.length != 0 )
+ {
+ this.updateBy = updateBy;
+ this.updateByFields = updateByFields;
+
+ if( updateByFields.size() != updateBy.length )
+ throw new IllegalArgumentException( "updateByFields and updateBy must be the same size" );
+
+ if( !this.columnFields.contains( this.updateByFields ) )
+ throw new IllegalArgumentException( "columnFields must contain updateByFields column names" );
+
+ this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields );
+ this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls
+ }
+
+ this.columns = columns;
+ this.orderBy = orderBy;
+ this.conditions = conditions;
+ this.limit = limit;
+
+ this.inputFormatClass = inputFormatClass;
+ this.outputFormatClass = outputFormatClass;
+ }
+
+ private void verifyColumns( Fields columnFields, String[] columns )
+ {
+ if( columnFields.size() != columns.length )
+ throw new IllegalArgumentException( "columnFields and columns must be the same size" );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param outputFormatClass of type Class<? extends DBOutputFormat>
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columns, orderBy, conditions, -1, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param outputFormatClass of type Class<? extends DBOutputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, Fields updateByFields, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, -1, updateByFields, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param outputFormatClass of type Class<? extends DBOutputFormat>
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columns, orderBy, null, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param outputFormatClass of type Class<? extends DBOutputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, null, -1, updateByFields, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy )
+ {
+ this( null, null, columns, orderBy, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
+ {
+ this( null, null, columnFields, columns, orderBy, updateByFields, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, String conditions, long limit )
+ {
+ this( null, null, columns, orderBy, conditions, limit, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit )
+ {
+ this( null, null, columnFields, columns, orderBy, conditions, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, String conditions )
+ {
+ this( null, null, columns, orderBy, conditions, null );
+ }
+
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions )
+ {
+ this( null, null, columnFields, columns, orderBy, conditions, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, long limit )
+ {
+ this( null, null, columns, orderBy, null, limit, null );
+ }
+
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, long limit )
+ {
+ this( null, null, columnFields, columns, orderBy, null, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ */
+ public JDBCScheme( String[] columns, String[] orderBy )
+ {
+ this( null, null, columns, orderBy, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy )
+ {
+ this( null, null, columnFields, columns, orderBy, null, -1, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String conditions, long limit )
+ {
+ this( null, null, columns, null, conditions, limit, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String conditions, long limit )
+ {
+ this( null, null, columnFields, columns, null, conditions, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param conditions of type String
+ */
+ public JDBCScheme( String[] columns, String conditions )
+ {
+ this( null, null, columns, null, conditions, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param conditions of type String
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String conditions )
+ {
+ this( null, null, columnFields, columns, null, conditions, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, long limit )
+ {
+ this( null, null, columns, null, null, limit, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, long limit )
+ {
+ this( null, null, columnFields, columns, null, null, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ */
+ public JDBCScheme( String[] columns )
+ {
+ this( null, null, new Fields( columns ), columns, null, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ */
+ public JDBCScheme( Fields columnFields, String[] columns )
+ {
+ this( null, null, columnFields, columns, null, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ * <p/>
+ * Use this constructor if the data source may only be used as a source.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this( inputFormatClass, new Fields( columns ), columns, selectQuery, countQuery, limit );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class<? extends DBInputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this.columnFields = columnFields;
+
+ verifyColumns( columnFields, columns );
+
+ setSourceFields( columnFields );
+
+ this.columns = columns;
+ this.selectQuery = selectQuery.trim().replaceAll( ";$", "" );
+ this.countQuery = countQuery.trim().replaceAll( ";$", "" );
+ this.limit = limit;
+
+ this.inputFormatClass = inputFormatClass;
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ * <p/>
+ * Use this constructor if the data source may only be used as a source.
+ *
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this( null, new Fields( columns ), columns, selectQuery, countQuery, limit );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this( null, columnFields, columns, selectQuery, countQuery, limit );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ * <p/>
+ * Use this constructor if the data source may only be used as a source.
+ *
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ */
+ public JDBCScheme( String[] columns, String selectQuery, String countQuery )
+ {
+ this( null, new Fields( columns ), columns, selectQuery, countQuery, -1 );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery )
+ {
+ this( null, columnFields, columns, selectQuery, countQuery, -1 );
+ }
+
+ /**
+ * Method getColumns returns the columns of this JDBCScheme object.
+ *
+ * @return the columns (type String[]) of this JDBCScheme object.
+ */
+ public String[] getColumns() {
+ return columns;
+ }
+
+ /**
+ * Method getOrderBy returns the orderBy of this JDBCScheme object.
+ *
+ * @return the orderBy (type String[]) of this JDBCScheme object.
+ */
+ public String[] getOrderBy() {
+ return orderBy;
+ }
+
+ @Override
+ public void sourceConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
+ JobConf conf ) {
+ int concurrentReads = ( (JDBCTap) tap ).concurrentReads;
+
+ if( selectQuery != null )
+ DBInputFormat.setInput( conf, TupleRecord.class, selectQuery, countQuery, limit, concurrentReads );
+ else {
+ String tableName = ( (JDBCTap) tap ).getTableName();
+ String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null;
+ DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, columns );
+ }
+
+ if( inputFormatClass != null )
+ conf.setInputFormat( inputFormatClass );
+ }
+
+ @Override
+ public void sinkConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
+ JobConf conf ) {
+ if( selectQuery != null )
+ throw new TapException( "cannot sink to this Scheme" );
+
+ String tableName = ( (JDBCTap) tap ).getTableName();
+ int batchSize = ( (JDBCTap) tap ).getBatchSize();
+ DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy, batchSize );
+
+ if( outputFormatClass != null )
+ conf.setOutputFormat( outputFormatClass );
+ }
+
+ @Override
+ public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
+ {
+ Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()};
+
+ sourceCall.setContext( pair );
+ }
+
+ @Override
+ public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
+ {
+ Object key = sourceCall.getContext()[ 0 ];
+ Object value = sourceCall.getContext()[ 1 ];
+ boolean result = sourceCall.getInput().next( key, value );
+
+ if( !result )
+ return false;
+
+ Tuple newTuple = ( (TupleRecord) value ).getTuple();
+ sourceCall.getIncomingEntry().setTuple( newTuple );
+
+ return true;
+ }
+
+ @Override
+ public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) {
+ sourceCall.setContext( null );
+ }
+
+ @Override
+ public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException {
+ // it's ok to use NULL here so the collector does not write anything
+ TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
+ OutputCollector outputCollector = sinkCall.getOutput();
+ if( updateBy != null )
+ {
+ Tuple allValues = tupleEntry.selectTuple( updateValueFields );
+ Tuple updateValues = tupleEntry.selectTuple( updateByFields );
+
+ allValues = cleanTuple( allValues );
+
+ TupleRecord key = new TupleRecord( allValues );
+
+ if( updateValues.equals( updateIfTuple ) )
+ outputCollector.collect( key, null );
+ else
+ outputCollector.collect( key, key );
+
+ return;
+ }
+
+ Tuple result = tupleEntry.selectTuple( getSinkFields() );
+
+ result = cleanTuple( result );
+
+ outputCollector.collect( new TupleRecord( result ), null );
+ }
+
+ /**
+ * Provides a hook for subclasses to escape or modify any values before creating the final SQL statement.
+ *
+ * @param result the Tuple about to be written
+ * @return the possibly modified Tuple to write
+ */
+ protected Tuple cleanTuple( Tuple result ) {
+ return result;
+ }
+
+ @Override
+ public boolean equals( Object object ) {
+ if( this == object )
+ return true;
+ if( !( object instanceof JDBCScheme ) )
+ return false;
+ if( !super.equals( object ) )
+ return false;
+
+ JDBCScheme that = (JDBCScheme) object;
+
+ if( limit != that.limit )
+ return false;
+ if( columnFields != null ? !columnFields.equals( that.columnFields ) : that.columnFields != null )
+ return false;
+ if( !Arrays.equals( columns, that.columns ) )
+ return false;
+ if( conditions != null ? !conditions.equals( that.conditions ) : that.conditions != null )
+ return false;
+ if( countQuery != null ? !countQuery.equals( that.countQuery ) : that.countQuery != null )
+ return false;
+ if( inputFormatClass != null ? !inputFormatClass.equals( that.inputFormatClass ) : that.inputFormatClass != null )
+ return false;
+ if( !Arrays.equals( orderBy, that.orderBy ) )
+ return false;
+ if( outputFormatClass != null ? !outputFormatClass.equals( that.outputFormatClass ) : that.outputFormatClass != null )
+ return false;
+ if( selectQuery != null ? !selectQuery.equals( that.selectQuery ) : that.selectQuery != null )
+ return false;
+ if( !Arrays.equals( updateBy, that.updateBy ) )
+ return false;
+ if( updateByFields != null ? !updateByFields.equals( that.updateByFields ) : that.updateByFields != null )
+ return false;
+ if( updateIfTuple != null ? !updateIfTuple.equals( that.updateIfTuple ) : that.updateIfTuple != null )
+ return false;
+ if( updateValueFields != null ? !updateValueFields.equals( that.updateValueFields ) : that.updateValueFields != null )
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + ( inputFormatClass != null ? inputFormatClass.hashCode() : 0 );
+ result = 31 * result + ( outputFormatClass != null ? outputFormatClass.hashCode() : 0 );
+ result = 31 * result + ( columns != null ? Arrays.hashCode( columns ) : 0 );
+ result = 31 * result + ( orderBy != null ? Arrays.hashCode( orderBy ) : 0 );
+ result = 31 * result + ( conditions != null ? conditions.hashCode() : 0 );
+ result = 31 * result + ( updateBy != null ? Arrays.hashCode( updateBy ) : 0 );
+ result = 31 * result + ( updateValueFields != null ? updateValueFields.hashCode() : 0 );
+ result = 31 * result + ( updateByFields != null ? updateByFields.hashCode() : 0 );
+ result = 31 * result + ( columnFields != null ? columnFields.hashCode() : 0 );
+ result = 31 * result + ( updateIfTuple != null ? updateIfTuple.hashCode() : 0 );
+ result = 31 * result + ( selectQuery != null ? selectQuery.hashCode() : 0 );
+ result = 31 * result + ( countQuery != null ? countQuery.hashCode() : 0 );
+ result = 31 * result + (int) ( limit ^ ( limit >>> 32 ) );
+ return result;
+ }
+}
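As the class javadoc above explains, columnFields alias the database column names and updateBy selects the columns whose values decide between an UPDATE and an INSERT. A hedged usage sketch follows; the table and column names are illustrative and not part of this commit:

    import cascading.tuple.Fields;
    import parallelai.spyglass.jdbc.JDBCScheme;

    // Assembly-side field names alias the actual DB column names.
    Fields columnFields   = new Fields( "id", "name", "score" );
    String[] columns      = { "user_id", "user_name", "user_score" };
    String[] orderBy      = { "user_id" };
    Fields updateByFields = new Fields( "id" );
    String[] updateBy     = { "user_id" };

    // Per the javadoc: if the updateBy values of a Tuple are all non-null, an UPDATE is
    // generated for that row; otherwise an INSERT is generated.
    JDBCScheme scheme = new JDBCScheme( columnFields, columns, orderBy, updateByFields, updateBy );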
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java b/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java
new file mode 100644
index 0000000..1c10a05
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/JDBCTap.java
@@ -0,0 +1,621 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.jdbc;
+
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.util.HadoopUtil;
+import cascading.tap.SinkMode;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
+import cascading.tuple.TupleEntryCollector;
+import cascading.tuple.TupleEntryIterator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import parallelai.spyglass.jdbc.db.DBConfiguration;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Class JDBCTap is a {@link Tap} sub-class that provides read and write access to an RDBMS via JDBC drivers.
+ * <p/>
+ * This Tap fully supports TABLE DROP and CREATE when given a {@link TableDesc} instance.
+ * <p/>
+ * When using {@link SinkMode#UPDATE}, Cascading is instructed to not delete the resource (drop the Table)
+ * and assumes it is safe to begin sinking data into it. The {@link JDBCScheme} is responsible for
+ * deciding if/when to perform an UPDATE instead of an INSERT.
+ * <p/>
+ * Both INSERT and UPDATE are supported through the JDBCScheme.
+ * <p/>
+ * By sub-classing JDBCScheme, {@link parallelai.spyglass.jdbc.db.DBInputFormat}, and {@link parallelai.spyglass.jdbc.db.DBOutputFormat},
+ * specific vendor features can be supported.
+ * <p/>
+ * Use {@link #setBatchSize(int)} to set the number of INSERT/UPDATE statements that should be grouped together before being
+ * executed. The default value is 1,000.
+ * <p/>
+ * Use {@link #executeQuery(String, int)} or {@link #executeUpdate(String)} to invoke SQL statements against
+ * the underlying Table.
+ * <p/>
+ * Note that all classes under the {@link parallelai.spyglass.jdbc.db} package originated from the Hadoop project and
+ * retain their Apache 2.0 license though they have been heavily modified to support INSERT/UPDATE and
+ * vendor specialization, and a number of other features like 'limit'.
+ *
+ * @see JDBCScheme
+ * @see parallelai.spyglass.jdbc.db.DBInputFormat
+ * @see parallelai.spyglass.jdbc.db.DBOutputFormat
+ */
+public class JDBCTap extends Tap<JobConf, RecordReader, OutputCollector> {
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCTap.class);
+
+ private final String id = UUID.randomUUID().toString();
+
+ /** Field connectionUrl */
+ String connectionUrl;
+ /** Field username */
+ String username;
+ /** Field password */
+ String password;
+ /** Field driverClassName */
+ String driverClassName;
+ /** Field tableDesc */
+ TableDesc tableDesc;
+ /** Field batchSize */
+ int batchSize = 1000;
+ /** Field concurrentReads */
+ int concurrentReads = 0;
+
+ /**
+ * Constructor JDBCTap creates a new JDBCTap instance.
+ * <p/>
+ * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated
+ * into. By default it uses {@link SinkMode#UPDATE}.
+ *
+ * @param connectionUrl of type String
+ * @param username of type String
+ * @param password of type String
+ * @param driverClassName of type String
+ * @param tableName of type String
+ * @param scheme of type JDBCScheme
+ */
+ public JDBCTap( String connectionUrl, String username, String password, String driverClassName, String tableName, JDBCScheme scheme ) {
+ this( connectionUrl, username, password, driverClassName, new TableDesc( tableName ), scheme, SinkMode.UPDATE );
+ }
+
+ /**
+ * Constructor JDBCTap creates a new JDBCTap instance.
+ *
+ * @param connectionUrl of type String
+ * @param driverClassName of type String
+ * @param tableDesc of type TableDesc
+ * @param scheme of type JDBCScheme
+ * @param sinkMode of type SinkMode
+ */
+ public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) {
+ this( connectionUrl, null, null, driverClassName, tableDesc, scheme, sinkMode );
+ }
+
+ /**
+ * Constructor JDBCTap creates a new JDBCTap instance.
+ * <p/>
+ * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated
+ * into. By default it uses {@link SinkMode#UPDATE}.
+ *
+ * @param connectionUrl of type String
+ * @param username of type String
+ * @param password of type String
+ * @param driverClassName of type String
+ * @param tableDesc of type TableDesc
+ * @param scheme of type JDBCScheme
+ */
+ public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) {
+ this( connectionUrl, username, password, driverClassName, tableDesc, scheme, SinkMode.UPDATE );
+ }
+
+ /**
+ * Constructor JDBCTap creates a new JDBCTap instance.
+ *
+ * @param connectionUrl of type String
+ * @param username of type String
+ * @param password of type String
+ * @param driverClassName of type String
+ * @param tableDesc of type TableDesc
+ * @param scheme of type JDBCScheme
+ * @param sinkMode of type SinkMode
+ */
+ public JDBCTap( String connectionUrl, String username, String password, String driverClassName, TableDesc tableDesc, JDBCScheme scheme, SinkMode sinkMode ) {
+ super( scheme, sinkMode );
+ this.connectionUrl = connectionUrl;
+ this.username = username;
+ this.password = password;
+ this.driverClassName = driverClassName;
+ this.tableDesc = tableDesc;
+
+ if( tableDesc.getColumnDefs() == null && sinkMode != SinkMode.UPDATE )
+ throw new IllegalArgumentException( "cannot have sink mode REPLACE or KEEP without TableDesc column defs, use UPDATE mode" );
+
+ if( sinkMode != SinkMode.UPDATE )
+ LOG.warn( "using sink mode: {}, consider UPDATE to prevent DROP TABLE from being called during Flow or Cascade setup", sinkMode );
+ }
+
+ /**
+ * Constructor JDBCTap creates a new JDBCTap instance.
+ * <p/>
+ * Use this constructor for connecting to existing tables that will be read from, or will be inserted/updated
+ * into. By default it uses {@link SinkMode#UPDATE}.
+ *
+ * @param connectionUrl of type String
+ * @param driverClassName of type String
+ * @param tableDesc of type TableDesc
+ * @param scheme of type JDBCScheme
+ */
+ public JDBCTap( String connectionUrl, String driverClassName, TableDesc tableDesc, JDBCScheme scheme ) {
+ this( connectionUrl, driverClassName, tableDesc, scheme, SinkMode.UPDATE );
+ }
+
+ /**
+ * Constructor JDBCTap creates a new JDBCTap instance that may only be used as a data source.
+ *
+ * @param connectionUrl of type String
+ * @param username of type String
+ * @param password of type String
+ * @param driverClassName of type String
+ * @param scheme of type JDBCScheme
+ */
+ public JDBCTap( String connectionUrl, String username, String password, String driverClassName, JDBCScheme scheme ) {
+ super( scheme );
+ this.connectionUrl = connectionUrl;
+ this.username = username;
+ this.password = password;
+ this.driverClassName = driverClassName;
+ }
+
+ /**
+ * Constructor JDBCTap creates a new JDBCTap instance.
+ *
+ * @param connectionUrl of type String
+ * @param driverClassName of type String
+ * @param scheme of type JDBCScheme
+ */
+ public JDBCTap( String connectionUrl, String driverClassName, JDBCScheme scheme ) {
+ this( connectionUrl, null, null, driverClassName, scheme );
+ }
+
+ /**
+ * Method getTableName returns the tableName of this JDBCTap object.
+ *
+ * @return the tableName (type String) of this JDBCTap object.
+ */
+ public String getTableName() {
+ return tableDesc.tableName;
+ }
+
+ /**
+ * Method setBatchSize sets the batchSize of this JDBCTap object.
+ *
+ * @param batchSize the batchSize of this JDBCTap object.
+ */
+ public void setBatchSize( int batchSize ) {
+ this.batchSize = batchSize;
+ }
+
+ /**
+ * Method getBatchSize returns the batchSize of this JDBCTap object.
+ *
+ * @return the batchSize (type int) of this JDBCTap object.
+ */
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * Method getConcurrentReads returns the concurrentReads of this JDBCTap object.
+ * <p/>
+ * This value specifies the number of concurrent selects and thus the number of mappers
+ * that may be used. A value of -1 uses the job default.
+ *
+ * @return the concurrentReads (type int) of this JDBCTap object.
+ */
+ public int getConcurrentReads() {
+ return concurrentReads;
+ }
+
+ /**
+ * Method setConcurrentReads sets the concurrentReads of this JDBCTap object.
+ * <p/>
+ * This value specifies the number of concurrent selects and thus the number of mappers
+ * that may be used. A value of -1 uses the job default.
+ *
+ * @param concurrentReads the concurrentReads of this JDBCTap object.
+ */
+ public void setConcurrentReads( int concurrentReads ) {
+ this.concurrentReads = concurrentReads;
+ }
+
+ /**
+ * Method getPath returns the path of this JDBCTap object.
+ *
+ * @return the path (type Path) of this JDBCTap object.
+ */
+ public Path getPath() {
+ return new Path( getJDBCPath() );
+ }
+
+ @Override
+ public String getIdentifier() {
+ return getJDBCPath() + this.id;
+ }
+
+
+ public String getJDBCPath() {
+ return "jdbc:/" + connectionUrl.replaceAll( ":", "_" );
+ }
+
+ public boolean isWriteDirect() {
+ return true;
+ }
+
+ private JobConf getSourceConf( FlowProcess<JobConf> flowProcess, JobConf conf, String property )
+ throws IOException {
+ // Map<String, String> priorConf = HadoopUtil.deserializeBase64( property, conf, true );
+ // return flowProcess.mergeMapIntoConfig( conf, priorConf );
+
+ return null;
+ }
+
+ @Override
+ public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException {
+ // input may be null when this method is called on the client side or cluster side when accumulating
+ // for a HashJoin
+ return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
+ }
+
+ @Override
+ public TupleEntryCollector openForWrite( FlowProcess<JobConf> flowProcess, OutputCollector output ) throws IOException {
+ if( !isSink() )
+ throw new TapException( "this tap may not be used as a sink, no TableDesc defined" );
+
+ LOG.info("Creating JDBCTapCollector output instance");
+ JDBCTapCollector jdbcCollector = new JDBCTapCollector( flowProcess, this );
+
+ jdbcCollector.prepare();
+
+ return jdbcCollector;
+ }
+
+ @Override
+ public boolean isSink()
+ {
+ return tableDesc != null;
+ }
+
+ @Override
+ public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf )
+ {
+ // a hack for MultiInputFormat to see that there is a child format
+ FileInputFormat.setInputPaths( conf, getPath() );
+
+ if( username == null )
+ DBConfiguration.configureDB(conf, driverClassName, connectionUrl);
+ else
+ DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password );
+
+ super.sourceConfInit( process, conf );
+ }
+
+ @Override
+ public void sinkConfInit( FlowProcess<JobConf> process, JobConf conf )
+ {
+ if( !isSink() )
+ return;
+
+ // do not delete if initialized from within a task
+ try {
+ if( isReplace() && conf.get( "mapred.task.partition" ) == null && !deleteResource( conf ) )
+ throw new TapException( "unable to drop table: " + tableDesc.getTableName() );
+
+ if( !createResource( conf ) )
+ throw new TapException( "unable to create table: " + tableDesc.getTableName() );
+ } catch(IOException e) {
+ throw new TapException( "error while trying to modify table: " + tableDesc.getTableName() );
+ }
+
+ if( username == null )
+ DBConfiguration.configureDB( conf, driverClassName, connectionUrl );
+ else
+ DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password );
+
+ super.sinkConfInit( process, conf );
+ }
+
+ private Connection createConnection()
+ {
+ try
+ {
+ LOG.info( "creating connection: {}", connectionUrl );
+
+ Class.forName( driverClassName );
+
+ Connection connection = null;
+
+ if( username == null )
+ connection = DriverManager.getConnection( connectionUrl );
+ else
+ connection = DriverManager.getConnection( connectionUrl, username, password );
+
+ connection.setAutoCommit( false );
+
+ return connection;
+ }
+ catch( ClassNotFoundException exception )
+ {
+ throw new TapException( "unable to load driver class: " + driverClassName, exception );
+ }
+ catch( SQLException exception )
+ {
+ throw new TapException( "unable to open connection: " + connectionUrl, exception );
+ }
+ }
+
+ /**
+ * Method executeUpdate allows for ad-hoc update statements to be sent to the remote RDBMS. The number of
+ * rows updated will be returned, if applicable.
+ *
+ * @param updateString of type String
+ * @return int
+ */
+ public int executeUpdate( String updateString )
+ {
+ Connection connection = null;
+ int result;
+
+ try
+ {
+ connection = createConnection();
+
+ try
+ {
+ LOG.info( "executing update: {}", updateString );
+
+ Statement statement = connection.createStatement();
+
+ result = statement.executeUpdate( updateString );
+
+ connection.commit();
+ statement.close();
+ }
+ catch( SQLException exception )
+ {
+ throw new TapException( "unable to execute update statement: " + updateString, exception );
+ }
+ }
+ finally
+ {
+ try
+ {
+ if( connection != null )
+ connection.close();
+ }
+ catch( SQLException exception )
+ {
+ // ignore
+ LOG.warn( "ignoring connection close exception", exception );
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Method executeQuery allows for ad-hoc queries to be sent to the remote RDBMS. A value
+ * of -1 for returnResults will return a List of all results from the query, a value of 0 will return an empty List.
+ *
+ * @param queryString of type String
+ * @param returnResults of type int
+ * @return List
+ */
+ public List<Object[]> executeQuery( String queryString, int returnResults )
+ {
+ Connection connection = null;
+ List<Object[]> result = Collections.emptyList();
+
+ try
+ {
+ connection = createConnection();
+
+ try
+ {
+ LOG.info( "executing query: {}", queryString );
+
+ Statement statement = connection.createStatement();
+
+ ResultSet resultSet = statement.executeQuery( queryString ); // results are copied below only if requested
+
+ if( returnResults != 0 )
+ result = copyResultSet( resultSet, returnResults == -1 ? Integer.MAX_VALUE : returnResults );
+
+ connection.commit();
+ statement.close();
+ }
+ catch( SQLException exception )
+ {
+ throw new TapException( "unable to execute query statement: " + queryString, exception );
+ }
+ }
+ finally
+ {
+ try
+ {
+ if( connection != null )
+ connection.close();
+ }
+ catch( SQLException exception )
+ {
+ // ignore
+ LOG.warn( "ignoring connection close exception", exception );
+ }
+ }
+
+ return result;
+ }
+
+ private List<Object[]> copyResultSet( ResultSet resultSet, int length ) throws SQLException
+ {
+ List<Object[]> results = new ArrayList<Object[]>( length );
+ int size = resultSet.getMetaData().getColumnCount();
+
+ int count = 0;
+
+ while( resultSet.next() && count < length )
+ {
+ count++;
+
+ Object[] row = new Object[size];
+
+ for( int i = 0; i < row.length; i++ )
+ row[ i ] = resultSet.getObject( i + 1 );
+
+ results.add( row );
+ }
+
+ return results;
+ }
+
+ @Override
+ public boolean createResource( JobConf conf ) throws IOException
+ {
+ if( resourceExists( conf ) )
+ return true;
+
+ try
+ {
+ LOG.info( "creating table: {}", tableDesc.tableName );
+
+ executeUpdate( tableDesc.getCreateTableStatement() );
+ }
+ catch( TapException exception )
+ {
+ LOG.warn( "unable to create table: {}", tableDesc.tableName );
+ LOG.warn( "sql failure", exception.getCause() );
+
+ return false;
+ }
+
+ return resourceExists( conf );
+ }
+
+ @Override
+ public boolean deleteResource( JobConf conf ) throws IOException
+ {
+ if( !isSink() )
+ return false;
+
+ if( !resourceExists( conf ) )
+ return true;
+
+ try
+ {
+ LOG.info( "deleting table: {}", tableDesc.tableName );
+
+ executeUpdate( tableDesc.getTableDropStatement() );
+ }
+ catch( TapException exception )
+ {
+ LOG.warn( "unable to drop table: {}", tableDesc.tableName );
+ LOG.warn( "sql failure", exception.getCause() );
+
+ return false;
+ }
+
+ return !resourceExists( conf );
+ }
+
+ @Override
+ public boolean resourceExists( JobConf conf ) throws IOException
+ {
+ if( !isSink() )
+ return true;
+
+ try
+ {
+ LOG.info( "test table exists: {}", tableDesc.tableName );
+
+ executeQuery( tableDesc.getTableExistsQuery(), 0 );
+ }
+ catch( TapException exception )
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public long getModifiedTime( JobConf conf ) throws IOException
+ {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JDBCTap{" + "connectionUrl='" + connectionUrl + '\'' + ", driverClassName='" + driverClassName + '\'' + ", tableDesc=" + tableDesc + '}';
+ }
+
+ @Override
+ public boolean equals( Object object )
+ {
+ if( this == object )
+ return true;
+ if( !( object instanceof JDBCTap ) )
+ return false;
+ if( !super.equals( object ) )
+ return false;
+
+ JDBCTap jdbcTap = (JDBCTap) object;
+
+ if( connectionUrl != null ? !connectionUrl.equals( jdbcTap.connectionUrl ) : jdbcTap.connectionUrl != null )
+ return false;
+ if( driverClassName != null ? !driverClassName.equals( jdbcTap.driverClassName ) : jdbcTap.driverClassName != null )
+ return false;
+ if( password != null ? !password.equals( jdbcTap.password ) : jdbcTap.password != null )
+ return false;
+ if( tableDesc != null ? !tableDesc.equals( jdbcTap.tableDesc ) : jdbcTap.tableDesc != null )
+ return false;
+ if( username != null ? !username.equals( jdbcTap.username ) : jdbcTap.username != null )
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = super.hashCode();
+ result = 31 * result + ( connectionUrl != null ? connectionUrl.hashCode() : 0 );
+ result = 31 * result + ( username != null ? username.hashCode() : 0 );
+ result = 31 * result + ( password != null ? password.hashCode() : 0 );
+ result = 31 * result + ( driverClassName != null ? driverClassName.hashCode() : 0 );
+ result = 31 * result + ( tableDesc != null ? tableDesc.hashCode() : 0 );
+ result = 31 * result + batchSize;
+ return result;
+ }
+}
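A hedged sketch of wiring the tap to the scheme from the earlier sketch is shown below. The connection URL, credentials, MySQL driver class, and column definitions are placeholders, not part of this commit; the batch size and executeQuery semantics follow the javadoc above:

    import cascading.tap.SinkMode;
    import parallelai.spyglass.jdbc.JDBCTap;
    import parallelai.spyglass.jdbc.TableDesc;
    import java.util.List;

    // Describe the target table so the tap can CREATE it if missing (definitions are illustrative).
    TableDesc tableDesc = new TableDesc(
        "users",
        new String[] { "user_id", "user_name", "user_score" },
        new String[] { "INT NOT NULL", "VARCHAR(64)", "INT" },
        new String[] { "user_id" } );

    JDBCTap tap = new JDBCTap(
        "jdbc:mysql://localhost:3306/test", "dbuser", "dbpassword",
        "com.mysql.jdbc.Driver", tableDesc, scheme, SinkMode.UPDATE );

    tap.setBatchSize( 1000 );   // number of INSERT/UPDATE statements grouped per execution

    // ad-hoc SQL against the underlying table; -1 returns all result rows
    List<Object[]> rows = tap.executeQuery( "SELECT COUNT(*) FROM users", -1 );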
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java
new file mode 100644
index 0000000..e8b7deb
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc;
+
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowProcess;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tuple.TupleEntrySchemeCollector;
+import org.apache.hadoop.mapred.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Class JDBCTapCollector is a kind of {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the resource managed by
+ * a particular {@link JDBCTap} instance.
+ */
+public class JDBCTapCollector extends TupleEntrySchemeCollector implements OutputCollector
+{
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger( JDBCTapCollector.class );
+
+ /** Field conf */
+ private final JobConf conf;
+ /** Field writer */
+ private RecordWriter writer;
+ /** Field flowProcess */
+ private final FlowProcess<JobConf> hadoopFlowProcess;
+ /** Field tap */
+ private final Tap<JobConf, RecordReader, OutputCollector> tap;
+ /** Field reporter */
+ private final Reporter reporter = Reporter.NULL;
+
+ /**
+ * Constructor JDBCTapCollector creates a new JDBCTapCollector instance.
+ *
+ * @param flowProcess
+ * @param tap of type Tap
+ * @throws IOException when fails to initialize
+ */
+ public JDBCTapCollector( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap ) throws IOException {
+ super( flowProcess, tap.getScheme() );
+ this.hadoopFlowProcess = flowProcess;
+
+ this.tap = tap;
+ this.conf = new JobConf( flowProcess.getConfigCopy() );
+
+ this.setOutput( this );
+ }
+
+ @Override
+ public void prepare() {
+ try {
+ initialize();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ super.prepare();
+ }
+
+ private void initialize() throws IOException {
+ tap.sinkConfInit( hadoopFlowProcess, conf );
+
+ OutputFormat outputFormat = conf.getOutputFormat();
+
+ LOG.info("Output format class is: " + outputFormat.getClass().toString());
+
+ writer = outputFormat.getRecordWriter( null, conf, tap.getIdentifier(), Reporter.NULL );
+
+ sinkCall.setOutput( this );
+ }
+
+ @Override
+ public void close() {
+ try {
+ LOG.info( "closing tap collector for: {}", tap );
+ writer.close( reporter );
+ } catch( IOException exception ) {
+ LOG.warn( "exception closing: {}", exception );
+ throw new TapException( "exception closing JDBCTapCollector", exception );
+ } finally {
+ super.close();
+ }
+ }
+
+ /**
+ * Method collect writes the given values to the {@link Tap} this instance encapsulates.
+ *
+ * @param writableComparable of type WritableComparable
+ * @param writable of type Writable
+ * @throws IOException when the write to the underlying RecordWriter fails
+ */
+ public void collect( Object writableComparable, Object writable ) throws IOException {
+ if (hadoopFlowProcess instanceof HadoopFlowProcess)
+ ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress();
+
+ writer.write( writableComparable, writable );
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/jdbc/TableDesc.java b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java
new file mode 100644
index 0000000..b0aaea7
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/TableDesc.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.jdbc;
+
+import cascading.util.Util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Class TableDesc describes a SQL-based table; this description is used by the {@link JDBCTap} when
+ * creating a missing table.
+ *
+ * @see JDBCTap
+ * @see JDBCScheme
+ */
+public class TableDesc implements Serializable {
+ /** Field tableName */
+ String tableName;
+ /** Field columnNames */
+ String[] columnNames;
+ /** Field columnDefs */
+ String[] columnDefs;
+ /** Field primaryKeys */
+ String[] primaryKeys;
+
+ /**
+ * Constructor TableDesc creates a new TableDesc instance.
+ *
+ * @param tableName of type String
+ */
+ public TableDesc( String tableName ) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * Constructor TableDesc creates a new TableDesc instance.
+ *
+ * @param tableName of type String
+ * @param columnNames of type String[]
+ * @param columnDefs of type String[]
+ * @param primaryKeys of type String[]
+ */
+ public TableDesc( String tableName, String[] columnNames, String[] columnDefs, String[] primaryKeys ) {
+ this.tableName = tableName;
+ this.columnNames = columnNames;
+ this.columnDefs = columnDefs;
+ this.primaryKeys = primaryKeys;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String[] getColumnNames() {
+ return columnNames;
+ }
+
+ public String[] getColumnDefs() {
+ return columnDefs;
+ }
+
+ public String[] getPrimaryKeys() {
+ return primaryKeys;
+ }
+
+ /**
+ * Method getCreateTableStatement returns the CREATE TABLE statement for this TableDesc object.
+ *
+ * @return the createTableStatement (type String) of this TableDesc object.
+ */
+ public String getCreateTableStatement() {
+ List<String> createTableStatement = new ArrayList<String>();
+
+ createTableStatement = addCreateTableBodyTo( createTableStatement );
+
+ return String.format( getCreateTableFormat(), tableName, Util.join( createTableStatement, ", " ) );
+ }
+
+ protected List<String> addCreateTableBodyTo( List<String> createTableStatement ) {
+ createTableStatement = addDefinitionsTo( createTableStatement );
+ createTableStatement = addPrimaryKeyTo( createTableStatement );
+
+ return createTableStatement;
+ }
+
+ protected String getCreateTableFormat() {
+ return "CREATE TABLE %s ( %s )";
+ }
+
+ protected List<String> addDefinitionsTo( List<String> createTableStatement ) {
+ for( int i = 0; i < columnNames.length; i++ ) {
+ String columnName = columnNames[ i ];
+ String columnDef = columnDefs[ i ];
+
+ createTableStatement.add( columnName + " " + columnDef );
+ }
+
+ return createTableStatement;
+ }
+
+ protected List<String> addPrimaryKeyTo( List<String> createTableStatement ) {
+ if( hasPrimaryKey() )
+ createTableStatement.add( String.format( "PRIMARY KEY( %s )", Util.join( primaryKeys, ", " ) ) );
+
+ return createTableStatement;
+ }
+
+ /**
+ * Method getTableDropStatement returns the tableDropStatement of this TableDesc object.
+ *
+ * @return the tableDropStatement (type String) of this TableDesc object.
+ */
+ public String getTableDropStatement() {
+ return String.format( getDropTableFormat(), tableName );
+ }
+
+ protected String getDropTableFormat() {
+ return "DROP TABLE %s";
+ }
+
+ /**
+ * Method getTableExistsQuery returns the tableExistsQuery of this TableDesc object.
+ *
+ * @return the tableExistsQuery (type String) of this TableDesc object.
+ */
+ public String getTableExistsQuery() {
+ return String.format( "select 1 from %s where 1 = 0", tableName );
+ }
+
+ private boolean hasPrimaryKey() {
+ return primaryKeys != null && primaryKeys.length != 0;
+ }
+
+ @Override
+ public String toString() {
+ return "TableDesc{" + "tableName='" + tableName + '\'' + ", columnNames=" + ( columnNames == null ? null : Arrays.asList( columnNames ) ) + ", columnDefs=" + ( columnDefs == null ? null : Arrays.asList( columnDefs ) ) + ", primaryKeys=" + ( primaryKeys == null ? null : Arrays.asList( primaryKeys ) ) + '}';
+ }
+
+ @Override
+ public boolean equals( Object object ) {
+ if( this == object )
+ return true;
+ if( !( object instanceof TableDesc ) )
+ return false;
+
+ TableDesc tableDesc = (TableDesc) object;
+
+ if( !Arrays.equals( columnDefs, tableDesc.columnDefs ) )
+ return false;
+ if( !Arrays.equals( columnNames, tableDesc.columnNames ) )
+ return false;
+ if( !Arrays.equals( primaryKeys, tableDesc.primaryKeys ) )
+ return false;
+ if( tableName != null ? !tableName.equals( tableDesc.tableName ) : tableDesc.tableName != null )
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tableName != null ? tableName.hashCode() : 0;
+ result = 31 * result + ( columnNames != null ? Arrays.hashCode( columnNames ) : 0 );
+ result = 31 * result + ( columnDefs != null ? Arrays.hashCode( columnDefs ) : 0 );
+ result = 31 * result + ( primaryKeys != null ? Arrays.hashCode( primaryKeys ) : 0 );
+ return result;
+ }
+}
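Tracing addDefinitionsTo and addPrimaryKeyTo above against the hypothetical TableDesc from the earlier JDBCTap sketch, the statement builders would yield roughly the following SQL:

    // TableDesc("users", {"user_id","user_name","user_score"},
    //           {"INT NOT NULL","VARCHAR(64)","INT"}, {"user_id"})
    //
    // getCreateTableStatement() -> CREATE TABLE users ( user_id INT NOT NULL, user_name VARCHAR(64), user_score INT, PRIMARY KEY( user_id ) )
    // getTableDropStatement()   -> DROP TABLE users
    // getTableExistsQuery()     -> select 1 from users where 1 = 0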
diff --git a/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java
new file mode 100644
index 0000000..4191e79
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/TupleRecord.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.jdbc;
+
+import cascading.tuple.Tuple;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import parallelai.spyglass.jdbc.db.DBWritable;
+
+public class TupleRecord implements DBWritable {
+ private Tuple tuple;
+
+ public TupleRecord() {
+ }
+
+ public TupleRecord( Tuple tuple ) {
+ this.tuple = tuple;
+ }
+
+ public void setTuple( Tuple tuple ) {
+ this.tuple = tuple;
+ }
+
+ public Tuple getTuple() {
+ return tuple;
+ }
+
+ public void write( PreparedStatement statement ) throws SQLException {
+ for( int i = 0; i < tuple.size(); i++ ) {
+ //System.out.println("Insert Tuple => " + " statement.setObject( " + (i + 1) + "," + tuple.get( i ));
+ statement.setObject( i + 1, tuple.get( i ) );
+ }
+ boolean test = true; // always true: the block below re-binds all but the first value at trailing parameter positions, presumably for the UPDATE clause of an upsert statement
+ if (test) {
+ for( int i = 1; i < tuple.size(); i++ ) {
+ //System.out.println("Update Tuple => " + " statement.setObject( " + (i + tuple.size()) + "," + tuple.get( i ));
+ statement.setObject( i + tuple.size(), tuple.get( i ) );
+ }
+ }
+
+ }
+
+ public void readFields( ResultSet resultSet ) throws SQLException {
+ tuple = new Tuple();
+
+ for( int i = 0; i < resultSet.getMetaData().getColumnCount(); i++ )
+ tuple.add( (Comparable) resultSet.getObject( i + 1 ) );
+ }
+
+}
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java
new file mode 100644
index 0000000..1bb0786
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * A container for configuration property names for jobs with DB input/output. <br> The job can be
+ * configured using the static methods in this class, {@link DBInputFormat}, and {@link
+ * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper
+ * values.
+ */
+public class DBConfiguration {
+
+ /** The JDBC Driver class name */
+ public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";
+
+ /** JDBC Database access URL */
+ public static final String URL_PROPERTY = "mapred.jdbc.url";
+
+ /** User name to access the database */
+ public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+
+ /** Password to access the database */
+ public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+
+ /** Input table name */
+ public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";
+
+ /** Field names in the Input table */
+ public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";
+
+ /** WHERE clause in the input SELECT statement */
+ public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";
+
+ /** ORDER BY clause in the input SELECT statement */
+ public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";
+
+ /** Whole input query, excluding LIMIT...OFFSET */
+ public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+
+ /** The number of records to LIMIT, useful for testing */
+ public static final String INPUT_LIMIT = "mapred.jdbc.input.limit";
+
+ /** Input query to get the count of records */
+ public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";
+
+ /** Class name implementing DBWritable which will hold input tuples */
+ public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+
+ /** Output table name */
+ public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";
+
+ /** Field names in the Output table */
+ public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";
+
+ /** Field names in the Output table */
+ public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY =
+ "mapred.jdbc.output.update.field.names";
+
+ /** The number of statements to batch before executing */
+ public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num";
+
+ /** The number of splits allowed, becomes max concurrent reads. */
+ public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num";
+
+ /**
+ * Sets the DB access related fields in the Configuration.
+ *
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL.
+ * @param userName DB access username
+ * @param passwd DB access passwd
+ */
+ public static void configureDB(Configuration job, String driverClass, String dbUrl,
+ String userName, String passwd) {
+ job.set(DRIVER_CLASS_PROPERTY, driverClass);
+ job.set(URL_PROPERTY, dbUrl);
+
+ if (userName != null) { job.set(USERNAME_PROPERTY, userName); }
+
+ if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); }
+ }
+
+ /**
+ * Sets the DB access related fields in the Configuration.
+ *
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL.
+ */
+ public static void configureDB(Configuration job, String driverClass, String dbUrl) {
+ configureDB(job, driverClass, dbUrl, null, null);
+ }
+
+ private Configuration job;
+
+ DBConfiguration(Configuration job) {
+ this.job = job;
+ }
+
+ /**
+ * Returns a connection object to the DB
+ *
+ * @throws ClassNotFoundException
+ * @throws SQLException
+ */
+ Connection getConnection() throws IOException {
+ try {
+ Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
+ } catch (ClassNotFoundException exception) {
+ throw new IOException("unable to load conection driver", exception);
+ }
+
+ try {
+ if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
+ return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
+ } else {
+ return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job
+ .get(DBConfiguration.USERNAME_PROPERTY), job
+ .get(DBConfiguration.PASSWORD_PROPERTY));
+ }
+ } catch (SQLException exception) {
+ throw new IOException("unable to create connection", exception);
+ }
+ }
+
+ String getInputTableName() {
+ return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
+ }
+
+ void setInputTableName(String tableName) {
+ job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+ }
+
+ String[] getInputFieldNames() {
+ return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
+ }
+
+ void setInputFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+ String getInputConditions() {
+ return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
+ }
+
+ void setInputConditions(String conditions) {
+ if (conditions != null && conditions.length() > 0) {
+ job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
+ }
+ }
+
+ String getInputOrderBy() {
+ return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
+ }
+
+ void setInputOrderBy(String orderby) {
+ if (orderby != null && orderby.length() > 0) {
+ job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
+ }
+ }
+
+ String getInputQuery() {
+ return job.get(DBConfiguration.INPUT_QUERY);
+ }
+
+ void setInputQuery(String query) {
+ if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); }
+ }
+
+ long getInputLimit() {
+ return job.getLong(DBConfiguration.INPUT_LIMIT, -1);
+ }
+
+ void setInputLimit(long limit) {
+ job.setLong(DBConfiguration.INPUT_LIMIT, limit);
+ }
+
+ String getInputCountQuery() {
+ return job.get(DBConfiguration.INPUT_COUNT_QUERY);
+ }
+
+ void setInputCountQuery(String query) {
+ if (query != null && query.length() > 0) {
+ job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
+ }
+ }
+
+ Class<?> getInputClass() {
+ return job
+ .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class);
+ }
+
+ void setInputClass(Class<? extends DBWritable> inputClass) {
+ job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
+ }
+
+ String getOutputTableName() {
+ return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
+ }
+
+ void setOutputTableName(String tableName) {
+ job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+ }
+
+ String[] getOutputFieldNames() {
+ return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
+ }
+
+ void setOutputFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+ String[] getOutputUpdateFieldNames() {
+ return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY);
+ }
+
+ void setOutputUpdateFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+ int getBatchStatementsNum() {
+ return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000);
+ }
+
+ void setBatchStatementsNum(int batchStatementsNum) {
+ job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum);
+ }
+
+ int getMaxConcurrentReadsNum() {
+ return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0);
+ }
+
+ void setMaxConcurrentReadsNum(int maxConcurrentReads) {
+ if (maxConcurrentReads < 0) {
+ throw new IllegalArgumentException("maxConcurrentReads must not be negative");
+ }
+
+ job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads);
+ }
+
+}
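A minimal sketch, assuming a plain Hadoop JobConf and the MySQL driver used elsewhere in this patch, of wiring the access properties through the public helper; host, database and credentials are placeholders:

    import org.apache.hadoop.mapred.JobConf
    import parallelai.spyglass.jdbc.db.DBConfiguration

    val job = new JobConf()
    // Stores driver, URL and credentials under the mapred.jdbc.* properties above
    DBConfiguration.configureDB(
      job,
      "com.mysql.jdbc.Driver",
      "jdbc:mysql://<hostname>:<port>/<db_name>",
      "user",
      "password")

The table- and field-level setters are package-private; they are driven by DBInputFormat.setInput and DBOutputFormat.setOutput below rather than called directly.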
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java
new file mode 100644
index 0000000..115d9bd
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java
@@ -0,0 +1,452 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.*;
+
+/**
+ * An InputFormat that reads input data from an SQL table. <p/> DBInputFormat emits LongWritables
+ * containing the record number as key and DBWritables as value. <p/> The SQL query and input class
+ * can be set using one of the two setInput methods.
+ */
+public class DBInputFormat<T extends DBWritable>
+ implements InputFormat<LongWritable, T>, JobConfigurable {
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(DBInputFormat.class);
+
+ /**
+ * A RecordReader that reads records from a SQL table. Emits LongWritables containing the record
+ * number as key and DBWritables as value.
+ */
+ protected class DBRecordReader implements RecordReader<LongWritable, T> {
+ private ResultSet results;
+ private Statement statement;
+ private Class<T> inputClass;
+ private JobConf job;
+ private DBInputSplit split;
+ private long pos = 0;
+
+ /**
+ * @param split The InputSplit to read data for
+ * @throws SQLException
+ */
+ protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job)
+ throws SQLException, IOException {
+ this.inputClass = inputClass;
+ this.split = split;
+ this.job = job;
+
+ statement =
+ connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+ //statement.setFetchSize(Integer.MIN_VALUE);
+ String query = getSelectQuery();
+ try {
+ LOG.info(query);
+ results = statement.executeQuery(query);
+ LOG.info("done executing select query");
+ } catch (SQLException exception) {
+ LOG.error("unable to execute select query: " + query, exception);
+ throw new IOException("unable to execute select query: " + query, exception);
+ }
+ }
+
+ /**
+ * Returns the query for selecting the records, subclasses can override this for custom
+ * behaviour.
+ */
+ protected String getSelectQuery() {
+ LOG.info("Executing select query");
+ StringBuilder query = new StringBuilder();
+
+ if (dbConf.getInputQuery() == null) {
+ query.append("SELECT ");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+
+ if (i != fieldNames.length - 1)
+ query.append(", ");
+ }
+
+ query.append(" FROM ").append(tableName);
+ query.append(" AS ").append(tableName); //in hsqldb this is necessary
+
+ if (conditions != null && conditions.length() > 0)
+ query.append(" WHERE (").append(conditions).append(")");
+
+ String orderBy = dbConf.getInputOrderBy();
+
+ if (orderBy != null && orderBy.length() > 0)
+ query.append(" ORDER BY ").append(orderBy);
+
+ }
+ else
+ query.append(dbConf.getInputQuery());
+
+ try {
+ // Only add limit and offset if you have multiple chunks
+ if(split.getChunks() > 1) {
+ query.append(" LIMIT ").append(split.getLength());
+ query.append(" OFFSET ").append(split.getStart());
+ }
+ } catch (IOException ex) {
+ //ignore, will not throw
+ }
+
+ return query.toString();
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ try {
+ connection.commit();
+ results.close();
+ statement.close();
+ } catch (SQLException exception) {
+ throw new IOException("unable to commit and close", exception);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ /** {@inheritDoc} */
+ public T createValue() {
+ return ReflectionUtils.newInstance(inputClass, job);
+ }
+
+ /** {@inheritDoc} */
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ /** {@inheritDoc} */
+ public float getProgress() throws IOException {
+ return pos / (float) split.getLength();
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(LongWritable key, T value) throws IOException {
+ try {
+ if (!results.next())
+ return false;
+
+ // Set the key field value as the output key value
+ key.set(pos + split.getStart());
+
+ value.readFields(results);
+
+ pos++;
+ } catch (SQLException exception) {
+ throw new IOException("unable to get next value", exception);
+ }
+
+ return true;
+ }
+ }
+
+ /** A Class that does nothing, implementing DBWritable */
+ public static class NullDBWritable implements DBWritable, Writable {
+
+ public void readFields(DataInput in) throws IOException {
+ }
+
+ public void readFields(ResultSet arg0) throws SQLException {
+ }
+
+ public void write(DataOutput out) throws IOException {
+ }
+
+ public void write(PreparedStatement arg0) throws SQLException {
+ }
+ }
+
+ /** An InputSplit that spans a set of rows */
+ protected static class DBInputSplit implements InputSplit {
+ private long end = 0;
+ private long start = 0;
+ private long chunks = 0;
+
+ /** Default Constructor */
+ public DBInputSplit() {
+ }
+
+ /**
+ * Convenience Constructor
+ *
+ * @param start the index of the first row to select
+ * @param end the index of the last row to select
+ * @param chunks the total number of chunks across all splits
+ */
+ public DBInputSplit(long start, long end, long chunks) {
+ this.start = start;
+ this.end = end;
+ this.chunks = chunks;
+ LOG.info("creating DB input split with start: " + start + ", end: " + end + ", chunks: " + chunks);
+ }
+
+ /** {@inheritDoc} */
+ public String[] getLocations() throws IOException {
+ // TODO Add a layer to enable SQL "sharding" and support locality
+ return new String[]{};
+ }
+
+ /** @return The index of the first row to select */
+ public long getStart() {
+ return start;
+ }
+
+ /** @return The index of the last row to select */
+ public long getEnd() {
+ return end;
+ }
+
+ /** @return The total row count in this split */
+ public long getLength() throws IOException {
+ return end - start;
+ }
+
+ /** @return The total number of chunks across all splits */
+ public long getChunks() {
+ return chunks;
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput input) throws IOException {
+ start = input.readLong();
+ end = input.readLong();
+ chunks = input.readLong();
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput output) throws IOException {
+ output.writeLong(start);
+ output.writeLong(end);
+ output.writeLong(chunks);
+ }
+ }
+
+ protected DBConfiguration dbConf;
+ protected Connection connection;
+
+ protected String tableName;
+ protected String[] fieldNames;
+ protected String conditions;
+ protected long limit;
+ protected int maxConcurrentReads;
+
+
+ /** {@inheritDoc} */
+ public void configure(JobConf job) {
+ dbConf = new DBConfiguration(job);
+
+ tableName = dbConf.getInputTableName();
+ fieldNames = dbConf.getInputFieldNames();
+ conditions = dbConf.getInputConditions();
+ limit = dbConf.getInputLimit();
+ maxConcurrentReads = dbConf.getMaxConcurrentReadsNum();
+
+ try {
+ connection = dbConf.getConnection();
+ } catch (IOException exception) {
+ throw new RuntimeException("unable to create connection", exception);
+ }
+
+ configureConnection(connection);
+ }
+
+ protected void configureConnection(Connection connection) {
+ setTransactionIsolationLevel(connection);
+ setAutoCommit(connection);
+ }
+
+ protected void setAutoCommit(Connection connection) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (Exception exception) {
+ throw new RuntimeException("unable to set auto commit", exception);
+ }
+ }
+
+ protected void setTransactionIsolationLevel(Connection connection) {
+ try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ } catch (SQLException exception) {
+ throw new RuntimeException("unable to configure transaction isolation level", exception);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ public RecordReader<LongWritable, T> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ Class inputClass = dbConf.getInputClass();
+ try {
+ return new DBRecordReader((DBInputSplit) split, inputClass, job);
+ } catch (SQLException exception) {
+ throw new IOException(exception.getMessage(), exception);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
+ // use the configured value if avail
+ chunks = maxConcurrentReads == 0 ? chunks : maxConcurrentReads;
+
+ try {
+ Statement statement = connection.createStatement();
+
+ ResultSet results = statement.executeQuery(getCountQuery());
+
+ long count = 0;
+
+ while (results.next())
+ count += results.getLong(1);
+
+ if (limit != -1)
+ count = Math.min(limit, count);
+
+ long chunkSize = (count / chunks);
+
+ results.close();
+ statement.close();
+
+ InputSplit[] splits = new InputSplit[chunks];
+
+ // Split the rows into n-number of chunks and adjust the last chunk
+ // accordingly
+ for (int i = 0; i < chunks; i++) {
+ DBInputSplit split;
+
+ if (i + 1 == chunks)
+ split = new DBInputSplit(i * chunkSize, count, chunks);
+ else
+ split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize, chunks);
+
+ splits[i] = split;
+ }
+
+ return splits;
+ } catch (SQLException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Returns the query for getting the total number of rows, subclasses can override this for
+ * custom behaviour.
+ */
+ protected String getCountQuery() {
+
+ if (dbConf.getInputCountQuery() != null) { return dbConf.getInputCountQuery(); }
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("SELECT COUNT(*) FROM " + tableName);
+
+ if (conditions != null && conditions.length() > 0)
+ query.append(" WHERE " + conditions);
+
+ return query.toString();
+ }
+
+ /**
+ * Initializes the map-part of the job with the appropriate input settings.
+ *
+ * @param job The job
+ * @param inputClass the class object implementing DBWritable, which is the Java object
+ * holding tuple fields.
+ * @param tableName The table to read data from
+ * @param conditions The condition which to select data with, eg. '(updated > 20070101 AND
+ * length > 0)'
+ * @param orderBy the fieldNames in the orderBy clause.
+ * @param limit the maximum number of records to read, or -1 for no limit
+ * @param fieldNames The field names in the table
+ * @param concurrentReads the maximum number of concurrent reads (splits), or 0 to use the default
+ */
+ public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+ String tableName, String conditions, String orderBy, long limit, int concurrentReads,
+ String... fieldNames) {
+ job.setInputFormat(DBInputFormat.class);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setInputClass(inputClass);
+ dbConf.setInputTableName(tableName);
+ dbConf.setInputFieldNames(fieldNames);
+ dbConf.setInputConditions(conditions);
+ dbConf.setInputOrderBy(orderBy);
+
+ if (limit != -1)
+ dbConf.setInputLimit(limit);
+
+ dbConf.setMaxConcurrentReadsNum(concurrentReads);
+ }
+
+ /**
+ * Initializes the map-part of the job with the appropriate input settings.
+ *
+ * @param job The job
+ * @param inputClass the class object implementing DBWritable, which is the Java object
+ * holding tuple fields.
+ * @param selectQuery the input query to select fields. Example : "SELECT f1, f2, f3 FROM
+ * Mytable ORDER BY f1"
+ * @param countQuery the input query that returns the number of records in the table.
+ * Example : "SELECT COUNT(f1) FROM Mytable"
+ * @param limit the maximum number of records to read, or -1 for no limit
+ * @param concurrentReads the maximum number of concurrent reads (splits), or 0 to use the default
+ */
+ public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+ String selectQuery, String countQuery, long limit, int concurrentReads) {
+ job.setInputFormat(DBInputFormat.class);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setInputClass(inputClass);
+ dbConf.setInputQuery(selectQuery);
+ dbConf.setInputCountQuery(countQuery);
+
+ if (limit != -1)
+ dbConf.setInputLimit(limit);
+
+ dbConf.setMaxConcurrentReadsNum(concurrentReads);
+ }
+}
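A minimal sketch of the table-based setInput variant, assuming a JobConf configured as above and the TupleRecord class from this patch; the table and field names are placeholders:

    import org.apache.hadoop.mapred.JobConf
    import parallelai.spyglass.jdbc.TupleRecord
    import parallelai.spyglass.jdbc.db.{DBConfiguration, DBInputFormat}

    val job = new JobConf()
    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
      "jdbc:mysql://<hostname>:<port>/<db_name>", "user", "password")

    // SELECT id, col1 FROM my_table WHERE (id > 0) ORDER BY id,
    // no LIMIT (-1), at most 4 splits / concurrent reads
    DBInputFormat.setInput(job, classOf[TupleRecord],
      "my_table", "id > 0", "id", -1L, 4, "id", "col1")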
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
new file mode 100644
index 0000000..1166970
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
@@ -0,0 +1,391 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts
+ * &lt;key,value&gt; pairs, where key has a type extending DBWritable. Returned {@link RecordWriter}
+ * writes <b>only the key</b> to the database with a batch SQL query.
+ */
+public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> {
+ private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+
+ /** A RecordWriter that writes the reduce output to a SQL table */
+ protected class DBRecordWriter implements RecordWriter<K, V> {
+ private Connection connection;
+ private PreparedStatement insertStatement;
+ private PreparedStatement updateStatement;
+ private final int statementsBeforeExecute;
+
+ private long statementsAdded = 0;
+ private long insertStatementsCurrent = 0;
+ private long updateStatementsCurrent = 0;
+
+ protected DBRecordWriter(Connection connection, PreparedStatement insertStatement,
+ PreparedStatement updateStatement, int statementsBeforeExecute) {
+ this.connection = connection;
+ this.insertStatement = insertStatement;
+ this.updateStatement = updateStatement;
+ this.statementsBeforeExecute = statementsBeforeExecute;
+ }
+
+ /** {@inheritDoc} */
+ public void close(Reporter reporter) throws IOException {
+ executeBatch();
+
+ try {
+ if (insertStatement != null) { insertStatement.close(); }
+
+ if (updateStatement != null) { updateStatement.close(); }
+
+ connection.commit();
+ } catch (SQLException exception) {
+ rollBack();
+
+ createThrowMessage("unable to commit batch", 0, exception);
+ } finally {
+ try {
+ connection.close();
+ } catch (SQLException exception) {
+ throw new IOException("unable to close connection", exception);
+ }
+ }
+ }
+
+ private void executeBatch() throws IOException {
+ try {
+ if (insertStatementsCurrent != 0) {
+ LOG.info(
+ "executing insert batch " + createBatchMessage(insertStatementsCurrent));
+
+ insertStatement.executeBatch();
+ }
+
+ insertStatementsCurrent = 0;
+ } catch (SQLException exception) {
+ rollBack();
+
+ createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception);
+ }
+
+ try {
+ if (updateStatementsCurrent != 0) {
+ LOG.info(
+ "executing update batch " + createBatchMessage(updateStatementsCurrent));
+
+ int[] result = updateStatement.executeBatch();
+
+ int count = 0;
+
+ for (int value : result) { count += value; }
+
+ if (count != updateStatementsCurrent) {
+ throw new IOException(
+ "update did not update same number of statements executed in batch, batch: "
+ + updateStatementsCurrent + " updated: " + count);
+ }
+ }
+
+ updateStatementsCurrent = 0;
+ } catch (SQLException exception) {
+
+ String message = exception.getMessage();
+ if (message.indexOf("Duplicate Key") >= 0) {
+ LOG.warn("In exception block. Bypass exception becuase of Insert/Update.");
+ } else {
+ rollBack();
+
+ createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception);
+ }
+ }
+ }
+
+ private void rollBack() {
+ try {
+ connection.rollback();
+ } catch (SQLException sqlException) {
+ LOG.warn(StringUtils.stringifyException(sqlException));
+ }
+ }
+
+ private String createBatchMessage(long currentStatements) {
+ return String
+ .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute);
+ }
+
+ private void createThrowMessage(String stateMessage, long currentStatements,
+ SQLException exception) throws IOException {
+ String message = exception.getMessage();
+
+ message = message.substring(0, Math.min(75, message.length()));
+
+ int messageLength = exception.getMessage().length();
+ String batchMessage = createBatchMessage(currentStatements);
+ String template = "%s [msglength: %d]%s %s";
+ String errorMessage =
+ String.format(template, stateMessage, messageLength, batchMessage, message);
+
+ LOG.error(errorMessage, exception.getNextException());
+
+ throw new IOException(errorMessage, exception.getNextException());
+ }
+
+ /** {@inheritDoc} */
+ public synchronized void write(K key, V value) throws IOException {
+ try {
+ if (value == null) {
+ key.write(insertStatement);
+ insertStatement.addBatch();
+ insertStatementsCurrent++;
+ } else {
+ key.write(updateStatement);
+ updateStatement.addBatch();
+ updateStatementsCurrent++;
+ }
+ } catch (SQLException exception) {
+ throw new IOException("unable to add batch statement", exception);
+ }
+
+ statementsAdded++;
+
+ if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); }
+ }
+ }
+
+ /**
+ * Constructs the query used as the prepared statement to insert data.
+ *
+ * @param table the table to insert into
+ * @param fieldNames the fields to insert into. If field names are unknown, supply an array of
+ * nulls.
+ */
+ protected String constructInsertQuery(String table, String[] fieldNames) {
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("Field names may not be null");
+ }
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("INSERT INTO ").append(table);
+
+ if (fieldNames.length > 0 && fieldNames[0] != null) {
+ query.append(" (");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+
+ if (i != fieldNames.length - 1) { query.append(","); }
+ }
+
+ query.append(")");
+
+ }
+
+ query.append(" VALUES (");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append("?");
+
+ if (i != fieldNames.length - 1) { query.append(","); }
+ }
+
+ query.append(")");
+
+ // Turn the INSERT into a MySQL-style upsert: every column after the first
+ // (assumed to be the primary key) is repeated in the ON DUPLICATE KEY UPDATE clause.
+ query.append(" ON DUPLICATE KEY UPDATE ");
+
+ for (int i = 1; i < fieldNames.length; i++) {
+ if (i != 1) { query.append(","); }
+
+ query.append(fieldNames[i]);
+ query.append(" = ?");
+ }
+
+ query.append(";");
+
+ LOG.info(" ===================== " + query.toString());
+ return query.toString();
+ }
+
+ protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) {
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("field names may not be null");
+ }
+
+ Set<String> updateNamesSet = new HashSet<String>();
+ Collections.addAll(updateNamesSet, updateNames);
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("UPDATE ").append(table);
+
+ query.append(" SET ");
+
+ if (fieldNames.length > 0 && fieldNames[0] != null) {
+ int count = 0;
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (updateNamesSet.contains(fieldNames[i])) { continue; }
+
+ if (count != 0) { query.append(","); }
+
+ query.append(fieldNames[i]);
+ query.append(" = ?");
+
+ count++;
+ }
+ }
+
+ query.append(" WHERE ");
+
+ if (updateNames.length > 0 && updateNames[0] != null) {
+ for (int i = 0; i < updateNames.length; i++) {
+ query.append(updateNames[i]);
+ query.append(" = ?");
+
+ if (i != updateNames.length - 1) { query.append(" and "); }
+ }
+ }
+
+ query.append(";");
+ System.out.println("Update Query => " + query.toString());
+ return query.toString();
+ }
+
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException {
+ }
+
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name,
+ Progressable progress) throws IOException {
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ String tableName = dbConf.getOutputTableName();
+ String[] fieldNames = dbConf.getOutputFieldNames();
+ String[] updateNames = dbConf.getOutputUpdateFieldNames();
+ int batchStatements = dbConf.getBatchStatementsNum();
+
+ Connection connection = dbConf.getConnection();
+
+ configureConnection(connection);
+
+ String sqlInsert = constructInsertQuery(tableName, fieldNames);
+ PreparedStatement insertPreparedStatement;
+
+ try {
+ insertPreparedStatement = connection.prepareStatement(sqlInsert);
+ insertPreparedStatement.setEscapeProcessing(true); // should be on by default
+ } catch (SQLException exception) {
+ throw new IOException("unable to create statement for: " + sqlInsert, exception);
+ }
+
+ String sqlUpdate =
+ updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null;
+ PreparedStatement updatePreparedStatement = null;
+
+ try {
+ updatePreparedStatement =
+ sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null;
+ } catch (SQLException exception) {
+ throw new IOException("unable to create statement for: " + sqlUpdate, exception);
+ }
+
+ return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements);
+ }
+
+ protected void configureConnection(Connection connection) {
+ setAutoCommit(connection);
+ }
+
+ protected void setAutoCommit(Connection connection) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (Exception exception) {
+ throw new RuntimeException("unable to set auto commit", exception);
+ }
+ }
+
+ /**
+ * Initializes the reduce-part of the job with the appropriate output settings
+ *
+ * @param job The job
+ * @param dbOutputFormatClass the OutputFormat implementation to use, or null for the default DBOutputFormat
+ * @param tableName The table to insert data into
+ * @param fieldNames The field names in the table. If unknown, supply the appropriate number of nulls
+ * @param updateFields The field names used to match existing rows when updating, or null if updates are not needed
+ * @param batchSize The number of statements to batch before executing, or -1 to keep the default
+ */
+ public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass,
+ String tableName, String[] fieldNames, String[] updateFields, int batchSize) {
+ if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else {
+ job.setOutputFormat(dbOutputFormatClass);
+ }
+
+ // writing doesn't always happen in reduce
+ job.setReduceSpeculativeExecution(false);
+ job.setMapSpeculativeExecution(false);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setOutputTableName(tableName);
+ dbConf.setOutputFieldNames(fieldNames);
+
+ if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); }
+
+ if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); }
+ }
+}
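A matching sketch for the output side, again with placeholder table and field names; passing null for the format class falls back to DBOutputFormat itself, and the update field list drives the UPDATE statement prepared in getRecordWriter:

    import org.apache.hadoop.mapred.JobConf
    import parallelai.spyglass.jdbc.db.{DBConfiguration, DBOutputFormat}

    val job = new JobConf()
    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
      "jdbc:mysql://<hostname>:<port>/<db_name>", "user", "password")

    // Insert (id, col1, col2); rows are matched on id for updates;
    // execute the prepared statements in batches of 1000
    DBOutputFormat.setOutput(job, null,
      "my_table", Array("id", "col1", "col2"), Array("id"), 1000)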
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java
new file mode 100644
index 0000000..1369c74
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.hadoop.io.Writable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Objects that are read from/written to a database should implement <code>DBWritable</code>.
+ * DBWritable is similar to {@link Writable} except that the {@link #write(PreparedStatement)}
+ * method takes a {@link PreparedStatement}, and {@link #readFields(ResultSet)} takes a {@link
+ * ResultSet}. <p> Implementations are responsible for writing the fields of the object to
+ * PreparedStatement, and reading the fields of the object from the ResultSet. <p/> <p>Example:</p>
+ * If we have the following table in the database :
+ * <pre>
+ * CREATE TABLE MyTable (
+ * counter INTEGER NOT NULL,
+ * timestamp BIGINT NOT NULL
+ * );
+ * </pre>
+ * then we can read/write the tuples from/to the table with :
+ * <p><pre>
+ * public class MyWritable implements Writable, DBWritable {
+ * // Some data
+ * private int counter;
+ * private long timestamp;
+ * <p/>
+ * //Writable#write() implementation
+ * public void write(DataOutput out) throws IOException {
+ * out.writeInt(counter);
+ * out.writeLong(timestamp);
+ * }
+ * <p/>
+ * //Writable#readFields() implementation
+ * public void readFields(DataInput in) throws IOException {
+ * counter = in.readInt();
+ * timestamp = in.readLong();
+ * }
+ * <p/>
+ * public void write(PreparedStatement statement) throws SQLException {
+ * statement.setInt(1, counter);
+ * statement.setLong(2, timestamp);
+ * }
+ * <p/>
+ * public void readFields(ResultSet resultSet) throws SQLException {
+ * counter = resultSet.getInt(1);
+ * timestamp = resultSet.getLong(2);
+ * }
+ * }
+ * </pre></p>
+ */
+public interface DBWritable {
+
+ /**
+ * Sets the fields of the object in the {@link PreparedStatement}.
+ *
+ * @param statement the statement that the fields are put into.
+ * @throws SQLException
+ */
+ public void write(PreparedStatement statement) throws SQLException;
+
+ /**
+ * Reads the fields of the object from the {@link ResultSet}.
+ *
+ * @param resultSet the {@link ResultSet} to get the fields from.
+ * @throws SQLException
+ */
+ public void readFields(ResultSet resultSet) throws SQLException;
+
+}
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
new file mode 100644
index 0000000..2a08b7d
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -0,0 +1,56 @@
+package parallelai.spyglass.jdbc
+
+import com.twitter.scalding.AccessMode
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Mode
+import com.twitter.scalding.Read
+import com.twitter.scalding.Source
+import com.twitter.scalding.Write
+import cascading.scheme.Scheme
+import cascading.tap.Tap
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.RecordReader
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.JobConf
+
+class JDBCSource(
+ tableName: String = "tableName",
+ driverName: String = "com.mysql.jdbc.Driver",
+ connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>",
+ userId: String = "user",
+ password: String = "password",
+ columnNames: Array[String] = Array[String]("col1", "col2", "col3"),
+ columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"),
+ primaryKeys: Array[String] = Array[String]("primary_key"),
+ fields: Fields = new Fields("fld1", "fld2", "fld3"),
+ orderBy: Array[String] = null,
+ updateBy: Array[String] = null,
+ updateByFields: Fields = null
+ ) extends Source {
+
+ override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy)
+ .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+ val jdbcScheme = hdfsScheme match {
+ case jdbc: JDBCScheme => jdbc
+ case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme")
+ }
+ mode match {
+ case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Read => {
+ val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
+ jdbcTap.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+
+ val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
+ jdbcTap.asInstanceOf[Tap[_,_,_]]
+ }
+ }
+ case _ => super.createTap(readOrWrite)(mode)
+ }
+ }
+}
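A minimal sketch of driving the source from a Scalding job; the job name, connection details, table and column layout below are placeholders mirroring the constructor defaults above and the test sources further down:

    import com.twitter.scalding.{Args, Tsv}
    import cascading.tuple.Fields
    import parallelai.spyglass.base.JobBase
    import parallelai.spyglass.jdbc.JDBCSource

    class JdbcDumpJob(args: Args) extends JobBase(args) {
      val source = new JDBCSource(
        "my_table",
        "com.mysql.jdbc.Driver",
        "jdbc:mysql://<hostname>:<port>/<db_name>",
        "user", "password",
        Array("ID", "COL1", "COL2"),
        Array("bigint(20)", "varchar(45)", "varchar(45)"),
        Array("id"),
        new Fields("id", "col1", "col2"))

      // Dump the table to HDFS; writing back works the same way via .write(source)
      source.read.write(Tsv("jdbc_dump"))
    }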
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
new file mode 100644
index 0000000..1544f47
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
@@ -0,0 +1,57 @@
+package parallelai.spyglass.jdbc.testing
+
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.Args
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.mathematics.Matrix._
+import scala.math._
+import scala.math.BigDecimal.javaBigDecimal2bigDecimal
+import cascading.tuple.Fields
+import cascading.pipe.Pipe
+import com.twitter.scalding.Osv
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.jdbc.JDBCSource
+
+class HdfsToJdbc (args: Args) extends JobBase(args) {
+
+ implicit val implicitArgs: Args = args
+
+ val scaldingInputPath = getString("input.scalding")
+ log.info("Scalding sample input path => [" + scaldingInputPath + "]")
+
+ val S_output = scaldingInputPath
+ val fileType = getString("fileType")
+ log.info("Input file type => " + fileType)
+
+ val S_SCHEMA = List(
+ 'key_id, 'col1, 'col2, 'col3
+ )
+
+ val url = "mysql01.prod.bigdata.bskyb.com"
+ val dbName = "skybet_db"
+ val tableName = "skybet_hbase_betdetail_jdbc_test"
+
+
+ val jdbcSource2 = new JDBCSource(
+ "db_name",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
+ "user",
+ "password",
+ Array[String]("KEY_ID", "COL1", "COL2", "COL3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("key_id"),
+ new Fields("key_id", "col1", "col2", "col3")
+ )
+
+ // Read the input with the format named by 'fileType', then stream it into the JDBC sink.
+ val piper: Pipe =
+ if (fileType equals("Tsv"))
+ Tsv(S_output, S_SCHEMA).read
+ else
+ Osv(S_output, S_SCHEMA).read
+
+ val S_FLOW =
+ piper
+ .write(jdbcSource2)
+
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
new file mode 100644
index 0000000..30c03a2
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
@@ -0,0 +1,200 @@
+package parallelai.spyglass.jdbc.testing
+
+import org.apache.log4j.Level
+import org.apache.log4j.LogManager
+import org.apache.log4j.Logger
+import com.twitter.scalding.Args
+import com.twitter.scalding.IterableSource
+import com.twitter.scalding.Tsv
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.jdbc.JDBCSource
+
+/**
+ * This integration test expects certain JDBC tables to exist
+ * with specific data - see GenerateTestingTables.java
+ */
+
+// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala
+class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {
+
+ // Initiate logger
+ private val LOG: Logger = LogManager.getLogger(this.getClass)
+
+ // Set to Level.DEBUG if --debug is passed in
+ val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean
+ if (isDebug) {
+ LOG.setLevel(Level.DEBUG)
+ LOG.info("Setting logging to Level.DEBUG")
+ }
+
+ val url = "mysql01.prod.bigdata.bskyb.com"
+ val dbName = "skybet_db"
+ val tableName = "skybet_hbase_betdetail_jdbc_test"
+
+ val jdbcSourceRead = new JDBCSource(
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
+ )
+
+ val jdbcSourceWrite = new JDBCSource(
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
+ )
+
+ // -----------------------------
+ // ----- Tests for TABLE_01 ----
+ // -----------------------------
+ val TABLE_01_SCHEMA = List('key,'column1, 'column2, 'column3)
+ val tableName1 = "TABLE_01"
+
+ // -------------------- Test 01 --------------------
+ var testName01 = "Select_Test_Read_Count"
+ println("---- Running : " + testName01)
+ // Get everything from the JDBC testing table into a Pipe
+ val jdbc01 = jdbcSourceRead
+ .read
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ group.toList[String]('column2 -> 'column2)
+ group.toList[String]('column3 -> 'column3)
+ }
+ .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) =>
+ x._1 + " " + x._2 + " " + x._3 + " " + x._4
+ }
+
+ // Calculate expected result for Test 01
+ var list01 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345"))
+
+ // -------------------- Test 02 --------------------
+ val testName02 = "Select_Test_Read_Insert_Updated_Count"
+ println("---- Running : " + testName02)
+
+ // Get everything from JDBC testing table into a Pipe
+
+ val jdbcSourceReadUpdated = new JDBCSource(
+ "TABLE_02",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
+ )
+
+ val jdbc02 = jdbcSourceReadUpdated
+ .read
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ group.toList[String]('column2 -> 'column2)
+ group.toList[String]('column3 -> 'column3)
+ }
+ .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) =>
+ x._1 + " " + x._2 + " " + x._3 + " " + x._4
+ }
+
+ // Calculate expected result for Test 02
+ var list02 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345"))
+
+ // Store results of Tests 01 and 02, sorted by test name
+ (
+ getTestResultPipe(getExpectedPipe(list01), jdbc01, testName01) ++
+ getTestResultPipe(getExpectedPipe(list02), jdbc02, testName02)
+ ).groupAll { group =>
+ group.sortBy('testName)
+ }
+ .write(Tsv("JdbcShouldRead"))
+
+
+ /**
+ * Asserts that the pipe is empty
+ *
+ * We concatenate with a single-line header - if the resulting size is 1
+ * then the original size was 0, i.e. the pipe was empty
+ *
+ * The result is returned in a Pipe
+ */
+ def assertPipeIsEmpty ( jdbcPipe : Pipe , testName:String) : Pipe = {
+ val headerPipe = IterableSource(List(testName), 'jdbcdata)
+ val concatenation = ( jdbcPipe ++ headerPipe ).groupAll{ group =>
+ group.size('size)
+ }
+ .project('size)
+
+ val result =
+ concatenation
+ .mapTo('size -> ('testName, 'result, 'expecteddata, 'jdbcdata)) { x:String => {
+ if (x == "1") {
+ (testName, "Success", "", "")
+ } else {
+ (testName, "Test Failed", "", "")
+ }
+ }
+ }
+
+ result
+ }
+
+ /**
+ * Methods receives 2 pipes - and projects the results of testing
+ *
+ * expectedPipe should have a column 'expecteddata
+ * realJdbcPipe should have a column 'jdbcdata
+ */
+ def getTestResultPipe ( expectedPipe:Pipe , realJdbcPipe:Pipe, testName: String ): Pipe = {
+ val results = expectedPipe.insert('testName , testName)
+ .joinWithTiny('testName -> 'testName, realJdbcPipe.insert('testName , testName))
+ .map(('expecteddata, 'jdbcdata)->'result) { x:(String,String) =>
+ //println(x._1 + " === " + x._2)
+ if (x._1.equals(x._2))
+ "Success"
+ else
+ "Test Failed"
+ }
+ .project('testName, 'result, 'expecteddata, 'jdbcdata)
+ results
+ }
+
+ /**
+ * Builds a Pipe holding the expected data, flattened into the same
+ * single-column format as the real JDBC pipes above
+ */
+ def getExpectedPipe ( expectedList: List[(String,String,String,String)]) : Pipe = {
+
+ val expectedPipe =
+ IterableSource(expectedList, TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ group.toList[String]('column2 -> 'column2)
+ group.toList[String]('column3 -> 'column3)
+
+ }
+ .mapTo(('*) -> 'expecteddata) { x:(String,String,String,String) =>
+ x._1 + " " + x._2 + " " + x._3 + " " + x._4
+ }
+ expectedPipe
+ }
+
+}
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala
new file mode 100644
index 0000000..f317834
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala
@@ -0,0 +1,10 @@
+package parallelai.spyglass.jdbc.testing
+
+import parallelai.spyglass.base.JobRunner
+
+object JdbcSourceShouldReadWriteRunner extends App {
+ val appConfig = "/projects/applications.conf"
+ val libPath = "/*.jar"
+
+ JobRunner.main(Array(classOf[JdbcSourceShouldReadWrite].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath))
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
new file mode 100644
index 0000000..9fc09e4
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
@@ -0,0 +1,67 @@
+package parallelai.spyglass.jdbc.testing
+
+import com.twitter.scalding._
+import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.jdbc.JDBCSource
+
+/**
+ * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match.
+ * Now hardcoded for Skybet betdetail summation sample.
+ * To run it:
+ * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \
+ * --app.conf.path /projects/application-hadoop.conf --hdfs \
+ * --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation
+ * @param args
+ */
+class TablesComparison(args: Args) extends JobBase(args) {
+
+ implicit val implicitArgs: Args = args
+ val conf = appConfig
+
+ val jdbcSink = new JDBCSource(
+ "table_name",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://<hostname>:<port>/<db_name>",
+ "skybet_user",
+ "zkb4Uo{C8",
+ Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"),
+ Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"),
+ Array[String]("betleg_id"),
+ new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"),
+ Array[String]("BETLEG_ID"),
+ Array[String]("BETLEG_ID"),
+ new Fields("betleg_id")
+ )
+ .read
+ .insert('name, "betdetail")
+ .project('bet_id, 'part_no, 'leg_id)
+
+ //
+ // .write(new TextLine("testJDBCComparator/compare1"))
+
+ val jdbcSource2 = new JDBCSource(
+ "skybet_midas_bet_detail",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull",
+ "skybet_user",
+ "zkb4Uo{C8",
+ Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"),
+ Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"),
+ Array[String]("bet_id"),
+ new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no")
+
+ )
+ .read
+ .insert('name, "sample")
+ .project('bet_id, 'part_no, 'leg_id)
+
+ val uPipe = jdbcSink ++ jdbcSource2
+ uPipe
+ .groupBy('bet_id, 'part_no, 'leg_id) {
+ _.size
+ }.filter('size) {
+ x: Int => x != 2
+ }
+ .write(new TextLine("testJDBCComparator/result"))
+} \ No newline at end of file
diff --git a/src/test/java/parallelai/spyglass/jdbc/GenerateTestingTables.java b/src/test/java/parallelai/spyglass/jdbc/GenerateTestingTables.java
new file mode 100644
index 0000000..54ec8fc
--- /dev/null
+++ b/src/test/java/parallelai/spyglass/jdbc/GenerateTestingTables.java
@@ -0,0 +1,201 @@
+package parallelai.spyglass.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+/**
+ * Generates two database tables, 'TABLE_01' and 'TABLE_02'
+ *
+ * Those tables are used by the integration tests of JDBCSource in
+ * JdbcSourceShouldReadWrite.scala
+ *
+ * Run with: mvn -Dtest=parallelai.spyglass.jdbc.GenerateTestingTables test
+ *
+ */
+public class GenerateTestingTables {
+
+ // JDBC driver name and database URL
+ static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
+ static final String DB_PORT = "3306";
+ static final String DB_NAME = "database_name";
+
+
+ static final String DB_URL = "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull";
+
+ // Database credentials
+ static final String USER = "user";
+ static final String PASS = "password";
+
+ public static enum TestingTable {
+ TABLE_01, TABLE_02
+ }
+
+ private static final Log LOG = LogFactory
+ .getLog(GenerateTestingTables.class);
+
+ @Test
+ public void fakeTest() {
+
+ // Connect to the test database
+ LOG.info("Connecting to " + DB_URL);
+
+ Connection conn = null;
+ Statement stmt = null;
+ try {
+ // STEP 2: Register JDBC driver
+ Class.forName("com.mysql.jdbc.Driver");
+
+ // STEP 3: Open a connection
+ LOG.info("Connecting to a selected database...");
+ conn = DriverManager.getConnection(DB_URL, USER, PASS);
+ LOG.info("Connected database successfully...");
+
+
+ // Delete test tables
+ deleteTestTable(conn, TestingTable.TABLE_01.name());
+ deleteTestTable(conn, TestingTable.TABLE_02.name());
+
+ // Generate test tables
+ createTestTable(conn, TestingTable.TABLE_01);
+ createTestTable(conn, TestingTable.TABLE_02);
+
+ // Populate test tables
+ populateTestTable(conn, TestingTable.TABLE_01);
+ //populateTestTable(conn, TestingTable.TABLE_02);
+
+ // Print content of test table
+ printHTable(conn, TestingTable.TABLE_01);
+
+ // If we've reached here - the testing data are in
+ Assert.assertEquals("true", "true");
+
+
+ } catch (SQLException se) {
+ // Handle errors for JDBC
+ se.printStackTrace();
+ LOG.error(se.toString());
+ } catch (Exception e) {
+ // Handle errors for Class.forName
+ e.printStackTrace();
+ LOG.error(e.toString());
+ } finally {
+ // finally block used to close resources
+ try {
+ if (stmt != null)
+ stmt.close();
+ } catch (SQLException se) {
+ }// do nothing
+ try {
+ if (conn != null)
+ conn.close();
+ } catch (SQLException se) {
+ se.printStackTrace();
+ LOG.error(se.toString());
+ }// end finally try
+ }// end try
+
+ }
+
+ private static void populateTestTable(Connection connection, TestingTable testingTable)
+ throws SQLException {
+
+
+ // Load up the table
+ LOG.info("Populating table in given database...");
+
+
+ String [] queries = {
+ "insert into " + testingTable.name() + " values (1, 'A', 'X', 123)",
+ "insert into " + testingTable.name() + " values (2, 'B', 'Y', 234)",
+ "insert into " + testingTable.name() + " values (3, 'C', 'Z', 345)",
+ };
+
+ Statement statement = connection.createStatement();
+
+ for (String query : queries) {
+ statement.addBatch(query);
+ }
+ statement.executeBatch();
+ LOG.info("Populated table in given database...");
+
+ statement.close();
+
+ }
+
+ private static void createTestTable(Connection connection, TestingTable testingTable)
+ throws SQLException {
+
+ LOG.info("Creating table in given database...");
+ Statement stmt = connection.createStatement();
+
+ String sql = "CREATE TABLE " + testingTable.name() + " "
+ + "(id INTEGER not NULL, " + " test_column1 VARCHAR(255), "
+ + " test_column2 VARCHAR(255), " + " test_column3 INTEGER, "
+ + " PRIMARY KEY ( id ))";
+
+ stmt.executeUpdate(sql);
+ LOG.info("Created table in given database...");
+
+ stmt.close();
+ }
+
+ /**
+ * Method to drop a testing table if it already exists, e.g. TABLE_01
+ */
+ private static void deleteTestTable(Connection connection, String tableName) throws SQLException {
+
+
+ // Execute a query
+ LOG.info("Deleting table in given database...");
+ Statement stmt = connection.createStatement();
+
+ String sql = "DROP TABLE IF EXISTS " + tableName;
+
+ int result = stmt.executeUpdate(sql);
+ LOG.info("Deleted table in given database... " + result);
+
+
+ stmt.close();
+ }
+
+ /**
+ * Method to print out the contents of a testing table
+ */
+ private static void printHTable(Connection connection, TestingTable testingTable)
+ throws SQLException {
+
+ // Execute a query
+ LOG.info("Printing table in given database...");
+ Statement stmt = connection.createStatement();
+
+ String sql = "SELECT * FROM " + testingTable.name();
+
+ ResultSet resultSet = stmt.executeQuery(sql);
+ LOG.info("Get data from table in given database...");
+
+ while (resultSet.next()) {
+ Integer key = resultSet.getInt("id");
+ String testColumn1 = resultSet.getString("test_column1");
+ String testColumn2 = resultSet.getString("test_column2");
+ Integer testColumn3 = resultSet.getInt("test_column3");
+
+ LOG.info(key + " : " + testColumn1 + " : " + testColumn2 + " : " + testColumn3);
+ }
+
+ }
+
+ public static void main(String[] args) {
+ GenerateTestingTables test = new GenerateTestingTables();
+ test.fakeTest();
+ }
+} \ No newline at end of file