From 20a18b4388f0cd06bec0b43d083150f6e1bb2c5e Mon Sep 17 00:00:00 2001 From: Gracia Fernandez Date: Thu, 4 Jul 2013 16:49:09 +0100 Subject: Changed HBaseSource and JDBCSource to allow testing with JobTest. Samples of tests included. --- .../parallelai/spyglass/jdbc/JDBCSource.scala | 23 ++++---- .../spyglass/jdbc/example/JdbcSourceExample.scala | 33 +++++++++++ .../spyglass/jdbc/testing/HdfsToJdbc.scala | 6 +- .../jdbc/testing/JdbcSourceShouldReadWrite.scala | 64 ++++++++++----------- .../spyglass/jdbc/testing/TablesComparison.scala | 67 ---------------------- 5 files changed, 81 insertions(+), 112 deletions(-) create mode 100644 src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala delete mode 100644 src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala (limited to 'src/main/scala/parallelai/spyglass/jdbc') diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 2a08b7d..beb66be 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -6,31 +6,34 @@ import com.twitter.scalding.Mode import com.twitter.scalding.Read import com.twitter.scalding.Source import com.twitter.scalding.Write -import cascading.scheme.Scheme +import cascading.scheme.{NullScheme, Scheme} import cascading.tap.Tap import cascading.tuple.Fields import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.JobConf -class JDBCSource( +case class JDBCSource( tableName: String = "tableName", driverName: String = "com.mysql.jdbc.Driver", connectionString: String = "jdbc:mysql://:/", userId: String = "user", password: String = "password", - columnNames: Array[String] = Array[String]("col1", "col2", "col3"), - columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"), - primaryKeys: Array[String] = Array[String]("primary_key"), + columnNames: List[String] = List("col1", "col2", "col3"), + columnDefs: List[String] = List("data_type", "data_type", "data_type"), + primaryKeys: List[String] = List("primary_key"), fields: Fields = new Fields("fld1", "fld2", "fld3"), - orderBy: Array[String] = null, - updateBy: Array[String] = null, + orderBy: List[String] = List(), + updateBy: List[String] = List(), updateByFields: Fields = null ) extends Source { - override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy) + override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray) .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + // To enable local mode testing + override def localScheme = new NullScheme(fields, fields) + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { val jdbcScheme = hdfsScheme match { case jdbc: JDBCScheme => jdbc @@ -39,13 +42,13 @@ class JDBCSource( mode match { case hdfsMode @ Hdfs(_, _) => readOrWrite match { case Read => { - val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) + val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray) val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme) jdbcTap.asInstanceOf[Tap[_,_,_]] } case Write => { - val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) + val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray) val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme) jdbcTap.asInstanceOf[Tap[_,_,_]] } diff --git a/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala b/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala new file mode 100644 index 0000000..0217204 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala @@ -0,0 +1,33 @@ +package parallelai.spyglass.jdbc.example + +import com.twitter.scalding.{Tsv, Args} +import parallelai.spyglass.base.JobBase +import org.apache.log4j.{Level, Logger} +import parallelai.spyglass.jdbc.JDBCSource +import cascading.tuple.Fields + +/** + * Simple example of JDBCSource usage + */ +class JdbcSourceExample(args: Args) extends JobBase(args) { + + val isDebug: Boolean = args("debug").toBoolean + + if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) + + val output = args("output") + + val hbs2 = new JDBCSource( + "db_name", + "com.mysql.jdbc.Driver", + "jdbc:mysql://:/?zeroDateTimeBehavior=convertToNull", + "user", + "password", + List("KEY_ID", "COL1", "COL2", "COL3"), + List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), + List("key_id"), + new Fields("key_id", "col1", "col2", "col3") + ).read + .write(Tsv(output.format("get_list"))) + +} diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala index 1544f47..d290f06 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala @@ -38,9 +38,9 @@ class HdfsToJdbc (args: Args) extends JobBase(args) { "jdbc:mysql://:/?zeroDateTimeBehavior=convertToNull", "user", "password", - Array[String]("KEY_ID", "COL1", "COL2", "COL3"), - Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), - Array[String]("key_id"), + List("KEY_ID", "COL1", "COL2", "COL3"), + List( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + List("key_id"), new Fields("key_id", "col1", "col2", "col3") ) diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala index 30c03a2..765b422 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala @@ -34,29 +34,29 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) { val tableName = "skybet_hbase_betdetail_jdbc_test" val jdbcSourceRead = new JDBCSource( - "TABLE_01", - "com.mysql.jdbc.Driver", - "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", - "root", - "password", - Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), - Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), - Array[String]("id"), - new Fields("key", "column1", "column2", "column3"), - null, null, null + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), + List("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null ) - + val jdbcSourceWrite = new JDBCSource( - "TABLE_01", - "com.mysql.jdbc.Driver", - "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", - "root", - "password", - Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), - Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), - Array[String]("id"), - new Fields("key", "column1", "column2", "column3"), - null, null, null + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), + List("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null ) // ----------------------------- @@ -89,18 +89,18 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) { println("---- Running : " + testName02) // Get everything from JDBC testing table into a Pipe - + val jdbcSourceReadUpdated = new JDBCSource( - "TABLE_02", - "com.mysql.jdbc.Driver", - "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", - "root", - "password", - Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), - Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), - Array[String]("id"), - new Fields("key", "column1", "column2", "column3"), - null, null, null + "TABLE_02", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), + List("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null ) val jdbc02 = jdbcSourceReadUpdated diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala deleted file mode 100644 index 9fc09e4..0000000 --- a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala +++ /dev/null @@ -1,67 +0,0 @@ -package parallelai.spyglass.jdbc.testing - -import com.twitter.scalding._ -import cascading.tuple.Fields -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.jdbc.JDBCSource - -/** - * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match. - * Now hardcoded for Skybet betdetail summation sample. - * To run it: - * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \ - * --app.conf.path /projects/application-hadoop.conf --hdfs \ - * --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation - * @param args - */ -class TablesComparison(args: Args) extends JobBase(args) { - - implicit val implicitArgs: Args = args - val conf = appConfig - - val jdbcSink = new JDBCSource( - "table_name", - "com.mysql.jdbc.Driver", - "jdbc:mysql://:/", - "skybet_user", - "zkb4Uo{C8", - Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"), - Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"), - Array[String]("betleg_id"), - new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"), - Array[String]("BETLEG_ID"), - Array[String]("BETLEG_ID"), - new Fields("betleg_id") - ) - .read - .insert('name, "betdetail") - .project('bet_id, 'part_no, 'leg_id) - - // - // .write(new TextLine("testJDBCComparator/compare1")) - - val jdbcSource2 = new JDBCSource( - "skybet_midas_bet_detail", - "com.mysql.jdbc.Driver", - "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull", - "skybet_user", - "zkb4Uo{C8", - Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"), - Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"), - Array[String]("bet_id"), - new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no") - - ) - .read - .insert('name, "sample") - .project('bet_id, 'part_no, 'leg_id) - - val uPipe = jdbcSink ++ jdbcSource2 - uPipe - .groupBy('bet_id, 'part_no, 'leg_id) { - _.size - }.filter('size) { - x: Int => x != 2 - } - .write(new TextLine("testJDBCComparator/result")) -} \ No newline at end of file -- cgit v1.2.3