diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
5 files changed, 77 insertions, 59 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala index d6b762e..890d2be 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala @@ -25,10 +25,14 @@ object HBaseExampleRunner extends App { } case _ => "" } + + val quorum = System.getenv("BIGDATA_QUORUM_NAMES") + assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"}) + println( "Quorum is [%s]".format(quorum) ) val output = "HBaseTest.%s.tsv" Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath, - "--output", output, "--debug", "true", "--job.lib.path", jobLibPath )) + "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum )) }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala index d24f785..2ca3f32 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTester.scala @@ -22,61 +22,55 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val TABLE_SCHEMA = List('key, 'salted, 'unsalted) val prefix = "0123456789" + + val quorum = args("quorum") -// val hbase01 = CommonFunctors.fromBytesWritable( -// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -// sourceMode = SourceMode.SCAN_ALL ).read, -// TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanAllNoSalt01")) + val hbase01 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_ALL ).read + .fromBytesWritable( TABLE_SCHEMA ) + .write(TextLine("saltTesting/ScanAllNoSalt01")) -// val hbase02 = CommonFunctors.fromBytesWritable( -// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -// sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read, -// TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanAllPlusSalt01")) + val hbase02 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_ALL, useSalt = true ).read + .fromBytesWritable( TABLE_SCHEMA ) + .write(TextLine("saltTesting/ScanAllPlusSalt01")) -// val hbase03 = CommonFunctors.fromBytesWritable( -// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -// sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read, -// TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangeNoSalt01")) + val hbase03 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_RANGE, startKey = "8_1728", stopKey = "1_1831" ).read + .fromBytesWritable(TABLE_SCHEMA ) + .write(TextLine("saltTesting/ScanRangeNoSalt01")) -// val hbase04 = CommonFunctors.fromBytesWritable( -// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -// sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read, -// TABLE_SCHEMA ) -// .write(TextLine("saltTesting/ScanRangePlusSalt01")) + val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_RANGE, startKey = "1728", stopKey = "1831", useSalt = true ).read + .fromBytesWritable(TABLE_SCHEMA ) + .write(TextLine("saltTesting/ScanRangePlusSalt01")) -// val hbase05bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -// sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read -// -// val hbase05 = CommonFunctors.fromBytesWritable( -// hbase05bytes, -// TABLE_SCHEMA ) -// .write(TextLine("saltTesting/GetListNoSalt01")) -// -// val hbase06bytes = new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, -// TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, -// TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, -// sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read -// -// val hbase06 = CommonFunctors.fromBytesWritable( -// hbase06bytes, -// TABLE_SCHEMA ) -// .write(TextLine("saltTesting/GetListPlusSalt01")) + val hbase05bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.GET_LIST, keyList = List("1_1681", "6_1456") ).read + + .fromBytesWritable(TABLE_SCHEMA ) + .write(TextLine("saltTesting/GetListNoSalt01")) + + val hbase06bytes = new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.GET_LIST, keyList = List("1681", "1456"), useSalt = true).read + + .fromBytesWritable(TABLE_SCHEMA ) + .write(TextLine("saltTesting/GetListPlusSalt01")) val hbase07 = - new HBaseSource( "_TEST.SALT.03", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, + new HBaseSource( "_TEST.SALT.03", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", useSalt = true, prefixList = prefix ) @@ -84,13 +78,13 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio .fromBytesWritable( TABLE_SCHEMA ) .write(TextLine("saltTesting/ScanRangePlusSalt10")) .toBytesWritable( TABLE_SCHEMA ) - .write(new HBaseSource( "_TEST.SALT.04", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, + .write(new HBaseSource( "_TEST.SALT.04", quorum, 'key, TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, useSalt = true )) // val hbase08 = -// new HBaseSource( "_TEST.SALT.01", "cldmgr.prod.bigdata.bskyb.com,cldnode01.prod.bigdata.bskyb.com,cldnode02.prod.bigdata.bskyb.com:2181", 'key, +// new HBaseSource( "_TEST.SALT.01", quorum, 'key, // TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, // TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, // sourceMode = SourceMode.SCAN_RANGE, startKey = "1445", stopKey = "1455", useSalt = true, prefixList = prefix ) diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala index af7d7d2..e6744b7 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSaltTesterRunner.scala @@ -6,13 +6,24 @@ object HBaseSaltTesterRunner extends App { // if( args.length < 2 ) { throw new Exception("Not enough Args")} - val appConfig = "/home/crajah/tmp/application.conf" - val libPath = "/home/crajah/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation" + val appPath = System.getenv("BIGDATA_APPCONF_PATH") + assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"}) + println( "Application Path is [%s]".format(appPath) ) + + val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") + assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"}) + println( "Job Library Path Path is [%s]".format(jobLibPath) ) + + val quorum = System.getenv("BIGDATA_QUORUM_NAMES") + assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"}) + println( "Quorum is [%s]".format(quorum) ) + JobRunner.main(Array(classOf[HBaseSaltTester].getName, "--hdfs", - "--app.conf.path", appConfig, - "--job.lib.path", libPath, + "--app.conf.path", appPath, + "--job.lib.path", jobLibPath, + "--quorum", quorum, "--debug", "true" )) }
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala index 69f8b60..536f843 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldRead.scala @@ -42,7 +42,7 @@ class HBaseSourceShouldRead (args: Args) extends JobBase(args) with HBasePipeCon } // Set HBase host - val hbaseHost = "cldmgr.prod.bigdata.bskyb.com:2181" + val hbaseHost = args("quorum") // ----------------------------- // ----- Tests for TABLE_01 ---- diff --git a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala index aa77caa..50cd4f1 100644 --- a/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/testing/HBaseSourceShouldReadRunner.scala @@ -3,8 +3,17 @@ package parallelai.spyglass.hbase.testing import parallelai.spyglass.base.JobRunner object HBaseSourceShouldReadRunner extends App { - val appConfig = "/projects/applications.conf" - val libPath = "/media/sf__CHANDAN_RAJAH_/Dropbox/_WORK_/_SKY_/_BIG_DATA_/_SOURCES_/big_data/commons/commons.hbase.skybase/alternateLocation" + val appPath = System.getenv("BIGDATA_APPCONF_PATH") + assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"}) + println( "Application Path is [%s]".format(appPath) ) + + val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") + assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"}) + println( "Job Library Path Path is [%s]".format(jobLibPath) ) - JobRunner.main(Array(classOf[HBaseSourceShouldRead].getName, "--hdfs", "--app.conf.path", appConfig, "--job.lib.path", libPath)) + val quorum = System.getenv("BIGDATA_QUORUM_NAMES") + assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"}) + println( "Quorum is [%s]".format(quorum) ) + + JobRunner.main(Array(classOf[HBaseSourceShouldRead].getName, "--hdfs", "--app.conf.path", appPath, "--job.lib.path", jobLibPath, "--quorum", quorum)) }
\ No newline at end of file |