diff options
author | cra14 <chandan.rajah2@bskyb.com> | 2013-04-26 12:47:12 +0100 |
---|---|---|
committer | cra14 <chandan.rajah2@bskyb.com> | 2013-04-26 12:47:12 +0100 |
commit | cbf6c2903bfd0a5fe528c54382ea791c45637ded (patch) | |
tree | 2ca67f31c4d0c1779c163cb48234e821616ec6e1 /src/main/scala/parallelai/spyglass | |
parent | d6d712287b2bcd74f0c5bbc3ecbb106741443d7c (diff) | |
download | SpyGlass-cbf6c2903bfd0a5fe528c54382ea791c45637ded.tar.gz SpyGlass-cbf6c2903bfd0a5fe528c54382ea791c45637ded.zip |
First public release of Spy Glass code base
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
5 files changed, 313 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/base/JobBase.scala b/src/main/scala/parallelai/spyglass/base/JobBase.scala new file mode 100644 index 0000000..7040fa7 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/base/JobBase.scala @@ -0,0 +1,75 @@ +package parallelai.spyglass.base + +import com.twitter.scalding.Job +import com.twitter.scalding.Args +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.filecache.DistributedCache +import com.twitter.scalding.Mode +import com.twitter.scalding.HadoopMode +import com.typesafe.config.ConfigFactory +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import com.twitter.scalding.NullSource + +class JobBase(args: Args) extends Job(args) { + def getOrElseString(key: String, default: String): String = { + args.m.getOrElse[List[String]](key, List(default)).head + } + + def getOrElseList(key: String, default: List[String]): List[String] = { + args.m.getOrElse[List[String]](key, default) + } + + def getString(key: String): String = { + args.m.get(key) match { + case Some(v) => v.head + case None => sys.error(String.format("Argument [%s] - NOT FOUND", key)) + } + } + + def getList(key: String): List[String] = { + args.m.get(key) match { + case Some(v) => v + case None => sys.error(String.format("Argument [%s] - NOT FOUND", key)) + } + } + + def getJobConf(): Configuration = { + AppConfig.jobConfig + } + + + val appConfig = ConfigFactory.parseFile(new java.io.File(getString("app.conf.path"))) + + val log = LoggerFactory.getLogger(getOrElseString("app.log.name", this.getClass().getName())) + + def modeString(): String = { + Mode.mode match { + case x:HadoopMode => "--hdfs" + case _ => "--local" + } + } + + // Execute at instantiation + Mode.mode match { + case x:HadoopMode => { + log.info("In Hadoop Mode") + JobLibLoader.loadJars(getString("job.lib.path"), AppConfig.jobConfig); + } + case _ => { + log.info("In Local Mode") + } + } + + def registerNullSourceSinkTaps(): Unit = { + val expectedSampleEndToEndOutput = List(("", ""),("", ""),("", "")) + val sourceTap = NullSource + .writeFrom(expectedSampleEndToEndOutput) + } +} + +object AppConfig { + implicit var jobConfig : Configuration = new Configuration() +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala new file mode 100644 index 0000000..b3a9af0 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala @@ -0,0 +1,23 @@ +package parallelai.spyglass.base + +import org.apache.hadoop.conf.Configuration +import com.twitter.scalding.Tool +import org.apache.hadoop + +object JobRunner { + def main(args : Array[String]) { + val conf: Configuration = new Configuration + + // TODO replace println with logging + if (args.contains("--heapInc")) { + println("Setting JVM Memory/Heap Size for every child mapper and reducer."); + val jvmOpts = "-Xmx4096m -XX:+PrintGCDetails -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=50" + println("**** JVM Options : " + jvmOpts ) + conf.set("mapred.child.java.opts", jvmOpts); + } + + AppConfig.jobConfig = conf + + hadoop.util.ToolRunner.run(conf, new Tool, args); + } +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala new file mode 100644 index 0000000..e46ef50 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -0,0 +1,82 @@ +package parallelai.spyglass.hbase + +import java.io.IOException +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import com.twitter.scalding.AccessMode +import com.twitter.scalding.Hdfs +import com.twitter.scalding.Mode +import com.twitter.scalding.Read +import com.twitter.scalding.Source +import com.twitter.scalding.Write +import cascading.scheme.Scheme +import cascading.tap.SinkMode +import cascading.tap.Tap +import cascading.tuple.Fields +import org.apache.hadoop.mapred.RecordReader +import scala.compat.Platform +import org.apache.hadoop.mapred.OutputCollector +import org.apache.hadoop.mapred.JobConf +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +object Conversions { + implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) + implicit def bytesToLong(bytes: Array[Byte]): Long = augmentString(bytesToString(bytes)).toLong + implicit def ibwToString(ibw: ImmutableBytesWritable): String = bytesToString(ibw.get()) + implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s)) +} + +class HBaseSource( + tableName: String = null, + quorumNames: String = "localhost", + keyFields: Fields = null, + familyNames: Array[String] = null, + valueFields: Array[Fields] = null, + timestamp: Long = Platform.currentTime, + sourceMode: SourceMode = SourceMode.SCAN_ALL, + startKey: String = null, + stopKey: String = null, + keyList: List[String] = null + ) extends Source { + + override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) + .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { + val hBaseScheme = hdfsScheme match { + case hbase: HBaseScheme => hbase + case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme") + } + mode match { + case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Read => { + val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) + + sourceMode match { + case SourceMode.SCAN_RANGE => { + hbt.setHBaseRangeParms(startKey, stopKey) + } + case SourceMode.SCAN_ALL => { + hbt.setHBaseScanAllParms() + } + case SourceMode.GET_LIST => { + if( keyList == null ) + throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) + + hbt.setHBaseListParms(keyList.toArray[String]) + } + case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) + } + + hbt.asInstanceOf[Tap[_,_,_]] + } + case Write => { + val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) + + hbt.asInstanceOf[Tap[_,_,_]] + } + } + case _ => super.createTap(readOrWrite)(mode) + } + } +} diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala new file mode 100644 index 0000000..4c86b07 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -0,0 +1,99 @@ +package parallelai.spyglass.hbase.example + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.client.HConnectionManager +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.util.Bytes +import org.apache.log4j.Level +import org.apache.log4j.Logger + +import com.twitter.scalding._ +import com.twitter.scalding.Args + +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseSource +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class HBaseExample(args: Args) extends JobBase(args) { + + val isDebug: Boolean = args("debug").toBoolean + + if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG) + + val output = args("output") + + println(output) + + val jobConf = getJobConf + + val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" + + case class HBaseTableStore( + conf: Configuration, + quorum: String, + tableName: String) { + + val tableBytes = Bytes.toBytes(tableName) + val connection = HConnectionManager.getConnection(conf) + val maxThreads = conf.getInt("hbase.htable.threads.max", 1) + + conf.set("hbase.zookeeper.quorum", quorumNames); + + val htable = new HTable(HBaseConfiguration.create(conf), tableName) + + } + + val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet") + + val hbs2 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) + .read + .write(Tsv(output.format("get_list"))) + + val hbs3 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") + .read + .write(Tsv(output.format("scan_all"))) + + val hbs4 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") + .read + .write(Tsv(output.format("scan_range_to_end"))) + + val hbs5 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") + .read + .write(Tsv(output.format("scan_range_from_start"))) + + val hbs6 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") + .read + .write(Tsv(output.format("scan_range_between"))) + +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala new file mode 100644 index 0000000..d6b762e --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala @@ -0,0 +1,34 @@ +package parallelai.spyglass.hbase.example + +import com.twitter.scalding.Tool +import org.joda.time.format.DateTimeFormat +import java.util.Formatter.DateTime + +object HBaseExampleRunner extends App { + val appPath = System.getenv("BIGDATA_APPCONF_PATH") + assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"}) + println( "Application Path is [%s]".format(appPath) ) + + val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match { + case "hdfs" => "--hdfs" + case _ => "--local" + }} + + println(modeString) + + val jobLibPath = modeString match { + case "--hdfs" => { + val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") + assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"}) + println( "Job Library Path Path is [%s]".format(jobLibPath) ) + jobLibPath + } + case _ => "" + } + + val output = "HBaseTest.%s.tsv" + + Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath, + "--output", output, "--debug", "true", "--job.lib.path", jobLibPath )) + +}
\ No newline at end of file |