From cbf6c2903bfd0a5fe528c54382ea791c45637ded Mon Sep 17 00:00:00 2001 From: cra14 Date: Fri, 26 Apr 2013 12:47:12 +0100 Subject: First public release of Spy Glass code base --- .../spyglass/hbase/example/HBaseExample.scala | 99 ++++++++++++++++++++++ .../hbase/example/HBaseExampleRunner.scala | 34 ++++++++ 2 files changed, 133 insertions(+) create mode 100644 src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala create mode 100644 src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala (limited to 'src/main/scala/parallelai/spyglass/hbase/example') diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala new file mode 100644 index 0000000..4c86b07 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -0,0 +1,99 @@ +package parallelai.spyglass.hbase.example + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.client.HConnectionManager +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.util.Bytes +import org.apache.log4j.Level +import org.apache.log4j.Logger + +import com.twitter.scalding._ +import com.twitter.scalding.Args + +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseSource +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class HBaseExample(args: Args) extends JobBase(args) { + + val isDebug: Boolean = args("debug").toBoolean + + if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG) + + val output = args("output") + + println(output) + + val jobConf = getJobConf + + val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" + + case class HBaseTableStore( + conf: Configuration, + quorum: String, + tableName: String) { + + val tableBytes = Bytes.toBytes(tableName) + val connection = HConnectionManager.getConnection(conf) + val maxThreads = conf.getInt("hbase.htable.threads.max", 1) + + conf.set("hbase.zookeeper.quorum", quorumNames); + + val htable = new HTable(HBaseConfiguration.create(conf), tableName) + + } + + val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet") + + val hbs2 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) + .read + .write(Tsv(output.format("get_list"))) + + val hbs3 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") + .read + .write(Tsv(output.format("scan_all"))) + + val hbs4 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") + .read + .write(Tsv(output.format("scan_range_to_end"))) + + val hbs5 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") + .read + .write(Tsv(output.format("scan_range_from_start"))) + + val hbs6 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") + .read + .write(Tsv(output.format("scan_range_between"))) + +} \ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala new file mode 100644 index 0000000..d6b762e --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala @@ -0,0 +1,34 @@ +package parallelai.spyglass.hbase.example + +import com.twitter.scalding.Tool +import org.joda.time.format.DateTimeFormat +import java.util.Formatter.DateTime + +object HBaseExampleRunner extends App { + val appPath = System.getenv("BIGDATA_APPCONF_PATH") + assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"}) + println( "Application Path is [%s]".format(appPath) ) + + val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match { + case "hdfs" => "--hdfs" + case _ => "--local" + }} + + println(modeString) + + val jobLibPath = modeString match { + case "--hdfs" => { + val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") + assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"}) + println( "Job Library Path Path is [%s]".format(jobLibPath) ) + jobLibPath + } + case _ => "" + } + + val output = "HBaseTest.%s.tsv" + + Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath, + "--output", output, "--debug", "true", "--job.lib.path", jobLibPath )) + +} \ No newline at end of file -- cgit v1.2.3