diff options
author | Chandan Rajah <chandan.rajah@gmail.com> | 2013-07-18 17:19:24 +0100 |
---|---|---|
committer | Chandan Rajah <chandan.rajah@gmail.com> | 2013-07-18 17:19:24 +0100 |
commit | bdb0933779f63a4f4be691feae7741b4dbb96b35 (patch) | |
tree | 2ff9bef8ff4f95a1d819672b1e5b7acd8a57cb1e /src/main/scala/parallelai/spyglass/hbase | |
parent | 1f0450297589ecbf89862faa951ea9004df6293e (diff) | |
download | SpyGlass-bdb0933779f63a4f4be691feae7741b4dbb96b35.tar.gz SpyGlass-bdb0933779f63a4f4be691feae7741b4dbb96b35.zip |
Added support for split grouping in order to reduce the number of mappers created for large tables
Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase')
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala | 152 | ||||
-rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala | 63 |
2 files changed, 113 insertions, 102 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala index eccd653..2aa5342 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -7,93 +7,101 @@ import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.util.Bytes import org.apache.log4j.Level import org.apache.log4j.Logger - import com.twitter.scalding._ import com.twitter.scalding.Args - import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import org.apache.hadoop.hbase.client.Put +import parallelai.spyglass.hbase.HBaseSalter class HBaseExample(args: Args) extends JobBase(args) { - val isDebug: Boolean = args("debug").toBoolean + val isDebug: Boolean = args("debug").toBoolean - if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) + if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) - val output = args("output") + val output = args("output") - println(output) + val jobConf = getJobConf() - val jobConf = getJobConf() + val quorumNames = args("quorum") - val quorumNames = args("quorum") + println("Output : " + output) + println("Quorum : " + quorumNames) - case class HBaseTableStore( + case class HBaseTableStore( conf: Configuration, quorum: String, tableName: String) { - val tableBytes = Bytes.toBytes(tableName) - val connection = HConnectionManager.getConnection(conf) - val maxThreads = conf.getInt("hbase.htable.threads.max", 1) - - conf.set("hbase.zookeeper.quorum", quorumNames) - - val htable = new HTable(HBaseConfiguration.create(conf), tableName) - - } - - val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet") - - val hbs2 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) - .read - .write(Tsv(output.format("get_list"))) - - val hbs3 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") - .read - .write(Tsv(output.format("scan_all"))) - - val hbs4 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") - .read - .write(Tsv(output.format("scan_range_to_end"))) - - val hbs5 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") - .read - .write(Tsv(output.format("scan_range_from_start"))) - - val hbs6 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - List("column_family"), - List('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") - .read - .write(Tsv(output.format("scan_range_between"))) - -}
\ No newline at end of file + val tableBytes = Bytes.toBytes(tableName) + val connection = HConnectionManager.getConnection(conf) + val maxThreads = conf.getInt("hbase.htable.threads.max", 1) + + conf.set("hbase.zookeeper.quorum", quorumNames) + + val htable = new HTable(HBaseConfiguration.create(conf), tableName) + + def makeN(n: Int) { + (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => { + val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x))) + put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x)) + }) + } + + } + + HBaseTableStore(jobConf, quorumNames, "_TEST.SALT.01").makeN(100000) + + val hbs2 = new HBaseSource( + "_TEST.SALT.01", + quorumNames, + 'key, + List("data"), + List('data), + sourceMode = SourceMode.GET_LIST, keyList = List("13914", "10687", "14897").map(x => "%015d".format(x.toLong)), useSalt = true) + .read + .write(Tsv(output.format("get_list"))) + + val hbs3 = new HBaseSource( + "_TEST.SALT.01", + quorumNames, + 'key, + List("data"), + List('data), + sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") + .read + .write(Tsv(output.format("scan_all"))) + + val hbs4 = new HBaseSource( + "_TEST.SALT.01", + quorumNames, + 'key, + List("data"), + List('data), + sourceMode = SourceMode.SCAN_RANGE, stopKey = "%015d".format("13914".toLong), useSalt = true) + .read + .write(Tsv(output.format("scan_range_to_end"))) + + val hbs5 = new HBaseSource( + "_TEST.SALT.01", + quorumNames, + 'key, + List("data"), + List('data), + sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), useSalt = true) + .read + .write(Tsv(output.format("scan_range_from_start"))) + + val hbs6 = new HBaseSource( + "_TEST.SALT.01", + quorumNames, + 'key, + List("data"), + List('data), + sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), stopKey = "%015d".format("16897".toLong), useSalt = true) + .read + .write(Tsv(output.format("scan_range_between"))) + +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala index 890d2be..920f17d 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala @@ -3,36 +3,39 @@ package parallelai.spyglass.hbase.example import com.twitter.scalding.Tool import org.joda.time.format.DateTimeFormat import java.util.Formatter.DateTime +import parallelai.spyglass.base.JobRunner object HBaseExampleRunner extends App { - val appPath = System.getenv("BIGDATA_APPCONF_PATH") - assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"}) - println( "Application Path is [%s]".format(appPath) ) - - val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match { - case "hdfs" => "--hdfs" - case _ => "--local" - }} - - println(modeString) - - val jobLibPath = modeString match { - case "--hdfs" => { - val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") - assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"}) - println( "Job Library Path Path is [%s]".format(jobLibPath) ) - jobLibPath - } - case _ => "" - } - - val quorum = System.getenv("BIGDATA_QUORUM_NAMES") - assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"}) - println( "Quorum is [%s]".format(quorum) ) - - val output = "HBaseTest.%s.tsv" - - Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath, - "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum )) - + val appPath = System.getenv("BIGDATA_APPCONF_PATH") + assert(appPath != null, { "Environment Variable BIGDATA_APPCONF_PATH is undefined or Null" }) + println("Application Path is [%s]".format(appPath)) + + val modeString = if (args.length == 0) { "--hdfs" } else { + args(0) match { + case "hdfs" => "--hdfs" + case _ => "--hdfs" + } + } + + println(modeString) + + val jobLibPath = modeString match { + case "--hdfs" => { + val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") + assert(jobLibPath != null, { "Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null" }) + println("Job Library Path Path is [%s]".format(jobLibPath)) + jobLibPath + } + case _ => "" + } + + val quorum = System.getenv("BIGDATA_QUORUM_NAMES") + assert(quorum != null, { "Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null" }) + println("Quorum is [%s]".format(quorum)) + + val output = "HBaseTest.%s" + + JobRunner.main(Array(classOf[HBaseExample].getName, "--hdfs", "--app.conf.path", appPath, + "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum)) + }
\ No newline at end of file |