diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala new file mode 100644 index 0000000..1544f47 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala @@ -0,0 +1,57 @@ +package parallelai.spyglass.jdbc.testing + +import com.twitter.scalding.TextLine +import com.twitter.scalding.Args +import com.twitter.scalding.Tsv +import com.twitter.scalding.mathematics.Matrix._ +import scala.math._ +import scala.math.BigDecimal.javaBigDecimal2bigDecimal +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding.Osv +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.jdbc.JDBCSource + +class HdfsToJdbc (args: Args) extends JobBase(args) { + + implicit val implicitArgs: Args = args + + val scaldingInputPath = getString("input.scalding") + log.info("Scalding sample input path => [" + scaldingInputPath + "]") + + val S_output = scaldingInputPath + val fileType = getString("fileType") + log.info("Input file type => " + fileType) + + val S_SCHEMA = List( + 'key_id, 'col1, 'col2, 'col3 + ) + + val url = "mysql01.prod.bigdata.bskyb.com" + val dbName = "skybet_db" + val tableName = "skybet_hbase_betdetail_jdbc_test" + + + val jdbcSource2 = new JDBCSource( + "db_name", + "com.mysql.jdbc.Driver", + "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull", + "user", + "password", + Array[String]("KEY_ID", "COL1", "COL2", "COL3"), + Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"), + Array[String]("key_id"), + new Fields("key_id", "col1", "col2", "col3") + ) + + var piper:Pipe = null + if (fileType equals("Tsv")) + piper = Tsv(S_output, S_SCHEMA).read + else + piper = Osv(S_output, S_SCHEMA).read + + val S_FLOW = + Tsv(S_output, S_SCHEMA).read + .write(jdbcSource2) + +}
\ No newline at end of file |