aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala4
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala23
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala28
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala32
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala38
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala406
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala23
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala33
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala6
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala64
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala67
11 files changed, 375 insertions, 349 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index 6035688..b6d5742 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -36,8 +36,8 @@ class HBasePipeWrapper (pipe: Pipe) {
}
}
}
- }
-
+ }
+
// def fromBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
// p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index 39a076e..d6795aa 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -10,15 +10,12 @@ import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
-import parallelai.spyglass.hbase.HBaseScheme;
-import parallelai.spyglass.hbase.HBaseTap;
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-import cascading.scheme.Scheme
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
import cascading.tap.Tap
import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
-import scala.compat.Platform
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
@@ -29,13 +26,13 @@ object Conversions {
implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s))
}
-class HBaseSource(
+case class HBaseSource(
tableName: String = null,
quorumNames: String = "localhost",
keyFields: Fields = null,
- familyNames: Array[String] = null,
- valueFields: Array[Fields] = null,
- timestamp: Long = Platform.currentTime,
+ familyNames: List[String] = null,
+ valueFields: List[Fields] = null,
+ timestamp: Long = 0L,
sourceMode: SourceMode = SourceMode.SCAN_ALL,
startKey: String = null,
stopKey: String = null,
@@ -45,9 +42,13 @@ class HBaseSource(
prefixList: String = null
) extends Source {
- override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields)
+ override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+ // To enable local mode testing
+ val allFields = keyFields.append(valueFields.toArray)
+ override def localScheme = new NullScheme(allFields, allFields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val hBaseScheme = hdfsScheme match {
case hbase: HBaseScheme => hbase
@@ -80,7 +81,7 @@ class HBaseSource(
case Write => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
- hbt.setUseSaltInSink(useSalt);
+ hbt.setUseSaltInSink(useSalt)
hbt.asInstanceOf[Tap[_,_,_]]
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
index 1ce9072..eccd653 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -19,13 +19,13 @@ class HBaseExample(args: Args) extends JobBase(args) {
val isDebug: Boolean = args("debug").toBoolean
- if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG)
+ if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
val output = args("output")
println(output)
- val jobConf = getJobConf
+ val jobConf = getJobConf()
val quorumNames = args("quorum")
@@ -38,20 +38,20 @@ class HBaseExample(args: Args) extends JobBase(args) {
val connection = HConnectionManager.getConnection(conf)
val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
- conf.set("hbase.zookeeper.quorum", quorumNames);
+ conf.set("hbase.zookeeper.quorum", quorumNames)
val htable = new HTable(HBaseConfiguration.create(conf), tableName)
}
- val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet")
+ val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet")
val hbs2 = new HBaseSource(
"table_name",
"quorum_name:2181",
'key,
- Array("column_family"),
- Array('column_name),
+ List("column_family"),
+ List('column_name),
sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
.read
.write(Tsv(output.format("get_list")))
@@ -60,8 +60,8 @@ class HBaseExample(args: Args) extends JobBase(args) {
"table_name",
"quorum_name:2181",
'key,
- Array("column_family"),
- Array('column_name),
+ List("column_family"),
+ List('column_name),
sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
.read
.write(Tsv(output.format("scan_all")))
@@ -70,8 +70,8 @@ class HBaseExample(args: Args) extends JobBase(args) {
"table_name",
"quorum_name:2181",
'key,
- Array("column_family"),
- Array('column_name),
+ List("column_family"),
+ List('column_name),
sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
.read
.write(Tsv(output.format("scan_range_to_end")))
@@ -80,8 +80,8 @@ class HBaseExample(args: Args) extends JobBase(args) {
"table_name",
"quorum_name:2181",
'key,
- Array("column_family"),
- Array('column_name),
+ List("column_family"),
+ List('column_name),
sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
.read
.write(Tsv(output.format("scan_range_from_start")))
@@ -90,8 +90,8 @@ class HBaseExample(args: Args) extends JobBase(args) {
"table_name",
"quorum_name:2181",
'key,
- Array("column_family"),
- Array('column_name),
+ List("column_family"),
+ List('column_name),
sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
.read
.write(Tsv(output.format("scan_range_between")))
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
new file mode 100644
index 0000000..7ba2788
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
@@ -0,0 +1,32 @@
+package parallelai.spyglass.hbase.example
+
+import com.twitter.scalding.{Tsv, Args}
+import parallelai.spyglass.base.JobBase
+import org.apache.log4j.{Level, Logger}
+import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import cascading.tuple.Fields
+
+/**
+ * Simple example of HBaseSource usage
+ */
+class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val isDebug: Boolean = args("debug").toBoolean
+
+ if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
+
+ val output = args("output")
+
+ val hbs = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ new Fields("key"),
+ List("column_family"),
+ List(new Fields("column_name1", "column_name2")),
+ sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
+ .read
+ .fromBytesWritable(new Fields("key", "column_name1", "column_name2"))
+ .write(Tsv(output format "get_list"))
+
+ }
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
index 2ca3f32..f774648 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala
@@ -1,23 +1,21 @@
package parallelai.spyglass.hbase.testing
import parallelai.spyglass.base.JobBase
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import com.twitter.scalding.Args
import parallelai.spyglass.hbase.HBaseSource
-import com.twitter.scalding.Tsv
import cascading.tuple.Fields
import com.twitter.scalding.TextLine
import org.apache.log4j.Logger
import org.apache.log4j.Level
import parallelai.spyglass.hbase.HBasePipeConversions
-import cascading.pipe.Pipe
class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {
val isDebug = args.getOrElse("debug", "false").toBoolean
- if( isDebug ) { Logger.getRootLogger().setLevel(Level.DEBUG) }
+ if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }
val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
@@ -26,44 +24,44 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val quorum = args("quorum")
val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
sourceMode = SourceMode.SCAN_ALL ).read
.fromBytesWritable( TABLE_SCHEMA )
.write(TextLine("saltTesting/ScanAllNoSalt01"))
val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read
.fromBytesWritable( TABLE_SCHEMA )
.write(TextLine("saltTesting/ScanAllPlusSalt01"))
val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read
.fromBytesWritable(TABLE_SCHEMA )
.write(TextLine("saltTesting/ScanRangeNoSalt01"))
val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read
.fromBytesWritable(TABLE_SCHEMA )
.write(TextLine("saltTesting/ScanRangePlusSalt01"))
val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read
.fromBytesWritable(TABLE_SCHEMA )
.write(TextLine("saltTesting/GetListNoSalt01"))
val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read
.fromBytesWritable(TABLE_SCHEMA )
@@ -71,16 +69,16 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val hbase07 =
new HBaseSource( "_TEST.SALT.03", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix )
.read
.fromBytesWritable( TABLE_SCHEMA )
.write(TextLine("saltTesting/ScanRangePlusSalt10"))
.toBytesWritable( TABLE_SCHEMA )
.write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),
useSalt = true ))
// val hbase08 =
diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala
index 536f843..10104bf 100644
--- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala
@@ -55,20 +55,22 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
var testName01 = "Scan_Test_01_Huge_Key_Range"
println("---- Running : " + testName01)
// Get everything from HBase testing table into a Pipe
- val hbase01 = new HBaseSource( tableName1, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00")
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase01 = new HBaseSource(tableName1, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00")
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// Calculate expected result for Test 01
var list01 = List(("2000-01-01 10:00:10", "1"),
@@ -79,60 +81,66 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
val testName02 = "Scan_Test_02_Borders_Range"
println("---- Running : " + testName02)
// Get everything from HBase testing table into a Pipe
- val hbase02 = new HBaseSource( tableName1, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00")
- .read
- .fromBytesWritable(TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase02 = new HBaseSource(tableName1, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00")
+ .read
+ .fromBytesWritable(TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// Calculate expected result for Test 02
var list02 = List(("2000-01-01 10:00:10", "1"), ("2000-01-01 10:05:00", "2"), ("2000-01-01 10:10:00", "3"))
// -------------------- Test 03 --------------------
val testName03 = "Scan_Test_03_Inner_Range"
// Get everything from HBase testing table into a Pipe
- val hbase03 = new HBaseSource( tableName1, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00")
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
-
+ val hbase03 = new HBaseSource(tableName1, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00")
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
+
// Calculate expected result for Test 03
var list03 = List(("2000-01-01 10:05:00", "2"))
// -------------------- Test 04 --------------------
val testName04 = "Scan_Test_04_Out_Of_Range_And_Unordered"
// Get everything from HBase testing table into a Pipe
- val hbase04 = new HBaseSource( tableName1, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000")
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase04 = new HBaseSource(tableName1, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000")
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// -------------------- Test 0 - TODO scan multiple versions .. --------------------
// val testName04 = "Scan_Test_04_One_Version"
@@ -160,42 +168,46 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
// -------------------- Test 05 --------------------
val testName05 = "Get_Test_01_One_Existing_Some_Nonexisting_Keys_1_Versions"
// Get everything from HBase testing table into a Pipe
- val hbase05 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"),
- versions = 1 )
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase05 = new HBaseSource(tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"),
+ versions = 1)
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// Calculate expected result for Test 04
var list05 = List(("2000-01-01 11:00:00", "6"))
// -------------------- Test 6 --------------------
val testName06 = "Get_Test_02_One_Existing_Some_Nonexisting_Keys_2_Versions"
- val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"),
- versions = 2 )
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"),
+ versions = 2)
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// Calculate expected result for Test 05
var list06 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"))
@@ -203,21 +215,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
// -------------------- Test 7 --------------------
val testName07 = "Get_Test_03_One_Existing_Some_Nonexisting_Keys_3_Versions"
// Get everything from HBase testing table into a Pipe
- val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"),
- versions = 3 )
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"),
+ versions = 3)
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// Calculate expected result for Test 07
var list07 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
@@ -225,21 +239,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
// -------------------- Test 08 --------------------
val testName08 = "Get_Test_04_One_Existing_Some_Nonexisting_Keys_4_Versions"
// Get everything from HBase testing table into a Pipe
- val hbase08 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"),
- versions = 4 )
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase08 = new HBaseSource(tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"),
+ versions = 4)
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
var list08 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),("2000-01-01 10:00:00","1"),
("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
@@ -247,21 +263,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
// -------------------- Test 09 --------------------
val testName09 = "Get_Test_05_Get_Same_Key_Multiple_Times_4_versions"
// Get everything from HBase testing table into a Pipe
- val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"),
- versions = 4 )
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"),
+ versions = 4)
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
var list09 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4"))
@@ -273,21 +291,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
var bigList = ((bigList1 ::: List("2000-01-01 11:00:00")) ::: bigList2) ::: List("2000-01-01 10:00:00")
// Get everything from HBase testing table into a Pipe
- val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = bigList,
- versions = 2 ) //
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = bigList,
+ versions = 2) //
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
var list10 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),
@@ -297,58 +317,64 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon
// -------------------- Test 11 --------------------
val testName11 = "Get_Test_07_EmptyList"
// Get everything from HBase testing table into a Pipe
- val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = List(),
- versions = 1 ) //
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List(),
+ versions = 1) //
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// -------------------- Test 11 --------------------
val testName12 = "Get_Test_08_Three_Nonexistingkeys_1_Versions"
// Get everything from HBase testing table into a Pipe
- val hbase12 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"),
- versions = 1 )
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase12 = new HBaseSource(tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"),
+ versions = 1)
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
// --------------------- TEST 13 -----------------------------
val testName13 = "Some "
- val hbase13 = new HBaseSource( tableName2, hbaseHost, 'key,
- Array("data"),
- Array('column1),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey="", useSalt = true )
- .read
- .fromBytesWritable(
- TABLE_01_SCHEMA)
- .groupAll { group =>
- group.toList[String]('key -> 'key)
- group.toList[String]('column1 -> 'column1)
- }
- .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) =>
- x._1 + " " + x._2
- }
+ val hbase13 = new HBaseSource(tableName2, hbaseHost, 'key,
+ List("data"),
+ List('column1),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey = "", useSalt = true)
+ .read
+ .fromBytesWritable(
+ TABLE_01_SCHEMA)
+ .groupAll {
+ group =>
+ group.toList[String]('key -> 'key)
+ group.toList[String]('column1 -> 'column1)
+ }
+ .mapTo(('key, 'column1) -> 'hbasedata) {
+ x: (String, String) =>
+ x._1 + " " + x._2
+ }
var list13 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),
("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5")
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 2a08b7d..beb66be 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -6,31 +6,34 @@ import com.twitter.scalding.Mode
import com.twitter.scalding.Read
import com.twitter.scalding.Source
import com.twitter.scalding.Write
-import cascading.scheme.Scheme
+import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.Tap
import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
-class JDBCSource(
+case class JDBCSource(
tableName: String = "tableName",
driverName: String = "com.mysql.jdbc.Driver",
connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>",
userId: String = "user",
password: String = "password",
- columnNames: Array[String] = Array[String]("col1", "col2", "col3"),
- columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"),
- primaryKeys: Array[String] = Array[String]("primary_key"),
+ columnNames: List[String] = List("col1", "col2", "col3"),
+ columnDefs: List[String] = List("data_type", "data_type", "data_type"),
+ primaryKeys: List[String] = List("primary_key"),
fields: Fields = new Fields("fld1", "fld2", "fld3"),
- orderBy: Array[String] = null,
- updateBy: Array[String] = null,
+ orderBy: List[String] = List(),
+ updateBy: List[String] = List(),
updateByFields: Fields = null
) extends Source {
- override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy)
+ override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+ // To enable local mode testing
+ override def localScheme = new NullScheme(fields, fields)
+
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val jdbcScheme = hdfsScheme match {
case jdbc: JDBCScheme => jdbc
@@ -39,13 +42,13 @@ class JDBCSource(
mode match {
case hdfsMode @ Hdfs(_, _) => readOrWrite match {
case Read => {
- val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
case Write => {
- val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys)
+ val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
diff --git a/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala b/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala
new file mode 100644
index 0000000..0217204
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala
@@ -0,0 +1,33 @@
+package parallelai.spyglass.jdbc.example
+
+import com.twitter.scalding.{Tsv, Args}
+import parallelai.spyglass.base.JobBase
+import org.apache.log4j.{Level, Logger}
+import parallelai.spyglass.jdbc.JDBCSource
+import cascading.tuple.Fields
+
+/**
+ * Simple example of JDBCSource usage
+ */
+class JdbcSourceExample(args: Args) extends JobBase(args) {
+
+ val isDebug: Boolean = args("debug").toBoolean
+
+ if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
+
+ val output = args("output")
+
+ val hbs2 = new JDBCSource(
+ "db_name",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
+ "user",
+ "password",
+ List("KEY_ID", "COL1", "COL2", "COL3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("key_id"),
+ new Fields("key_id", "col1", "col2", "col3")
+ ).read
+ .write(Tsv(output.format("get_list")))
+
+}
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
index 1544f47..d290f06 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala
@@ -38,9 +38,9 @@ class HdfsToJdbc (args: Args) extends JobBase(args) {
"jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
"user",
"password",
- Array[String]("KEY_ID", "COL1", "COL2", "COL3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("key_id"),
+ List("KEY_ID", "COL1", "COL2", "COL3"),
+ List( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
+ List("key_id"),
new Fields("key_id", "col1", "col2", "col3")
)
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
index 30c03a2..765b422 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala
@@ -34,29 +34,29 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {
val tableName = "skybet_hbase_betdetail_jdbc_test"
val jdbcSourceRead = new JDBCSource(
- "TABLE_01",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
)
-
+
val jdbcSourceWrite = new JDBCSource(
- "TABLE_01",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
)
// -----------------------------
@@ -89,18 +89,18 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {
println("---- Running : " + testName02)
// Get everything from JDBC testing table into a Pipe
-
+
val jdbcSourceReadUpdated = new JDBCSource(
- "TABLE_02",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- Array[String]( "bigint(20)" , "varchar(45)" , "varchar(45)" , "bigint(20)"),
- Array[String]("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
+ "TABLE_02",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null
)
val jdbc02 = jdbcSourceReadUpdated
diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
deleted file mode 100644
index 9fc09e4..0000000
--- a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package parallelai.spyglass.jdbc.testing
-
-import com.twitter.scalding._
-import cascading.tuple.Fields
-import parallelai.spyglass.base.JobBase
-import parallelai.spyglass.jdbc.JDBCSource
-
-/**
- * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match.
- * Now hardcoded for Skybet betdetail summation sample.
- * To run it:
- * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \
- * --app.conf.path /projects/application-hadoop.conf --hdfs \
- * --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation
- * @param args
- */
-class TablesComparison(args: Args) extends JobBase(args) {
-
- implicit val implicitArgs: Args = args
- val conf = appConfig
-
- val jdbcSink = new JDBCSource(
- "table_name",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://<hostname>:<port>/<db_name>",
- "skybet_user",
- "zkb4Uo{C8",
- Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"),
- Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"),
- Array[String]("betleg_id"),
- new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"),
- Array[String]("BETLEG_ID"),
- Array[String]("BETLEG_ID"),
- new Fields("betleg_id")
- )
- .read
- .insert('name, "betdetail")
- .project('bet_id, 'part_no, 'leg_id)
-
- //
- // .write(new TextLine("testJDBCComparator/compare1"))
-
- val jdbcSource2 = new JDBCSource(
- "skybet_midas_bet_detail",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull",
- "skybet_user",
- "zkb4Uo{C8",
- Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"),
- Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"),
- Array[String]("bet_id"),
- new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no")
-
- )
- .read
- .insert('name, "sample")
- .project('bet_id, 'part_no, 'leg_id)
-
- val uPipe = jdbcSink ++ jdbcSource2
- uPipe
- .groupBy('bet_id, 'part_no, 'leg_id) {
- _.size
- }.filter('size) {
- x: Int => x != 2
- }
- .write(new TextLine("testJDBCComparator/result"))
-} \ No newline at end of file