aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala')
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala14
1 files changed, 11 insertions, 3 deletions
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index beb66be..09f4579 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -28,11 +28,11 @@ case class JDBCSource(
updateByFields: Fields = null
) extends Source {
- override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
+ val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
- override def localScheme = new NullScheme(fields, fields)
+ def localScheme = new NullScheme(fields, fields)
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val jdbcScheme = hdfsScheme match {
@@ -53,7 +53,15 @@ case class JDBCSource(
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
}
- case _ => super.createTap(readOrWrite)(mode)
+ case _ => createEmptyTap(readOrWrite)(mode)
}
}
+
+ def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
+ mode match {
+ case _ => {
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+ }
+ }
+ }
}