| author | Gracia Fernandez <Gracia.FernandezLopez@bskyb.com> | 2013-07-04 16:49:09 +0100 |
| --- | --- | --- |
| committer | Gracia Fernandez <Gracia.FernandezLopez@bskyb.com> | 2013-07-04 16:49:09 +0100 |
| commit | 20a18b4388f0cd06bec0b43d083150f6e1bb2c5e (patch) | |
| tree | 97c532e6e07abf4c6d0312749662080b315163f6 /src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala | |
| parent | e8ba249d5ce2ec293a4d19b54fc8298d4eac0271 (diff) | |
Changed HBaseSource and JDBCSource to allow testing with JobTest. Samples of tests included.
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala')
-rw-r--r-- | src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala | 23 |
1 file changed, 13 insertions, 10 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 2a08b7d..beb66be 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -6,31 +6,34 @@ import com.twitter.scalding.Mode
 import com.twitter.scalding.Read
 import com.twitter.scalding.Source
 import com.twitter.scalding.Write
-import cascading.scheme.Scheme
+import cascading.scheme.{NullScheme, Scheme}
 import cascading.tap.Tap
 import cascading.tuple.Fields
 import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.OutputCollector
 import org.apache.hadoop.mapred.JobConf
 
-class JDBCSource(
+case class JDBCSource(
     tableName: String = "tableName",
     driverName: String = "com.mysql.jdbc.Driver",
     connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>",
     userId: String = "user",
     password: String = "password",
-    columnNames: Array[String] = Array[String]("col1", "col2", "col3"),
-    columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"),
-    primaryKeys: Array[String] = Array[String]("primary_key"),
+    columnNames: List[String] = List("col1", "col2", "col3"),
+    columnDefs: List[String] = List("data_type", "data_type", "data_type"),
+    primaryKeys: List[String] = List("primary_key"),
     fields: Fields = new Fields("fld1", "fld2", "fld3"),
-    orderBy: Array[String] = null,
-    updateBy: Array[String] = null,
+    orderBy: List[String] = List(),
+    updateBy: List[String] = List(),
     updateByFields: Fields = null
   ) extends Source {
 
-  override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy)
+  override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
     .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
 
+  // To enable local mode testing
+  override def localScheme = new NullScheme(fields, fields)
+
   override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
     val jdbcScheme = hdfsScheme match {
       case jdbc: JDBCScheme => jdbc
@@ -39,13 +42,13 @@ class JDBCSource(
     mode match {
       case hdfsMode @ Hdfs(_, _) => readOrWrite match {
         case Read => {
-          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+          val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
           val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
           jdbcTap.asInstanceOf[Tap[_,_,_]]
         }
         case Write => {
-          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+          val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
           val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
           jdbcTap.asInstanceOf[Tap[_,_,_]]
         }
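A minimal sketch of the kind of JobTest usage this change is aimed at. JobTest looks up its test buffers by source equality, which is what the move from `class` to `case class` provides, and Scalding's local/test mode builds an in-memory tap from `localScheme`, which is what the `NullScheme` override provides. The job, test object, table and column names below (`ExampleJdbcJob`, `users`, `id`, `name`, `output_dir`) are hypothetical illustrations, not code from this commit.

```scala
import com.twitter.scalding._
import cascading.tuple.Fields
import parallelai.spyglass.jdbc.JDBCSource

// Hypothetical job: reads two columns through the JDBCSource and copies them to a Tsv.
class ExampleJdbcJob(args: Args) extends Job(args) {
  val users = JDBCSource(
    tableName = "users",
    connectionString = "jdbc:mysql://localhost:3306/test_db",
    userId = "user",
    password = "password",
    columnNames = List("id", "name"),
    columnDefs = List("int", "varchar(64)"),
    primaryKeys = List("id"),
    fields = new Fields("id", "name")
  )

  users.read.write(Tsv(args("output")))
}

// Hypothetical test wiring: because JDBCSource is now a case class, the instance
// built here equals the one constructed inside the job, so JobTest can substitute
// in-memory tuples for the JDBC table and capture what the job writes,
// without a real database.
object ExampleJdbcJobTest extends App with TupleConversions {
  val users = JDBCSource(
    tableName = "users",
    connectionString = "jdbc:mysql://localhost:3306/test_db",
    userId = "user",
    password = "password",
    columnNames = List("id", "name"),
    columnDefs = List("int", "varchar(64)"),
    primaryKeys = List("id"),
    fields = new Fields("id", "name")
  )

  JobTest("ExampleJdbcJob")
    .arg("output", "output_dir")
    .source(users, List((1, "alice"), (2, "bob")))
    .sink[(Int, String)](Tsv("output_dir")) { buffer =>
      assert(buffer.toList == List((1, "alice"), (2, "bob")))
    }
    .run // local/test mode, the path the NullScheme localScheme unblocks
    .finish
}
```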