about summary refs log tree commit diff stats
path: root/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala')
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala  23
1 file changed, 13 insertions(+), 10 deletions(-)
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 2a08b7d..beb66be 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -6,31 +6,34 @@ import com.twitter.scalding.Mode
import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
-import cascading.scheme.Scheme
+import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.Tap
import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
-class JDBCSource(
+case class JDBCSource(
tableName: String = "tableName",
driverName: String = "com.mysql.jdbc.Driver",
connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>",
userId: String = "user",
password: String = "password",
- columnNames: Array[String] = Array[String]("col1", "col2", "col3"),
- columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"),
- primaryKeys: Array[String] = Array[String]("primary_key"),
+ columnNames: List[String] = List("col1", "col2", "col3"),
+ columnDefs: List[String] = List("data_type", "data_type", "data_type"),
+ primaryKeys: List[String] = List("primary_key"),
fields: Fields = new Fields("fld1", "fld2", "fld3"),
- orderBy: Array[String] = null,
- updateBy: Array[String] = null,
+ orderBy: List[String] = List(),
+ updateBy: List[String] = List(),
updateByFields: Fields = null
) extends Source {
- override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy)
+ override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+ // To enable local mode testing
+ override def localScheme = new NullScheme(fields, fields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val jdbcScheme = hdfsScheme match {
case jdbc: JDBCScheme => jdbc
@@ -39,13 +42,13 @@ class JDBCSource(
mode match {
case hdfsMode @ Hdfs(_, _) => readOrWrite match {
case Read => {
- val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
case Write => {
- val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
jdbcTap.asInstanceOf[Tap[_,_,_]]
}