From 3e4a74de169104679f2459947c27d5cfb8cc430b Mon Sep 17 00:00:00 2001 From: Saad Rashid Date: Mon, 24 Jun 2013 17:13:19 +0100 Subject: Added JDBCTap support. --- .../parallelai/spyglass/jdbc/JDBCSource.scala | 56 ++++++ .../spyglass/jdbc/testing/HdfsToJdbc.scala | 57 ++++++ .../jdbc/testing/JdbcSourceShouldReadWrite.scala | 200 +++++++++++++++++++++ .../testing/JdbcSourceShouldReadWriteRunner.scala | 10 ++ .../spyglass/jdbc/testing/TablesComparison.scala | 67 +++++++ 5 files changed, 390 insertions(+) create mode 100644 src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala create mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala (limited to 'src/main/scala') diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala new file mode 100644 index 0000000..2a08b7d --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -0,0 +1,56 @@ +package parallelai.spyglass.jdbc + +import com.twitter.scalding.AccessMode +import com.twitter.scalding.Hdfs +import com.twitter.scalding.Mode +import com.twitter.scalding.Read +import com.twitter.scalding.Source +import com.twitter.scalding.Write +import cascading.scheme.Scheme +import cascading.tap.Tap +import cascading.tuple.Fields +import org.apache.hadoop.mapred.RecordReader +import org.apache.hadoop.mapred.OutputCollector +import org.apache.hadoop.mapred.JobConf + +class JDBCSource( + tableName: String = "tableName", + driverName: String = "com.mysql.jdbc.Driver", + connectionString: String = "jdbc:mysql://:/", + userId: String = "user", + password: String = "password", + columnNames: Array[String] = Array[String]("col1", "col2", "col3"), + columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"), + primaryKeys: Array[String] = Array[String]("primary_key"), + fields: Fields = new Fields("fld1", "fld2", "fld3"), + orderBy: Array[String] = null, + updateBy: Array[String] = null, + updateByFields: Fields = null + ) extends Source { + + override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val jdbcScheme = hdfsScheme match { + case jdbc: JDBCScheme => jdbc + case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) + val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme) + jdbcTap.asInstanceOf[Tap[_,_,_]] + } + case Write => { + + val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) + val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme) + jdbcTap.asInstanceOf[Tap[_,_,_]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } +} diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala new file mode 100644 index 0000000..1544f47 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala @@ -0,0 +1,57 @@ +package parallelai.spyglass.jdbc.testing + +import com.twitter.scalding.TextLine +import com.twitter.scalding.Args +import com.twitter.scalding.Tsv +import com.twitter.scalding.mathematics.Matrix._ +import scala.math._ +import scala.math.BigDecimal.javaBigDecimal2bigDecimal +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding.Osv +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +class HdfsToJdbc (args: Args) extends JobBase(args) { + + implicit val implicitArgs: Args = args + + val scaldingInputPath = getString("input.scalding") + log.info("Scalding sample input path => [" + scaldingInputPath + "]") + + val S_output = scaldingInputPath + val fileType = getString("fileType") + log.info("Input file type => " + fileType) + + val S_SCHEMA = List( + 'key_id, 'col1, 'col2, 'col3 + ) + + val url = "mysql01.prod.bigdata.bskyb.com" + val dbName = "skybet_db" + val tableName = "skybet_hbase_betdetail_jdbc_test" + + + val jdbcSource2 = new JDBCSource( + "db_name", + "com.mysql.jdbc.Driver", + "jdbc:mysql://:/?zeroDateTimeBehavior=convertToNull", + "user", + "password", + Array[String]("KEY_ID", "COL1", "COL2", "COL3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("key_id"), + new Fields("key_id", "col1", "col2", "col3") + ) + + var piper:Pipe = null + if (fileType equals("Tsv")) + piper = Tsv(S_output, S_SCHEMA).read + else + piper = Osv(S_output, S_SCHEMA).read + + val S_FLOW = + Tsv(S_output, S_SCHEMA).read + .write(jdbcSource2) + +} \ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala new file mode 100644 index 0000000..30c03a2 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala @@ -0,0 +1,200 @@ +package parallelai.spyglass.jdbc.testing + +import org.apache.log4j.Level +import org.apache.log4j.LogManager +import org.apache.log4j.Logger +import com.twitter.scalding.Args +import com.twitter.scalding.IterableSource +import com.twitter.scalding.Tsv +import cascading.pipe.Pipe +import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +/** + * This integration-test expects some Jdbc table to exist + * with specific data - see GenerateTestingHTables.java + */ + +// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala +class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) { + + // Initiate logger + private val LOG: Logger = LogManager.getLogger(this.getClass) + + // Set to Level.DEBUG if --debug is passed in + val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean + if (isDebug) { + LOG.setLevel(Level.DEBUG) + LOG.info("Setting logging to Level.DEBUG") + } + + val url = "mysql01.prod.bigdata.bskyb.com" + val dbName = "skybet_db" + val tableName = "skybet_hbase_betdetail_jdbc_test" + + val jdbcSourceRead = new JDBCSource( + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null + ) + + val jdbcSourceWrite = new JDBCSource( + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null + ) + + // ----------------------------- + // ----- Tests for TABLE_01 ---- + // ----------------------------- + val TABLE_01_SCHEMA = List('key,'column1, 'column2, 'column3) + val tableName1 = "TABLE_01" + + // -------------------- Test 01 -------------------- + var testName01 = "Select_Test_Read_Count" + println("---- Running : " + testName01) + // Get everything from HBase testing table into a Pipe + val jdbc01 = jdbcSourceRead + .read + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + group.toList[String]('column2 -> 'column2) + group.toList[String]('column3 -> 'column3) + } + .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) => + x._1 + " " + x._2 + " " + x._3 + " " + x._4 + } + + // Calculate expected result for Test 01 + var list01 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345")) + + // -------------------- Test 02 -------------------- + val testName02 = "Select_Test_Read_Insert_Updated_Count" + println("---- Running : " + testName02) + + // Get everything from JDBC testing table into a Pipe + + val jdbcSourceReadUpdated = new JDBCSource( + "TABLE_02", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null + ) + + val jdbc02 = jdbcSourceReadUpdated + .read + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + group.toList[String]('column2 -> 'column2) + group.toList[String]('column3 -> 'column3) + } + .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) => + x._1 + " " + x._2 + " " + x._3 + " " + x._4 + } + + // Calculate expected result for Test 02 + var list02 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345")) + + // Store results of Scan Test 01 + ( + getTestResultPipe(getExpectedPipe(list01), jdbc01, testName01) ++ + getTestResultPipe(getExpectedPipe(list02), jdbc02, testName02) + ).groupAll { group => + group.sortBy('testName) + } + .write(Tsv("JdbcShouldRead")) + + + /** + * We assume the pipe is empty + * + * We concatenate with a header - if the resulting size is 1 + * then the original size was 0 - then the pipe was empty :) + * + * The result is then returned in a Pipe + */ + def assertPipeIsEmpty ( jdbcPipe : Pipe , testName:String) : Pipe = { + val headerPipe = IterableSource(List(testName), 'jdbcdata) + val concatenation = ( jdbcPipe ++ headerPipe ).groupAll{ group => + group.size('size) + } + .project('size) + + val result = + concatenation + .mapTo('size -> ('testName, 'result, 'expecteddata, 'jdbcdata)) { x:String => { + if (x == "1") { + (testName, "Success", "", "") + } else { + (testName, "Test Failed", "", "") + } + } + } + + result + } + + /** + * Methods receives 2 pipes - and projects the results of testing + * + * expectedPipe should have a column 'expecteddata + * realJdbcPipe should have a column 'jdbcdata + */ + def getTestResultPipe ( expectedPipe:Pipe , realJdbcPipe:Pipe, testName: String ): Pipe = { + val results = expectedPipe.insert('testName , testName) + .joinWithTiny('testName -> 'testName, realJdbcPipe.insert('testName , testName)) + .map(('expecteddata, 'jdbcdata)->'result) { x:(String,String) => + //println(x._1 + " === " + x._2) + if (x._1.equals(x._2)) + "Success" + else + "Test Failed" + } + .project('testName, 'result, 'expecteddata, 'jdbcdata) + results + } + + /** + * + */ + def getExpectedPipe ( expectedList: List[(String,String,String,String)]) : Pipe = { + + val expectedPipe = + IterableSource(expectedList, TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + group.toList[String]('column2 -> 'column2) + group.toList[String]('column3 -> 'column3) + + } + .mapTo(('*) -> 'expecteddata) { x:(String,String,String,String) => + x._1 + " " + x._2 + " " + x._3 + " " + x._4 + } + expectedPipe + } + +} diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala new file mode 100644 index 0000000..f317834 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWriteRunner.scala @@ -0,0 +1,10 @@ +package parallelai.spyglass.jdbc.testing + +import parallelai.spyglass.base.JobRunner + +object JdbcSourceShouldReadWriteRunner extends App { + val appConfig = "/projects/applications.conf" + val libPath = "/*.jar" + + JobRunner.main(Array(classOf[JdbcSourceShouldReadWrite].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath)) +} \ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala new file mode 100644 index 0000000..9fc09e4 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala @@ -0,0 +1,67 @@ +package parallelai.spyglass.jdbc.testing + +import com.twitter.scalding._ +import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +/** + * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match. + * Now hardcoded for Skybet betdetail summation sample. + * To run it: + * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \ + * --app.conf.path /projects/application-hadoop.conf --hdfs \ + * --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation + * @param args + */ +class TablesComparison(args: Args) extends JobBase(args) { + + implicit val implicitArgs: Args = args + val conf = appConfig + + val jdbcSink = new JDBCSource( + "table_name", + "com.mysql.jdbc.Driver", + "jdbc:mysql://:/", + "skybet_user", + "zkb4Uo{C8", + Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"), + Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"), + Array[String]("betleg_id"), + new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"), + Array[String]("BETLEG_ID"), + Array[String]("BETLEG_ID"), + new Fields("betleg_id") + ) + .read + .insert('name, "betdetail") + .project('bet_id, 'part_no, 'leg_id) + + // + // .write(new TextLine("testJDBCComparator/compare1")) + + val jdbcSource2 = new JDBCSource( + "skybet_midas_bet_detail", + "com.mysql.jdbc.Driver", + "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull", + "skybet_user", + "zkb4Uo{C8", + Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"), + Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"), + Array[String]("bet_id"), + new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no") + + ) + .read + .insert('name, "sample") + .project('bet_id, 'part_no, 'leg_id) + + val uPipe = jdbcSink ++ jdbcSource2 + uPipe + .groupBy('bet_id, 'part_no, 'leg_id) { + _.size + }.filter('size) { + x: Int => x != 2 + } + .write(new TextLine("testJDBCComparator/result")) +} \ No newline at end of file -- cgit v1.2.3