diff options
Diffstat (limited to 'src/main/scala/parallelai')
11 files changed, 375 insertions, 349 deletions
| diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala index 6035688..b6d5742 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala @@ -36,8 +36,8 @@ class HBasePipeWrapper (pipe: Pipe) {  	      }  	    }  	  } -	}	 -	 +	} +  //	def fromBytesWritable : Pipe = {  //	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>  //	    p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index 39a076e..d6795aa 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -10,15 +10,12 @@ import com.twitter.scalding.Read  import com.twitter.scalding.Source  import com.twitter.scalding.Write -import parallelai.spyglass.hbase.HBaseScheme; -import parallelai.spyglass.hbase.HBaseTap; -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; -import cascading.scheme.Scheme +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import cascading.scheme.{NullScheme, Scheme}  import cascading.tap.SinkMode  import cascading.tap.Tap  import cascading.tuple.Fields  import org.apache.hadoop.mapred.RecordReader -import scala.compat.Platform  import org.apache.hadoop.mapred.OutputCollector  import org.apache.hadoop.mapred.JobConf @@ -29,13 +26,13 @@ object Conversions {    implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s))  } -class HBaseSource( +case class HBaseSource(      tableName: String = null,      quorumNames: String = "localhost",      keyFields: Fields = null, -    familyNames: Array[String] = null, -    valueFields: Array[Fields] = null, -    timestamp: Long = Platform.currentTime, +    familyNames: List[String] = null, +    valueFields: List[Fields] = null, +    timestamp: Long = 0L,      sourceMode: SourceMode = SourceMode.SCAN_ALL,      startKey: String = null,      stopKey: String = null, @@ -45,9 +42,13 @@ class HBaseSource(      prefixList: String = null    ) extends Source { -  override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) +  override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)      .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] +  // To enable local mode testing +  val allFields = keyFields.append(valueFields.toArray) +  override def localScheme = new NullScheme(allFields, allFields) +    override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {      val hBaseScheme = hdfsScheme match {        case hbase: HBaseScheme => hbase @@ -80,7 +81,7 @@ class HBaseSource(          case Write => {            val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) -          hbt.setUseSaltInSink(useSalt); +          hbt.setUseSaltInSink(useSalt)            hbt.asInstanceOf[Tap[_,_,_]]          } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index 1ce9072..eccd653 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -19,13 +19,13 @@ class HBaseExample(args: Args) extends JobBase(args) {    val isDebug: Boolean = args("debug").toBoolean -  if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG) +  if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)    val output = args("output")    println(output) -  val jobConf = getJobConf +  val jobConf = getJobConf()    val quorumNames = args("quorum") @@ -38,20 +38,20 @@ class HBaseExample(args: Args) extends JobBase(args) {      val connection = HConnectionManager.getConnection(conf)      val maxThreads = conf.getInt("hbase.htable.threads.max", 1) -    conf.set("hbase.zookeeper.quorum", quorumNames); +    conf.set("hbase.zookeeper.quorum", quorumNames)      val htable = new HTable(HBaseConfiguration.create(conf), tableName)    } -  val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet") +  val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet")    val hbs2 = new HBaseSource(      "table_name",      "quorum_name:2181",      'key, -    Array("column_family"), -    Array('column_name), +    List("column_family"), +    List('column_name),      sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))      .read      .write(Tsv(output.format("get_list"))) @@ -60,8 +60,8 @@ class HBaseExample(args: Args) extends JobBase(args) {      "table_name",      "quorum_name:2181",      'key, -    Array("column_family"), -    Array('column_name), +    List("column_family"), +    List('column_name),      sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")      .read      .write(Tsv(output.format("scan_all"))) @@ -70,8 +70,8 @@ class HBaseExample(args: Args) extends JobBase(args) {      "table_name",      "quorum_name:2181",      'key, -    Array("column_family"), -    Array('column_name), +    List("column_family"), +    List('column_name),      sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")      .read      .write(Tsv(output.format("scan_range_to_end"))) @@ -80,8 +80,8 @@ class HBaseExample(args: Args) extends JobBase(args) {      "table_name",      "quorum_name:2181",      'key, -    Array("column_family"), -    Array('column_name), +    List("column_family"), +    List('column_name),      sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")      .read      .write(Tsv(output.format("scan_range_from_start"))) @@ -90,8 +90,8 @@ class HBaseExample(args: Args) extends JobBase(args) {      "table_name",      "quorum_name:2181",      'key, -    Array("column_family"), -    Array('column_name), +    List("column_family"), +    List('column_name),      sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")      .read      .write(Tsv(output.format("scan_range_between"))) diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala new file mode 100644 index 0000000..7ba2788 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala @@ -0,0 +1,32 @@ +package parallelai.spyglass.hbase.example + +import com.twitter.scalding.{Tsv, Args} +import parallelai.spyglass.base.JobBase +import org.apache.log4j.{Level, Logger} +import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource} +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import cascading.tuple.Fields + +/** +  * Simple example of HBaseSource usage +  */ +class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions { + +   val isDebug: Boolean = args("debug").toBoolean + +   if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) + +   val output = args("output") + +   val hbs = new HBaseSource( +     "table_name", +     "quorum_name:2181", +     new Fields("key"), +     List("column_family"), +     List(new Fields("column_name1", "column_name2")), +     sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")) +     .read +     .fromBytesWritable(new Fields("key", "column_name1", "column_name2")) +     .write(Tsv(output format "get_list")) + + } diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index 2ca3f32..f774648 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -1,23 +1,21 @@  package parallelai.spyglass.hbase.testing  import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import com.twitter.scalding.Args  import parallelai.spyglass.hbase.HBaseSource -import com.twitter.scalding.Tsv  import cascading.tuple.Fields  import com.twitter.scalding.TextLine  import org.apache.log4j.Logger  import org.apache.log4j.Level  import parallelai.spyglass.hbase.HBasePipeConversions -import cascading.pipe.Pipe  class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversions {    val isDebug = args.getOrElse("debug", "false").toBoolean -  if( isDebug ) { Logger.getRootLogger().setLevel(Level.DEBUG) }  +  if( isDebug ) { Logger.getRootLogger.setLevel(Level.DEBUG) }    val TABLE_SCHEMA = List('key, 'salted, 'unsalted) @@ -26,44 +24,44 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio    val quorum = args("quorum")    val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            sourceMode = SourceMode.SCAN_ALL ).read            .fromBytesWritable( TABLE_SCHEMA )    .write(TextLine("saltTesting/ScanAllNoSalt01"))    val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read                 .fromBytesWritable( TABLE_SCHEMA )    .write(TextLine("saltTesting/ScanAllPlusSalt01"))    val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read               .fromBytesWritable(TABLE_SCHEMA )    .write(TextLine("saltTesting/ScanRangeNoSalt01"))    val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read                 .fromBytesWritable(TABLE_SCHEMA )    .write(TextLine("saltTesting/ScanRangePlusSalt01"))    val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read            .fromBytesWritable(TABLE_SCHEMA )    .write(TextLine("saltTesting/GetListNoSalt01"))    val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read            .fromBytesWritable(TABLE_SCHEMA ) @@ -71,16 +69,16 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio    val hbase07 =         new HBaseSource( "_TEST.SALT.03", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix )    .read    .fromBytesWritable( TABLE_SCHEMA )    .write(TextLine("saltTesting/ScanRangePlusSalt10"))    .toBytesWritable( TABLE_SCHEMA )    .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key,   -          TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,  -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, +          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)),            useSalt = true ))  //  val hbase08 =  diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala index 536f843..10104bf 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala @@ -55,20 +55,22 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    var testName01 = "Scan_Test_01_Huge_Key_Range"    println("---- Running : " + testName01)    // Get everything from HBase testing table into a Pipe -  val hbase01 = new HBaseSource( tableName1, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00") -  		  	.read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase01 = new HBaseSource(tableName1, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 00:00:00", stopKey = "2000-01-02 00:00:00") +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // Calculate expected result for Test 01    var list01 = List(("2000-01-01 10:00:10", "1"), @@ -79,60 +81,66 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    val testName02 = "Scan_Test_02_Borders_Range"    println("---- Running : " + testName02)    // Get everything from HBase testing table into a Pipe -  val hbase02 = new HBaseSource( tableName1, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00") -  		  	.read -  		  	.fromBytesWritable(TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase02 = new HBaseSource(tableName1, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:10", stopKey = "2000-01-01 10:10:00") +    .read +    .fromBytesWritable(TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // Calculate expected result for Test 02    var list02 = List(("2000-01-01 10:00:10", "1"), ("2000-01-01 10:05:00", "2"), ("2000-01-01 10:10:00", "3"))    // -------------------- Test 03 --------------------    val testName03 = "Scan_Test_03_Inner_Range"    // Get everything from HBase testing table into a Pipe -  val hbase03 = new HBaseSource( tableName1, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00") -  		  	.read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } -   +  val hbase03 = new HBaseSource(tableName1, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "2000-01-01 10:00:55", stopKey = "2000-01-01 10:07:00") +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    } +    // Calculate expected result for Test 03    var list03 = List(("2000-01-01 10:05:00", "2"))    // -------------------- Test 04 --------------------    val testName04 = "Scan_Test_04_Out_Of_Range_And_Unordered"    // Get everything from HBase testing table into a Pipe -  val hbase04 = new HBaseSource( tableName1, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -  		    sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000") -  		  	.read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase04 = new HBaseSource(tableName1, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "9", stopKey = "911000000") +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // -------------------- Test 0 - TODO scan multiple versions .. --------------------  //  val testName04 = "Scan_Test_04_One_Version" @@ -160,42 +168,46 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    // -------------------- Test 05 --------------------    val testName05 = "Get_Test_01_One_Existing_Some_Nonexisting_Keys_1_Versions"    // Get everything from HBase testing table into a Pipe -  val hbase05 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -            sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"), -            versions = 1 ) -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase05 = new HBaseSource(tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "2000-01-01 11:00:00", "5004897"), +    versions = 1) +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // Calculate expected result for Test 04    var list05 = List(("2000-01-01 11:00:00", "6"))    // -------------------- Test 6 --------------------    val testName06 = "Get_Test_02_One_Existing_Some_Nonexisting_Keys_2_Versions" -  val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -            sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"), -            versions = 2 ) -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group => -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase06 = new HBaseSource( tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = List("a", "5003914", "2000-01-01 10:00:00"), +    versions = 2) +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // Calculate expected result for Test 05    var list06 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2")) @@ -203,21 +215,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    // -------------------- Test 7 --------------------    val testName07 = "Get_Test_03_One_Existing_Some_Nonexisting_Keys_3_Versions"    // Get everything from HBase testing table into a Pipe -  val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -            sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"), -            versions = 3 ) -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase07 = new HBaseSource( tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01", "2000-01-01 11:00:00", "zz"), +    versions = 3) +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // Calculate expected result for Test 07    var list07 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) @@ -225,21 +239,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    // -------------------- Test 08 --------------------    val testName08 = "Get_Test_04_One_Existing_Some_Nonexisting_Keys_4_Versions"    // Get everything from HBase testing table into a Pipe -  val hbase08 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -            sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"), -            versions = 4 ) -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase08 = new HBaseSource(tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "2000-01-01 10:00:00", "zz"), +    versions = 4) +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    var list08 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),("2000-01-01 10:00:00","1"),                      ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) @@ -247,21 +263,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    // -------------------- Test 09 --------------------    val testName09 = "Get_Test_05_Get_Same_Key_Multiple_Times_4_versions"    // Get everything from HBase testing table into a Pipe -  val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -            sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"), -            versions = 4 ) -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase09 = new HBaseSource( tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = List("2000", "2000-01-01 11:00:00", "avdvf", "2000-01-01 11:00:00"), +    versions = 4) +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    var list09 = List(("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5"),("2000-01-01 11:00:00","4")) @@ -273,21 +291,23 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    var bigList = ((bigList1 ::: List("2000-01-01 11:00:00")) ::: bigList2) ::: List("2000-01-01 10:00:00")       // Get everything from HBase testing table into a Pipe -  val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -  		    sourceMode = SourceMode.GET_LIST, keyList = bigList, -            versions = 2 ) //  -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase10 = new HBaseSource( tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = bigList, +    versions = 2) // +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +    group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    var list10 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"), @@ -297,58 +317,64 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon    // -------------------- Test 11 --------------------    val testName11 = "Get_Test_07_EmptyList"    // Get everything from HBase testing table into a Pipe -  val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -  		    sourceMode = SourceMode.GET_LIST, keyList = List(), -            versions = 1 ) //  -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase11 = new HBaseSource( tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = List(), +    versions = 1) // +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // -------------------- Test 11 --------------------    val testName12 = "Get_Test_08_Three_Nonexistingkeys_1_Versions"    // Get everything from HBase testing table into a Pipe -  val hbase12 = new HBaseSource( tableName2, hbaseHost, 'key,   -  		    Array("data"),  -  		    Array('column1), -  		    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"), -            versions = 1 )   -            .read  	     -  	    .fromBytesWritable( -  		TABLE_01_SCHEMA) -  .groupAll { group =>  -  	   group.toList[String]('key -> 'key) -  	   group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -	     x._1 + " " + x._2 -  } +  val hbase12 = new HBaseSource(tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"), +    versions = 1) +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    // --------------------- TEST 13 -----------------------------    val testName13 = "Some " -  val hbase13 = new HBaseSource( tableName2, hbaseHost, 'key,   -          Array("data"),  -          Array('column1), -          sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey="", useSalt = true )   -            .read        -        .fromBytesWritable( -      TABLE_01_SCHEMA) -  .groupAll { group =>  -       group.toList[String]('key -> 'key) -       group.toList[String]('column1 -> 'column1) -  } -  .mapTo(('key, 'column1) -> 'hbasedata) { x:(String,String) => -       x._1 + " " + x._2 -  }  +  val hbase13 = new HBaseSource(tableName2, hbaseHost, 'key, +    List("data"), +    List('column1), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "", stopKey = "", useSalt = true) +    .read +    .fromBytesWritable( +    TABLE_01_SCHEMA) +    .groupAll { +      group => +        group.toList[String]('key -> 'key) +        group.toList[String]('column1 -> 'column1) +    } +    .mapTo(('key, 'column1) -> 'hbasedata) { +      x: (String, String) => +        x._1 + " " + x._2 +    }    var list13 = List(("2000-01-01 10:00:00", "3"),("2000-01-01 10:00:00","2"),              ("2000-01-01 11:00:00", "6"),("2000-01-01 11:00:00","5") diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 2a08b7d..beb66be 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -6,31 +6,34 @@ import com.twitter.scalding.Mode  import com.twitter.scalding.Read  import com.twitter.scalding.Source  import com.twitter.scalding.Write -import cascading.scheme.Scheme +import cascading.scheme.{NullScheme, Scheme}  import cascading.tap.Tap  import cascading.tuple.Fields  import org.apache.hadoop.mapred.RecordReader  import org.apache.hadoop.mapred.OutputCollector  import org.apache.hadoop.mapred.JobConf -class JDBCSource( +case class JDBCSource(      tableName: String = "tableName",      driverName: String = "com.mysql.jdbc.Driver",      connectionString: String = "jdbc:mysql://<hostname>:<port>/<db_name>",      userId: String = "user",      password: String = "password", -    columnNames: Array[String] = Array[String]("col1", "col2", "col3"), -    columnDefs: Array[String] = Array[String]("data_type", "data_type", "data_type"), -    primaryKeys: Array[String] = Array[String]("primary_key"), +    columnNames: List[String] = List("col1", "col2", "col3"), +    columnDefs: List[String] = List("data_type", "data_type", "data_type"), +    primaryKeys: List[String] = List("primary_key"),      fields: Fields = new Fields("fld1", "fld2", "fld3"), -    orderBy: Array[String] = null, -    updateBy: Array[String] = null, +    orderBy: List[String] = List(), +    updateBy: List[String] = List(),      updateByFields: Fields = null    ) extends Source { -  override val hdfsScheme = new JDBCScheme(fields, columnNames, orderBy, updateByFields, updateBy) +  override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)      .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] +  // To enable local mode testing +  override def localScheme = new NullScheme(fields, fields) +    override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {      val jdbcScheme = hdfsScheme match {        case jdbc: JDBCScheme => jdbc @@ -39,13 +42,13 @@ class JDBCSource(      mode match {        case hdfsMode @ Hdfs(_, _) => readOrWrite match {          case Read => { -          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) +          val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)            val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)            jdbcTap.asInstanceOf[Tap[_,_,_]]          }          case Write => { -          val tableDesc = new TableDesc(tableName, columnNames, columnDefs, primaryKeys) +          val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)            val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)            jdbcTap.asInstanceOf[Tap[_,_,_]]          } diff --git a/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala b/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala new file mode 100644 index 0000000..0217204 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/jdbc/example/JdbcSourceExample.scala @@ -0,0 +1,33 @@ +package parallelai.spyglass.jdbc.example + +import com.twitter.scalding.{Tsv, Args} +import parallelai.spyglass.base.JobBase +import org.apache.log4j.{Level, Logger} +import parallelai.spyglass.jdbc.JDBCSource +import cascading.tuple.Fields + +/** + * Simple example of JDBCSource usage + */ +class JdbcSourceExample(args: Args) extends JobBase(args) { + +  val isDebug: Boolean = args("debug").toBoolean + +  if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) + +  val output = args("output") + +  val hbs2 = new JDBCSource( +    "db_name", +    "com.mysql.jdbc.Driver", +    "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull", +    "user", +    "password", +    List("KEY_ID", "COL1", "COL2", "COL3"), +    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), +    List("key_id"), +    new Fields("key_id", "col1", "col2", "col3") +  ).read +    .write(Tsv(output.format("get_list"))) + +} diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala index 1544f47..d290f06 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/HdfsToJdbc.scala @@ -38,9 +38,9 @@ class HdfsToJdbc (args: Args) extends JobBase(args) {  		"jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",  		"user",  		"password", -		Array[String]("KEY_ID", "COL1", "COL2", "COL3"), -		Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,  "bigint(20)"), -		Array[String]("key_id"), +    List("KEY_ID", "COL1", "COL2", "COL3"), +    List( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,  "bigint(20)"), +    List("key_id"),  		new Fields("key_id",    "col1",    "col2",    "col3")  	) diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala index 30c03a2..765b422 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/testing/JdbcSourceShouldReadWrite.scala @@ -34,29 +34,29 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {    val tableName = "skybet_hbase_betdetail_jdbc_test"    val jdbcSourceRead = new JDBCSource( -	"TABLE_01", -	"com.mysql.jdbc.Driver", -	"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", -	"root", -	"password", -	Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), -	Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,    "bigint(20)"), -	Array[String]("id"), -	new Fields("key",    "column1",    "column2",    "column3"), -	null, null, null +    "TABLE_01", +    "com.mysql.jdbc.Driver", +    "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +    "root", +    "password", +    List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), +    List("id"), +    new Fields("key", "column1", "column2", "column3"), +    null, null, null    ) -   +    val jdbcSourceWrite = new JDBCSource( -	"TABLE_01", -	"com.mysql.jdbc.Driver", -	"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", -	"root", -	"password", -	Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), -	Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,    "bigint(20)"), -	Array[String]("id"), -	new Fields("key",    "column1",    "column2",    "column3"), -	null, null, null +    "TABLE_01", +    "com.mysql.jdbc.Driver", +    "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +    "root", +    "password", +    List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), +    List("id"), +    new Fields("key", "column1", "column2", "column3"), +    null, null, null    )      // ----------------------------- @@ -89,18 +89,18 @@ class JdbcSourceShouldReadWrite (args: Args) extends JobBase(args) {    println("---- Running : " + testName02)    // Get everything from JDBC testing table into a Pipe -   +    val jdbcSourceReadUpdated = new JDBCSource( -	"TABLE_02", -	"com.mysql.jdbc.Driver", -	"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", -	"root", -	"password", -	Array[String]("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), -	Array[String]( "bigint(20)" ,  "varchar(45)"  ,  "varchar(45)"  ,    "bigint(20)"), -	Array[String]("id"), -	new Fields("key",    "column1",    "column2",    "column3"), -	null, null, null +    "TABLE_02", +    "com.mysql.jdbc.Driver", +    "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +    "root", +    "password", +    List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), +    List("id"), +    new Fields("key", "column1", "column2", "column3"), +    null, null, null    )      val jdbc02 = jdbcSourceReadUpdated diff --git a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala b/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala deleted file mode 100644 index 9fc09e4..0000000 --- a/src/main/scala/parallelai/spyglass/jdbc/testing/TablesComparison.scala +++ /dev/null @@ -1,67 +0,0 @@ -package parallelai.spyglass.jdbc.testing - -import com.twitter.scalding._ -import cascading.tuple.Fields -import parallelai.spyglass.base.JobBase -import parallelai.spyglass.jdbc.JDBCSource - -/** - * Compares whether two tables have the same data or not writing to HDFS the ids of the records that don't match. - * Now hardcoded for Skybet betdetail summation sample. - * To run it: - * bskyb.commons.scalding.base.JobRunner bskyb.commons.skybase.jdbc.testing.TablesComparison \ - *    --app.conf.path /projects/application-hadoop.conf --hdfs  \ - *    --job.lib.path file:///home/gfe01/IdeaProjects/commons/commons.hbase.skybase/alternateLocation - * @param args - */ -class TablesComparison(args: Args) extends JobBase(args) { - -  implicit val implicitArgs: Args = args -  val conf = appConfig - -  val jdbcSink = new JDBCSource( -    "table_name", -    "com.mysql.jdbc.Driver", -    "jdbc:mysql://<hostname>:<port>/<db_name>", -    "skybet_user", -    "zkb4Uo{C8", -    Array[String]("BETLEG_ID", "CR_DATE", "EV_MKT_ID", "CUST_ID", "SET_DATETIME", "BET_DATETIME", "STATUS", "SOURCE", "BET_TYPE", "AFF_NAME", "CURRENCY_CODE", "BET_ID", "LEG_ID", "RECEIPT_NO", "STAKE", "REFUND", "WINNINGS", "PROFIT", "STAKE_GBP", "REFUND_GBP", "WINNINGS_GBP", "PROFIT_GBP", "NUM_SELS", "EXCH_RATE", "ACCT_NO", "BET_IP_ADDRESS", "NUM_DRAWS", "EXTRACT_DATE", "BET_TIME", "BET_DATE_TIME", "SET_DATE_TIME", "BET_DATE_MONTH", "SET_TIME_KEY", "BET_TIME_KEY", "SET_TIME", "SET_DATE", "BET_DATE", "PART_NO", "MARKET_SORT", "TAG", "PLACED_IN_RUNNING", "MAX_STAKE_SCALE", "ODDS_NUM", "ODDS_DEN", "EV_OC_ID", "USER_CLIENT_ID"), -    Array[String]("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)", "varchar(45)", "varchar(45)", "char(1)", "varchar(45)", "char(5)", "varchar(45)", "char(3)", "bigint(20)", "bigint(20)", "varchar(24)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "decimal(12,2)", "smallint(6)", "decimal(12,2)", "varchar(45)", "char(15)", "int(11)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "varchar(45)", "int(11)", "varchar(45)", "varchar(45)", "char(1)", "double(10,5)", "int(11)", "int(11)", "bigint(20)", "varchar(45)"), -    Array[String]("betleg_id"), -    new Fields("betleg_id", "cr_date", "ev_mkt_id", "cust_id", "set_datetime", "bet_datetime", "status", "source", "bet_type", "aff_name", "currency_code", "bet_id", "leg_id", "receipt_no", "stake", "refund", "winnings", "profit", "stake_gbp", "refund_gbp", "winnings_gbp", "profit_gbp", "num_sels", "exch_rate", "acct_no", "bet_ip_address", "num_draws", "extract_date", "bet_time", "bet_date_time", "set_date_time", "bet_date_month", "set_time_key", "bet_time_key", "set_time", "set_date", "bet_date", "part_no", "market_sort", "tag", "placed_in_running", "max_stake_scale", "odds_num", "odds_den", "ev_oc_id", "user_client_id"), -    Array[String]("BETLEG_ID"), -    Array[String]("BETLEG_ID"), -    new Fields("betleg_id") -  ) -    .read -    .insert('name, "betdetail") -    .project('bet_id, 'part_no, 'leg_id) - -  // -  // .write(new TextLine("testJDBCComparator/compare1")) - -  val jdbcSource2 = new JDBCSource( -    "skybet_midas_bet_detail", -    "com.mysql.jdbc.Driver", -    "jdbc:mysql://mysql01.prod.bigdata.bskyb.com:3306/skybet_db?zeroDateTimeBehavior=convertToNull", -    "skybet_user", -    "zkb4Uo{C8", -    Array[String]("BET_ID", "BET_TYPE_ID", "RECEIPT_NO", "NUM_SELS", "NUM_LINES", "BET_CHANNEL_CODE", "MOBILE_CLIENT_ID", "BET_AFFILIATE_ID", "BET_IP_ADDRESS", "LEG_ID", "LEG_TYPE", "OUTCOME_ID", "ACCT_ID", "BET_PLACED_DATETIME", "BET_PLACED_DATE", "BET_PLACED_TIME", "BET_SETTLED_DATETIME", "BET_SETTLED_DATE", "BET_SETTLED_TIME", "BET_STATUS", "STAKE", "REFUND", "'RETURN'", "PROFIT", "CURRENCY_TYPE_KEY", "EXCH_RATE", "STAKE_GBP", "REFUNDS_GBP", "RETURN_GBP", "PROFIT_GBP", "MARKET_TAG", "MARKET_SORT", "PLACED_IN_RUNNING", "ODDS_NUM", "ODDS_DEN", "BETLEG_ID", "PART_NO"), -    Array[String]("bigint(20)", "varchar(16)", "varchar(32)", "int(10)", "int(10)", "char(1)", "varchar(32)", "varchar(32)", "varchar(15)", "int(11)", "char(1)", "bigint(20)", "bigint(20)", "datetime", "date", "time", "datetime", "date", "time", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "decimal(8,3)", "varchar(32)", "varchar(32)", "varchar(32)", "int(11)", "int(11)"), -    Array[String]("bet_id"), -    new Fields("bet_id", "bet_type_id", "receipt_no", "num_sels", "num_lines", "bet_channel_code", "mobile_client_id", "bet_affiliate_id", "bet_ip_address", "leg_id", "leg_type", "outcome_id", "acct_id", "bet_placed_datetime", "bet_placed_date", "bet_placed_time", "bet_settled_datetime", "bet_settled_date", "bet_settled_time", "bet_status", "stake", "refund", "return", "profit", "currency_type_key", "exch_rate", "stake_gbp", "refunds_gbp", "return_gbp", "profit_gbp", "market_tag", "market_sort", "placed_in_running", "odds_num", "odds_den", "betleg_id", "part_no") - -  ) -    .read -    .insert('name, "sample") -    .project('bet_id, 'part_no, 'leg_id) - -  val uPipe = jdbcSink ++ jdbcSource2 -  uPipe -    .groupBy('bet_id, 'part_no, 'leg_id) { -    _.size -  }.filter('size) { -    x: Int => x != 2 -  } -    .write(new TextLine("testJDBCComparator/result")) -}
\ No newline at end of file | 
