aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala57
1 files changed, 57 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
new file mode 100644
index 0000000..1544f47
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
@@ -0,0 +1,57 @@
+package parallelai.spyglass.jdbc.testing
+
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.Args
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.mathematics.Matrix._
+import scala.math._
+import scala.math.BigDecimal.javaBigDecimal2bigDecimal
+import cascading.tuple.Fields
+import cascading.pipe.Pipe
+import com.twitter.scalding.Osv
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.jdbc.JDBCSource
+
+class HdfsToJdbc (args: Args) extends JobBase(args) {
+
+ implicit val implicitArgs: Args = args
+
+ val scaldingInputPath = getString("input.scalding")
+ log.info("Scalding sample input path => [" + scaldingInputPath + "]")
+
+ val S_output = scaldingInputPath
+ val fileType = getString("fileType")
+ log.info("Input file type => " + fileType)
+
+ val S_SCHEMA = List(
+ 'key_id, 'col1, 'col2, 'col3
+ )
+
+ val url = "mysql01.prod.bigdata.bskyb.com"
+ val dbName = "skybet_db"
+ val tableName = "skybet_hbase_betdetail_jdbc_test"
+
+
+ val jdbcSource2 = new JDBCSource(
+ "db_name",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
+ "user",
+ "password",
+ Array[String]("KEY_ID", "COL1", "COL2", "COL3"),
+ Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ Array[String]("key_id"),
+ new Fields("key_id", "col1", "col2", "col3")
+ )
+
+ var piper:Pipe = null
+ if (fileType equals("Tsv"))
+ piper = Tsv(S_output, S_SCHEMA).read
+ else
+ piper = Osv(S_output, S_SCHEMA).read
+
+ val S_FLOW =
+ Tsv(S_output, S_SCHEMA).read
+ .write(jdbcSource2)
+
+} \ No newline at end of file