aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala')
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala | 56
1 files changed, 56 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
new file mode 100644
index 0000000..2a08b7d
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -0,0 +1,56 @@
+package parallelai.spyglass.jdbc
+
+import com.twitter.scalding.AccessMode
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Mode
+import com.twitter.scalding.Read
+import com.twitter.scalding.Source
+import com.twitter.scalding.Write
+import cascading.scheme.Scheme
+import cascading.tap.Tap
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.RecordReader
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.JobConf
+
/**
 * A Scalding [[Source]] backed by a JDBC table, bridging to Cascading's
 * `JDBCTap`/`JDBCScheme`.
 *
 * @param tableName        name of the database table
 * @param driverName       fully-qualified JDBC driver class name
 * @param connectionString JDBC connection URL
 * @param userId           database user
 * @param password         database password
 * @param columnNames      table column names
 * @param columnDefs       column type definitions, parallel to `columnNames`
 * @param primaryKeys      primary-key column names
 * @param fields           Cascading tuple fields mapped onto the columns
 * @param orderBy          columns to order by — presumably applied on read; null to skip
 * @param updateBy         columns to update by — presumably used on write; null to skip
 * @param updateByFields   tuple fields corresponding to `updateBy`; null to skip
 */
class JDBCSource(
  tableName: String = "tableName",
  driverName: String = "com.mysql.jdbc.Driver",
  connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>",
  userId: String = "user",
  password: String = "password",
  columnNames: Array[String] = Array[String]("col1", "col2", "col3"),
  columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"),
  primaryKeys: Array[String] = Array[String]("primary_key"),
  fields: Fields = new Fields("fld1", "fld2", "fld3"),
  orderBy: Array[String] = null,
  updateBy: Array[String] = null,
  updateByFields: Fields = null
  ) extends Source {

  // Widened to the generic Scheme type that Scalding's Source contract expects.
  override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy)
    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]

  /**
   * Creates the tap for this source.
   *
   * In Hdfs mode the original code had byte-identical `Read` and `Write`
   * branches (both built the same `TableDesc` and `JDBCTap`); the duplication
   * is collapsed here — the tap itself serves both directions. Any other mode
   * is delegated to [[Source.createTap]].
   *
   * @throws ClassCastException if `hdfsScheme` is not the `JDBCScheme` built above
   */
  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
    mode match {
      case Hdfs(_, _) =>
        // Recover the concrete scheme from the widened hdfsScheme value.
        val jdbcScheme = hdfsScheme match {
          case jdbc: JDBCScheme => jdbc
          case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme")
        }
        // Identical for Read and Write: one tap construction for either access mode.
        val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
        val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
        jdbcTap.asInstanceOf[Tap[_, _, _]]
      case _ => super.createTap(readOrWrite)(mode)
    }
  }
}