From 20a18b4388f0cd06bec0b43d083150f6e1bb2c5e Mon Sep 17 00:00:00 2001
From: Gracia Fernandez
Date: Thu, 4 Jul 2013 16:49:09 +0100
Subject: Changed HBaseSource and JDBCSource to allow testing with JobTest.
 Samples of tests included.

---
 .../parallelai/spyglass/jdbc/JDBCSource.scala | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

(limited to 'src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala')

diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 2a08b7d..beb66be 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -6,31 +6,34 @@ import com.twitter.scalding.Mode
 import com.twitter.scalding.Read
 import com.twitter.scalding.Source
 import com.twitter.scalding.Write
-import cascading.scheme.Scheme
+import cascading.scheme.{NullScheme, Scheme}
 import cascading.tap.Tap
 import cascading.tuple.Fields
 import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.OutputCollector
 import org.apache.hadoop.mapred.JobConf
 
-class JDBCSource(
+case class JDBCSource(
   tableName: String = "tableName",
   driverName: String = "com.mysql.jdbc.Driver",
   connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>",
   userId: String = "user",
   password: String = "password",
-  columnNames: Array[String] = Array[String]("col1", "col2", "col3"),
-  columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"),
-  primaryKeys: Array[String] = Array[String]("primary_key"),
+  columnNames: List[String] = List("col1", "col2", "col3"),
+  columnDefs: List[String] = List("data_type", "data_type", "data_type"),
+  primaryKeys: List[String] = List("primary_key"),
   fields: Fields = new Fields("fld1", "fld2", "fld3"),
-  orderBy: Array[String] = null,
-  updateBy: Array[String] = null,
+  orderBy: List[String] = List(),
+  updateBy: List[String] = List(),
   updateByFields: Fields = null
 ) extends Source {
 
-  override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy)
+  override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
     .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
 
+  // To enable local mode testing
+  override def localScheme = new NullScheme(fields, fields)
+
   override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
     val jdbcScheme = hdfsScheme match {
       case jdbc: JDBCScheme => jdbc
@@ -39,13 +42,13 @@ class JDBCSource(
     mode match {
       case hdfsMode @ Hdfs(_, _) => readOrWrite match {
         case Read => {
-          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+          val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
           val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
           jdbcTap.asInstanceOf[Tap[_,_,_]]
         }
 
         case Write => {
-          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+          val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
           val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
           jdbcTap.asInstanceOf[Tap[_,_,_]]
         }
-- 
cgit v1.2.3
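
For reference, below is a sketch of the kind of JobTest this change enables. Making JDBCSource a case class means an instance built in a test compares equal to the one built inside the job, so JobTest can match and replace it; the NullScheme override lets that replacement run in local mode without a JDBC connection. The job, package, table, columns, and test data here are hypothetical (not the samples from this commit), and the code assumes the Scalding 0.8-era JobTest and ScalaTest APIs.

package parallelai.spyglass.jdbc.testing

import cascading.tuple.Fields
import com.twitter.scalding.{Args, Job, JobTest, Tsv, TupleConversions}
import org.scalatest.FunSuite
import parallelai.spyglass.jdbc.JDBCSource

// Hypothetical job: read two columns from a JDBC table, write them to TSV.
class JdbcToTsvJob(args: Args) extends Job(args) {
  JdbcToTsvJob.source.read.write(Tsv(args("output")))
}

object JdbcToTsvJob {
  // Shared so the test can reference an equal JDBCSource instance.
  val source = JDBCSource(
    tableName        = "test_table",
    driverName       = "com.mysql.jdbc.Driver",
    connectionString = "jdbc:mysql://localhost:3306/test_db",
    userId           = "user",
    password         = "password",
    columnNames      = List("id", "name"),
    columnDefs       = List("bigint(20)", "varchar(45)"),
    primaryKeys      = List("id"),
    fields           = new Fields("id", "name")
  )
}

class JdbcToTsvJobTest extends FunSuite with TupleConversions {
  test("reads JDBC tuples in local mode without a database") {
    JobTest("parallelai.spyglass.jdbc.testing.JdbcToTsvJob")
      .arg("output", "outputFile")
      // JobTest matches this source against the job's by equality
      // (hence the case class) and feeds it these tuples in place of
      // a real table; the NullScheme carries them through local mode.
      .source(JdbcToTsvJob.source, List((1, "alice"), (2, "bob")))
      .sink[(Int, String)](Tsv("outputFile")) { out =>
        assert(out.toList === List((1, "alice"), (2, "bob")))
      }
      .run
      .finish
  }
}

The switch from Array to List parameters supports the same goal: List has structural equality, so two identically configured sources are equal, while Arrays compare by reference and would defeat JobTest's source matching. The .toArray conversions keep the underlying JDBCScheme and TableDesc calls unchanged.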