diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala new file mode 100644 index 0000000..30c03a2 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala @@ -0,0 +1,200 @@ +package parallelai.spyglass.jdbc.testing + +import org.apache.log4j.Level +import org.apache.log4j.LogManager +import org.apache.log4j.Logger +import com.twitter.scalding.Args +import com.twitter.scalding.IterableSource +import com.twitter.scalding.Tsv +import cascading.pipe.Pipe +import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +/** + * This integration-test expects some Jdbc table to exist + * with specific data - see GenerateTestingHTables.java + */ + +// https://github.com/twitter/scalding/blob/develop/scalding-core/src/test/scala/com/twitter/scalding/BlockJoinTest.scala +class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) { + + // Initiate logger + private val LOG: Logger = LogManager.getLogger(this.getClass) + + // Set to Level.DEBUG if --debug is passed in + val isDebug:Boolean = args.getOrElse("debug", "false").toBoolean + if (isDebug) { + LOG.setLevel(Level.DEBUG) + LOG.info("Setting logging to Level.DEBUG") + } + + val url = "mysql01.prod.bigdata.bskyb.com" + val dbName = "skybet_db" + val tableName = "skybet_hbase_betdetail_jdbc_test" + + val jdbcSourceRead = new JDBCSource( + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null + ) + + val jdbcSourceWrite = new JDBCSource( + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null + ) + + // ----------------------------- + // ----- Tests for TABLE_01 ---- + // ----------------------------- + val TABLE_01_SCHEMA = List('key,'column1, 'column2, 'column3) + val tableName1 = "TABLE_01" + + // -------------------- Test 01 -------------------- + var testName01 = "Select_Test_Read_Count" + println("---- Running : " + testName01) + // Get everything from HBase testing table into a Pipe + val jdbc01 = jdbcSourceRead + .read + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + group.toList[String]('column2 -> 'column2) + group.toList[String]('column3 -> 'column3) + } + .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) => + x._1 + " " + x._2 + " " + x._3 + " " + x._4 + } + + // Calculate expected result for Test 01 + var list01 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345")) + + // -------------------- Test 02 -------------------- + val testName02 = "Select_Test_Read_Insert_Updated_Count" + println("---- Running : " + testName02) + + // Get everything from JDBC testing table into a Pipe + + val jdbcSourceReadUpdated = new JDBCSource( + "TABLE_02", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null + ) + + val jdbc02 = jdbcSourceReadUpdated + .read + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + group.toList[String]('column2 -> 'column2) + group.toList[String]('column3 -> 'column3) + } + .mapTo(('key, 'column1, 'column2, 'column3) -> 'jdbcdata) { x:(String,String,String,String) => + x._1 + " " + x._2 + " " + x._3 + " " + x._4 + } + + // Calculate expected result for Test 02 + var list02 = List(("1", "A", "X", "123"), ("2", "B", "Y", "234"), ("3", "C", "Z", "345")) + + // Store results of Scan Test 01 + ( + getTestResultPipe(getExpectedPipe(list01), jdbc01, testName01) ++ + getTestResultPipe(getExpectedPipe(list02), jdbc02, testName02) + ).groupAll { group => + group.sortBy('testName) + } + .write(Tsv("JdbcShouldRead")) + + + /** + * We assume the pipe is empty + * + * We concatenate with a header - if the resulting size is 1 + * then the original size was 0 - then the pipe was empty :) + * + * The result is then returned in a Pipe + */ + def assertPipeIsEmpty ( jdbcPipe : Pipe , testName:String) : Pipe = { + val headerPipe = IterableSource(List(testName), 'jdbcdata) + val concatenation = ( jdbcPipe ++ headerPipe ).groupAll{ group => + group.size('size) + } + .project('size) + + val result = + concatenation + .mapTo('size -> ('testName, 'result, 'expecteddata, 'jdbcdata)) { x:String => { + if (x == "1") { + (testName, "Success", "", "") + } else { + (testName, "Test Failed", "", "") + } + } + } + + result + } + + /** + * Methods receives 2 pipes - and projects the results of testing + * + * expectedPipe should have a column 'expecteddata + * realJdbcPipe should have a column 'jdbcdata + */ + def getTestResultPipe ( expectedPipe:Pipe , realJdbcPipe:Pipe, testName: String ): Pipe = { + val results = expectedPipe.insert('testName , testName) + .joinWithTiny('testName -> 'testName, realJdbcPipe.insert('testName , testName)) + .map(('expecteddata, 'jdbcdata)->'result) { x:(String,String) => + //println(x._1 + " === " + x._2) + if (x._1.equals(x._2)) + "Success" + else + "Test Failed" + } + .project('testName, 'result, 'expecteddata, 'jdbcdata) + results + } + + /** + * + */ + def getExpectedPipe ( expectedList: List[(String,String,String,String)]) : Pipe = { + + val expectedPipe = + IterableSource(expectedList, TABLE_01_SCHEMA) + .groupAll { group => + group.toList[String]('key -> 'key) + group.toList[String]('column1 -> 'column1) + group.toList[String]('column2 -> 'column2) + group.toList[String]('column3 -> 'column3) + + } + .mapTo(('*) -> 'expecteddata) { x:(String,String,String,String) => + x._1 + " " + x._2 + " " + x._3 + " " + x._4 + } + expectedPipe + } + +} |