aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/jdbc/testing
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/testing')
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala6
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala64
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala67
3 files changed, 35 insertions, 102 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
index 1544f47..d290f06 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
@@ -38,9 +38,9 @@ class HdfsToJdbc (args: Args) extends JobBase(args) {
"jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
"user",
"password",
- Array[String]("KEY_ID", "COL1", "COL2", "COL3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("key_id"),
+ List("KEY_ID", "COL1", "COL2", "COL3"),
+ List( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ List("key_id"),
new Fields("key_id", "col1", "col2", "col3")
)
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
index 30c03a2..765b422 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
@@ -34,29 +34,29 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {
val tableName = "skybet_hbase_betdetail_jdbc_test"
val jdbcSourceRead = new JDBCSource(
- "TABLE_01",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
)
-
+
val jdbcSourceWrite = new JDBCSource(
- "TABLE_01",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
)
// -----------------------------
@@ -89,18 +89,18 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {
println("---- Running : " + testName02)
// Get everything from JDBC testing table into a Pipe
-
+
val jdbcSourceReadUpdated = new JDBCSource(
- "TABLE_02",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
+ "TABLE_02",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
)
val jdbc02 = jdbcSourceReadUpdated
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
deleted file mode 100644
index 9fc09e4..0000000
--- a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package parallelai.spyglass.jdbc.testing
-
-import com.twitter.scalding._
-import cascading.tuple.Fields
-import parallelai.spyglass.base.JobBase
-import parallelai.spyglass.jdbc.JDBCSource
-
-/**
- * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match.
- * Now hardcoded for Skybet betdetail summation sample.
- * To run it:
- * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \
- * --app.conf.path /projects/application-hadoop.conf --hdfs \
- * --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation
- * @param args
- */
-class TablesComparison(args: Args) extends JobBase(args) {
-
- implicit val implicitArgs: Args = args
- val conf = appConfig
-
- val jdbcSink = new JDBCSource(
- "table_name",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://<hostname>:<port>/<db_name>",
- "skybet_user",
- "zkb4Uo{C8",
- Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"),
- Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"),
- Array[String]("betleg_id"),
- new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"),
- Array[String]("BETLEG_ID"),
- Array[String]("BETLEG_ID"),
- new Fields("betleg_id")
- )
- .read
- .insert('name, "betdetail")
- .project('bet_id, 'part_no, 'leg_id)
-
- //
- // .write(new TextLine("testJDBCComparator/compare1"))
-
- val jdbcSource2 = new JDBCSource(
- "skybet_midas_bet_detail",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull",
- "skybet_user",
- "zkb4Uo{C8",
- Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"),
- Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"),
- Array[String]("bet_id"),
- new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no")
-
- )
- .read
- .insert('name, "sample")
- .project('bet_id, 'part_no, 'leg_id)
-
- val uPipe = jdbcSink ++ jdbcSource2
- uPipe
- .groupBy('bet_id, 'part_no, 'leg_id) {
- _.size
- }.filter('size) {
- x: Int => x != 2
- }
- .write(new TextLine("testJDBCComparator/result"))
-} \ No newline at end of file