From 3e4a74de169104679f2459947c27d5cfb8cc430b Mon Sep 17 00:00:00 2001
From: Saad Rashid
Date: Mon, 24 Jun 2013 17:13:19 +0100
Subject: Added JDBCTap support.
---
.../parallelai/spyglass/jdbc/JDBCConstants.java | 25 +
.../java/parallelai/spyglass/jdbc/JDBCScheme.java | 670 +++++++++++++++++++++
.../java/parallelai/spyglass/jdbc/JDBCTap.java | 621 +++++++++++++++++++
.../parallelai/spyglass/jdbc/JDBCTapCollector.java | 118 ++++
.../java/parallelai/spyglass/jdbc/TableDesc.java | 181 ++++++
.../java/parallelai/spyglass/jdbc/TupleRecord.java | 63 ++
.../spyglass/jdbc/db/DBConfiguration.java | 276 +++++++++
.../parallelai/spyglass/jdbc/db/DBInputFormat.java | 452 ++++++++++++++
.../spyglass/jdbc/db/DBOutputFormat.java | 391 ++++++++++++
.../parallelai/spyglass/jdbc/db/DBWritable.java | 83 +++
.../parallelai/spyglass/jdbc/JDBCSource.scala | 56 ++
.../spyglass/jdbc/testing/HdfsToJdbc.scala | 57 ++
.../jdbc/testing/JdbcSourceShouldReadWrite.scala | 200 ++++++
.../testing/JdbcSourceShouldReadWriteRunner.scala | 10 +
.../spyglass/jdbc/testing/TablesComparison.scala | 67 +++
15 files changed, 3270 insertions(+)
create mode 100644 src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/JDBCTap.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/JDBCTapCollector.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/TableDesc.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/TupleRecord.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
create mode 100644 src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java
create mode 100644 src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala
create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
(limited to 'src/main')
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java
new file mode 100644
index 0000000..6788624
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/JDBCConstants.java
@@ -0,0 +1,25 @@
+package parallelai.spyglass.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class JDBCConstants {
+
+ public enum JdbcSourceMode {
+ SELECT,
+ SELECT_WITH_PARTITIONS,
+ SELECT_WITH_BUCKETS;
+ }
+
+ public enum JdbcSinkMode {
+ INSERT,
+ UPDATE,
+ UPSERT;
+ }
+
+ public static final String START_KEY = "jdbc.%s.startkey";
+ public static final String STOP_KEY = "jdbc.%s.stopkey";
+ public static final String SOURCE_MODE = "jdbc.%s.source.mode";
+ public static final String KEY_LIST = "jdbc.%s.key.list";
+ public static final String VERSIONS = "jdbc.%s.versions";
+
+}
diff --git a/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java
new file mode 100644
index 0000000..5007895
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/JDBCScheme.java
@@ -0,0 +1,670 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.jdbc;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import cascading.util.Util;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import parallelai.spyglass.jdbc.db.DBInputFormat;
+import parallelai.spyglass.jdbc.db.DBOutputFormat;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Class JDBCScheme defines what its parent Tap will select and insert/update into the sql database.
+ *
+ * If updateBy column names are given, a SQL UPDATE statement will be generated if the values in those columns
+ * for the given Tuple are all not {@code null}. Otherwise an INSERT statement will be generated.
+ *
+ * Some constructors take columnFields and updateByFields. These values will be used during field name resolution
+ * to bind this Scheme to the source and sink branches in a give assembly. These fields 'alias' the column names
+ * in the respective arrays. In other words, if your DB TABLE has different column names than your assembly exepects,
+ * use the Fields arguments to bind the assembly to the table. Both Fields and array must be the same size.
+ *
+ * Override this class, {@link DBInputFormat}, and {@link DBOutputFormat} to specialize for a given vendor database.
+ */
+public class JDBCScheme extends Scheme
+{
+ private Class extends DBInputFormat> inputFormatClass;
+ private Class extends DBOutputFormat> outputFormatClass;
+ private String[] columns;
+ private String[] orderBy;
+ private String conditions;
+ private String[] updateBy;
+ private Fields updateValueFields;
+ private Fields updateByFields;
+ private Fields columnFields;
+ private Tuple updateIfTuple;
+ private String selectQuery;
+ private String countQuery;
+ private long limit = -1;
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param outputFormatClass of type Class extends DBOutputFormat>
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, long limit, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, new Fields( columns ), columns, orderBy, conditions, limit, updateBy != null ? new Fields( updateBy ) : null, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param outputFormatClass of type Class extends DBOutputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy )
+ {
+ this.columnFields = columnFields;
+
+ verifyColumns( columnFields, columns );
+
+ setSinkFields( columnFields );
+ setSourceFields( columnFields );
+
+ if( updateBy != null && updateBy.length != 0 )
+ {
+ this.updateBy = updateBy;
+ this.updateByFields = updateByFields;
+
+ if( updateByFields.size() != updateBy.length )
+ throw new IllegalArgumentException( "updateByFields and updateBy must be the same size" );
+
+ if( !this.columnFields.contains( this.updateByFields ) )
+ throw new IllegalArgumentException( "columnFields must contain updateByFields column names" );
+
+ this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields );
+ this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls
+ }
+
+ this.columns = columns;
+ this.orderBy = orderBy;
+ this.conditions = conditions;
+ this.limit = limit;
+
+ this.inputFormatClass = inputFormatClass;
+ this.outputFormatClass = outputFormatClass;
+ }
+
+ private void verifyColumns( Fields columnFields, String[] columns )
+ {
+ if( columnFields.size() != columns.length )
+ throw new IllegalArgumentException( "columnFields and columns must be the same size" );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param outputFormatClass of type Class extends DBOutputFormat>
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String conditions, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columns, orderBy, conditions, -1, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param outputFormatClass of type Class extends DBOutputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, Fields updateByFields, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, -1, updateByFields, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param outputFormatClass of type Class extends DBOutputFormat>
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columns, orderBy, null, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param outputFormatClass of type Class extends DBOutputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Class extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
+ {
+ this( inputFormatClass, outputFormatClass, columnFields, columns, orderBy, null, -1, updateByFields, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy )
+ {
+ this( null, null, columns, orderBy, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param updateByFields of type Fields
+ * @param updateBy of type String[]
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, Fields updateByFields, String[] updateBy )
+ {
+ this( null, null, columnFields, columns, orderBy, updateByFields, updateBy );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, String conditions, long limit )
+ {
+ this( null, null, columns, orderBy, conditions, limit, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit )
+ {
+ this( null, null, columnFields, columns, orderBy, conditions, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param conditions of type String
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, String conditions )
+ {
+ this( null, null, columns, orderBy, conditions, null );
+ }
+
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, String conditions )
+ {
+ this( null, null, columnFields, columns, orderBy, conditions, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String[] orderBy, long limit )
+ {
+ this( null, null, columns, orderBy, null, limit, null );
+ }
+
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy, long limit )
+ {
+ this( null, null, columnFields, columns, orderBy, null, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ */
+ public JDBCScheme( String[] columns, String[] orderBy )
+ {
+ this( null, null, columns, orderBy, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param orderBy of type String[]
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String[] orderBy )
+ {
+ this( null, null, columnFields, columns, orderBy, null, -1, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String conditions, long limit )
+ {
+ this( null, null, columns, null, conditions, limit, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param conditions of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String conditions, long limit )
+ {
+ this( null, null, columnFields, columns, null, conditions, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param conditions of type String
+ */
+ public JDBCScheme( String[] columns, String conditions )
+ {
+ this( null, null, columns, null, conditions, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param conditions of type String
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String conditions )
+ {
+ this( null, null, columnFields, columns, null, conditions, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, long limit )
+ {
+ this( null, null, columns, null, null, limit, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, long limit )
+ {
+ this( null, null, columnFields, columns, null, null, limit, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columns of type String[]
+ */
+ public JDBCScheme( String[] columns )
+ {
+ this( null, null, new Fields( columns ), columns, null, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ */
+ public JDBCScheme( Fields columnFields, String[] columns )
+ {
+ this( null, null, columnFields, columns, null, null, null );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * Use this constructor if the data source may only be used as a source.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this( inputFormatClass, new Fields( columns ), columns, selectQuery, countQuery, limit );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param inputFormatClass of type Class extends DBInputFormat>
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Class extends DBInputFormat> inputFormatClass, Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this.columnFields = columnFields;
+
+ verifyColumns( columnFields, columns );
+
+ setSourceFields( columnFields );
+
+ this.columns = columns;
+ this.selectQuery = selectQuery.trim().replaceAll( ";$", "" );
+ this.countQuery = countQuery.trim().replaceAll( ";$", "" );
+ this.limit = limit;
+
+ this.inputFormatClass = inputFormatClass;
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * Use this constructor if the data source may only be used as a source.
+ *
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this( null, new Fields( columns ), columns, selectQuery, countQuery, limit );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ * @param limit of type long
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery, long limit )
+ {
+ this( null, columnFields, columns, selectQuery, countQuery, limit );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * Use this constructor if the data source may only be used as a source.
+ *
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ */
+ public JDBCScheme( String[] columns, String selectQuery, String countQuery )
+ {
+ this( null, new Fields( columns ), columns, selectQuery, countQuery, -1 );
+ }
+
+ /**
+ * Constructor JDBCScheme creates a new JDBCScheme instance.
+ *
+ * @param columnFields of type Fields
+ * @param columns of type String[]
+ * @param selectQuery of type String
+ * @param countQuery of type String
+ */
+ public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, String countQuery )
+ {
+ this( null, columnFields, columns, selectQuery, countQuery, -1 );
+ }
+
+ /**
+ * Method getColumns returns the columns of this JDBCScheme object.
+ *
+ * @return the columns (type String[]) of this JDBCScheme object.
+ */
+ public String[] getColumns() {
+ return columns;
+ }
+
+ /**
+ * Method getOrderBy returns the orderBy of this JDBCScheme object.
+ *
+ * @return the orderBy (type String[]) of this JDBCScheme object.
+ */
+ public String[] getOrderBy() {
+ return orderBy;
+ }
+
+ @Override
+ public void sourceConfInit( FlowProcess process, Tap tap,
+ JobConf conf ) {
+ int concurrentReads = ( (JDBCTap) tap ).concurrentReads;
+
+ if( selectQuery != null )
+ DBInputFormat.setInput( conf, TupleRecord.class, selectQuery, countQuery, limit, concurrentReads );
+ else {
+ String tableName = ( (JDBCTap) tap ).getTableName();
+ String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null;
+ DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, columns );
+ }
+
+ if( inputFormatClass != null )
+ conf.setInputFormat( inputFormatClass );
+ }
+
+ @Override
+ public void sinkConfInit( FlowProcess process, Tap tap,
+ JobConf conf ) {
+ if( selectQuery != null )
+ throw new TapException( "cannot sink to this Scheme" );
+
+ String tableName = ( (JDBCTap) tap ).getTableName();
+ int batchSize = ( (JDBCTap) tap ).getBatchSize();
+ DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy, batchSize );
+
+ if( outputFormatClass != null )
+ conf.setOutputFormat( outputFormatClass );
+ }
+
+ @Override
+ public void sourcePrepare( FlowProcess flowProcess, SourceCall
+ */
+public interface DBWritable {
+
+ /**
+ * Sets the fields of the object in the {@link PreparedStatement}.
+ *
+ * @param statement the statement that the fields are put into.
+ * @throws SQLException
+ */
+ public void write(PreparedStatement statement) throws SQLException;
+
+ /**
+ * Reads the fields of the object from the {@link ResultSet}.
+ *
+ * @param resultSet the {@link ResultSet} to get the fields from.
+ * @throws SQLException
+ */
+ public void readFields(ResultSet resultSet) throws SQLException;
+
+}
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
new file mode 100644
index 0000000..2a08b7d
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -0,0 +1,56 @@
+package parallelai.spyglass.jdbc
+
+import com.twitter.scalding.AccessMode
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Mode
+import com.twitter.scalding.Read
+import com.twitter.scalding.Source
+import com.twitter.scalding.Write
+import cascading.scheme.Scheme
+import cascading.tap.Tap
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.RecordReader
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.JobConf
+
+class JDBCSource(
+ tableName: String = "tableName",
+ driverName: String = "com.mysql.jdbc.Driver",
+ connectionString: String = "jdbc:mysql://:/",
+ userId: String = "user",
+ password: String = "password",
+ columnNames: Array[String] = Array[String]("col1", "col2", "col3"),
+ columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"),
+ primaryKeys: Array[String] = Array[String]("primary_key"),
+ fields: Fields = new Fields("fld1", "fld2", "fld3"),
+ orderBy: Array[String] = null,
+ updateBy: Array[String] = null,
+ updateByFields: Fields = null
+ ) extends Source {
+
+ override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy)
+ .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+ val jdbcScheme = hdfsScheme match {
+ case jdbc: JDBCScheme => jdbc
+ case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme")
+ }
+ mode match {
+ case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Read => {
+ val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
+ jdbcTap.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+
+ val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
+ jdbcTap.asInstanceOf[Tap[_,_,_]]
+ }
+ }
+ case _ => super.createTap(readOrWrite)(mode)
+ }
+ }
+}
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
new file mode 100644
index 0000000..1544f47
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
@@ -0,0 +1,57 @@
+package parallelai.spyglass.jdbc.testing
+
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.Args
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.mathematics.Matrix._
+import scala.math._
+import scala.math.BigDecimal.javaBigDecimal2bigDecimal
+import cascading.tuple.Fields
+import cascading.pipe.Pipe
+import com.twitter.scalding.Osv
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.jdbc.JDBCSource
+
+class HdfsToJdbc (args: Args) extends JobBase(args) {
+
+ implicit val implicitArgs: Args = args
+
+ val scaldingInputPath = getString("input.scalding")
+ log.info("Scalding sample input path => [" + scaldingInputPath + "]")
+
+ val S_output = scaldingInputPath
+ val fileType = getString("fileType")
+ log.info("Input file type => " + fileType)
+
+ val S_SCHEMA = List(
+ 'key_id, 'col1, 'col2, 'col3
+ )
+
+ val url = "mysql01.prod.bigdata.bskyb.com"
+ val dbName = "skybet_db"
+ val tableName = "skybet_hbase_betdetail_jdbc_test"
+
+
+ val jdbcSource2 = new JDBCSource(
+ "db_name",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://:/?zeroDateTimeBehavior=convertToNull",
+ "user",
+ "password",
+ Array[String]("KEY_ID", "COL1", "COL2", "COL3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("key_id"),
+ new Fields("key_id", "col1", "col2", "col3")
+ )
+
+ var piper:Pipe = null
+ if (fileType equals("Tsv"))
+ piper = Tsv(S_output, S_SCHEMA).read
+ else
+ piper = Osv(S_output, S_SCHEMA).read
+
+ val S_FLOW =
+ Tsv(S_output, S_SCHEMA).read
+ .write(jdbcSource2)
+
+}
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
new file mode 100644
index 0000000..30c03a2
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
@@ -0,0 +1,200 @@
+package parallelai.spyglass.jdbc.testing
+
+import org.apache.log4j.Level
+import org.apache.log4j.LogManager
+import org.apache.log4j.Logger
+import com.twitter.scalding.Args
+import com.twitter.scalding.IterableSource
+import com.twitter.scalding.Tsv
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.jdbc.JDBCSource
+
+/**
+ * This integration-test expects some Jdbc table to exist
+ * with specific data - see GenerateTestingHTables.java
+ */
+
+// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala
+class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {
+
+ // Initiate logger
+ private val LOG: Logger = LogManager.getLogger(this.getClass)
+
+ // Set to Level.DEBUG if --debug is passed in
+ val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean
+ if (isDebug) {
+ LOG.setLevel(Level.DEBUG)
+ LOG.info("Setting logging to Level.DEBUG")
+ }
+
+ val url = "mysql01.prod.bigdata.bskyb.com"
+ val dbName = "skybet_db"
+ val tableName = "skybet_hbase_betdetail_jdbc_test"
+
+ val jdbcSourceRead = new JDBCSource(
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
+ )
+
+ val jdbcSourceWrite = new JDBCSource(
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
+ )
+
+ // -----------------------------
+ // ----- Tests for TABLE_01 ----
+ // -----------------------------
+ val TABLE_01_SCHEMA = List('key,'column1, 'column2, 'column3)
+ val tableName1 = "TABLE_01"
+
+ // -------------------- Test 01 --------------------
+ var testName01 = "Select_Test_Read_Count"
+ println("---- Running : " + testName01)
+ // Get everything from HBase testing table into a Pipe
+ val jdbc01 = jdbcSourceRead
+ .read
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ group.toList[String]('column2 -> 'column2)
+ group.toList[String]('column3 -> 'column3)
+ }
+ .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) =>
+ x._1 + " " + x._2 + " " + x._3 + " " + x._4
+ }
+
+ // Calculate expected result for Test 01
+ var list01 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345"))
+
+ // -------------------- Test 02 --------------------
+ val testName02 = "Select_Test_Read_Insert_Updated_Count"
+ println("---- Running : " + testName02)
+
+ // Get everything from JDBC testing table into a Pipe
+
+ val jdbcSourceReadUpdated = new JDBCSource(
+ "TABLE_02",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
+ )
+
+ val jdbc02 = jdbcSourceReadUpdated
+ .read
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ group.toList[String]('column2 -> 'column2)
+ group.toList[String]('column3 -> 'column3)
+ }
+ .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) =>
+ x._1 + " " + x._2 + " " + x._3 + " " + x._4
+ }
+
+ // Calculate expected result for Test 02
+ var list02 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345"))
+
+ // Store results of Scan Test 01
+ (
+ getTestResultPipe(getExpectedPipe(list01), jdbc01, testName01) ++
+ getTestResultPipe(getExpectedPipe(list02), jdbc02, testName02)
+ ).groupAll { group =>
+ group.sortBy('testName)
+ }
+ .write(Tsv("JdbcShouldRead"))
+
+
+ /**
+ * We assume the pipe is empty
+ *
+ * We concatenate with a header - if the resulting size is 1
+ * then the original size was 0 - then the pipe was empty :)
+ *
+ * The result is then returned in a Pipe
+ */
+ def assertPipeIsEmpty ( jdbcPipe : Pipe , testName:String) : Pipe = {
+ val headerPipe = IterableSource(List(testName), 'jdbcdata)
+ val concatenation = ( jdbcPipe ++ headerPipe ).groupAll{ group =>
+ group.size('size)
+ }
+ .project('size)
+
+ val result =
+ concatenation
+ .mapTo('size -> ('testName, 'result, 'expecteddata, 'jdbcdata)) { x:String => {
+ if (x == "1") {
+ (testName, "Success", "", "")
+ } else {
+ (testName, "Test Failed", "", "")
+ }
+ }
+ }
+
+ result
+ }
+
+ /**
+ * Methods receives 2 pipes - and projects the results of testing
+ *
+ * expectedPipe should have a column 'expecteddata
+ * realJdbcPipe should have a column 'jdbcdata
+ */
+ def getTestResultPipe ( expectedPipe:Pipe , realJdbcPipe:Pipe, testName: String ): Pipe = {
+ val results = expectedPipe.insert('testName , testName)
+ .joinWithTiny('testName -> 'testName, realJdbcPipe.insert('testName , testName))
+ .map(('expecteddata, 'jdbcdata)->'result) { x:(String,String) =>
+ //println(x._1 + " === " + x._2)
+ if (x._1.equals(x._2))
+ "Success"
+ else
+ "Test Failed"
+ }
+ .project('testName, 'result, 'expecteddata, 'jdbcdata)
+ results
+ }
+
+ /**
+ *
+ */
+ def getExpectedPipe ( expectedList: List[(String,String,String,String)]) : Pipe = {
+
+ val expectedPipe =
+ IterableSource(expectedList, TABLE_01_SCHEMA)
+ .groupAll { group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ group.toList[String]('column2 -> 'column2)
+ group.toList[String]('column3 -> 'column3)
+
+ }
+ .mapTo(('*) -> 'expecteddata) { x:(String,String,String,String) =>
+ x._1 + " " + x._2 + " " + x._3 + " " + x._4
+ }
+ expectedPipe
+ }
+
+}
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala
new file mode 100644
index 0000000..f317834
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala
@@ -0,0 +1,10 @@
+package parallelai.spyglass.jdbc.testing
+
+import parallelai.spyglass.base.JobRunner
+
+object JdbcSourceShouldReadWriteRunner extends App {
+ val appConfig = "/projects/applications.conf"
+ val libPath = "/*.jar"
+
+ JobRunner.main(Array(classOf[JdbcSourceShouldReadWrite].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath))
+}
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
new file mode 100644
index 0000000..9fc09e4
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
@@ -0,0 +1,67 @@
+package parallelai.spyglass.jdbc.testing
+
+import com.twitter.scalding._
+import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.jdbc.JDBCSource
+
+/**
+ * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match.
+ * Now hardcoded for Skybet betdetail summation sample.
+ * To run it:
+ * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \
+ * --app.conf.path /projects/application-hadoop.conf --hdfs \
+ * --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation
+ * @param args
+ */
+class TablesComparison(args: Args) extends JobBase(args) {
+
+ implicit val implicitArgs: Args = args
+ val conf = appConfig
+
+ val jdbcSink = new JDBCSource(
+ "table_name",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://:/",
+ "skybet_user",
+ "zkb4Uo{C8",
+ Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"),
+ Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"),
+ Array[String]("betleg_id"),
+ new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"),
+ Array[String]("BETLEG_ID"),
+ Array[String]("BETLEG_ID"),
+ new Fields("betleg_id")
+ )
+ .read
+ .insert('name, "betdetail")
+ .project('bet_id, 'part_no, 'leg_id)
+
+ //
+ // .write(new TextLine("testJDBCComparator/compare1"))
+
+ val jdbcSource2 = new JDBCSource(
+ "skybet_midas_bet_detail",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull",
+ "skybet_user",
+ "zkb4Uo{C8",
+ Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"),
+ Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"),
+ Array[String]("bet_id"),
+ new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no")
+
+ )
+ .read
+ .insert('name, "sample")
+ .project('bet_id, 'part_no, 'leg_id)
+
+ val uPipe = jdbcSink ++ jdbcSource2
+ uPipe
+ .groupBy('bet_id, 'part_no, 'leg_id) {
+ _.size
+ }.filter('size) {
+ x: Int => x != 2
+ }
+ .write(new TextLine("testJDBCComparator/result"))
+}
\ No newline at end of file
--
cgit v1.2.3