package parallelai.spyglass.jdbc.testing

import com.twitter.scalding.TextLine
import com.twitter.scalding.Args
import com.twitter.scalding.Tsv
import com.twitter.scalding.Osv
import com.twitter.scalding.mathematics.Matrix._

import scala.math._
import scala.math.BigDecimal.javaBigDecimal2bigDecimal

import cascading.tuple.Fields
import cascading.pipe.Pipe

import parallelai.spyglass.base.JobBase
import parallelai.spyglass.jdbc.JDBCSource

// Test job: copies a Tsv/Osv file from HDFS into a JDBC (MySQL) table via JDBCSource.
class HdfsToJdbc(args: Args) extends JobBase(args) {

  implicit val implicitArgs: Args = args

  // Input path and file type are supplied as job arguments.
  val scaldingInputPath = getString("input.scalding")
  log.info("Scalding sample input path => [" + scaldingInputPath + "]")
  val S_output = scaldingInputPath

  val fileType = getString("fileType")
  log.info("Input file type => " + fileType)

  val S_SCHEMA = List(
    'key_id, 'col1, 'col2, 'col3
  )

  // Target database details (the JDBCSource below is still configured with placeholder connection values).
  val url       = "mysql01.prod.bigdata.bskyb.com"
  val dbName    = "skybet_db"
  val tableName = "skybet_hbase_betdetail_jdbc_test"

  val jdbcSource2 = new JDBCSource(
    "db_name",                                      // table name
    "com.mysql.jdbc.Driver",                        // JDBC driver class
    "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull", // connection URL (placeholders)
    "user",                                         // user
    "password",                                     // password
    List("KEY_ID", "COL1", "COL2", "COL3"),         // column names
    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), // column definitions
    List("key_id"),                                 // primary key column(s)
    new Fields("key_id", "col1", "col2", "col3")    // Cascading fields mapped onto the columns
  )

  // Read the input file with the scheme that matches the requested file type.
  val piper: Pipe =
    if (fileType == "Tsv")
      Tsv(S_output, S_SCHEMA).read
    else
      Osv(S_output, S_SCHEMA).read

  // Write the selected input pipe to the JDBC target table.
  val S_FLOW = piper.write(jdbcSource2)
}
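
// A minimal sketch of how this job could be launched, assuming the standard
// Scalding entry point (com.twitter.scalding.Tool) run through Hadoop's
// ToolRunner. The runner object name, input path, and argument values below
// are hypothetical and only illustrate the expected "--input.scalding" and
// "--fileType" arguments used by the job above.
object HdfsToJdbcRunner {
  def main(cliArgs: Array[String]): Unit = {
    org.apache.hadoop.util.ToolRunner.run(
      new org.apache.hadoop.conf.Configuration,
      new com.twitter.scalding.Tool,
      Array(
        classOf[HdfsToJdbc].getName,                    // job class to run
        "--hdfs",                                       // run on Hadoop rather than locally
        "--input.scalding", "/tmp/hdfs_to_jdbc_sample", // hypothetical input path
        "--fileType", "Tsv"                             // selects the Tsv branch above
      ) ++ cliArgs
    )
  }
}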