aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
-rw-r--r--src/main/scala/parallelai/spyglass/base/JobBase.scala75
-rw-r--r--src/main/scala/parallelai/spyglass/base/JobRunner.scala23
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala82
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala99
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala34
5 files changed, 313 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/base/JobBase.scala b/src/main/scala/parallelai/spyglass/base/JobBase.scala
new file mode 100644
index 0000000..7040fa7
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/base/JobBase.scala
@@ -0,0 +1,75 @@
+package parallelai.spyglass.base
+
+import com.twitter.scalding.Job
+import com.twitter.scalding.Args
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.filecache.DistributedCache
+import com.twitter.scalding.Mode
+import com.twitter.scalding.HadoopMode
+import com.typesafe.config.ConfigFactory
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import com.twitter.scalding.NullSource
+
+class JobBase(args: Args) extends Job(args) {
+ def getOrElseString(key: String, default: String): String = {
+ args.m.getOrElse[List[String]](key, List(default)).head
+ }
+
+ def getOrElseList(key: String, default: List[String]): List[String] = {
+ args.m.getOrElse[List[String]](key, default)
+ }
+
+ def getString(key: String): String = {
+ args.m.get(key) match {
+ case Some(v) => v.head
+ case None => sys.error(String.format("Argument [%s] - NOT FOUND", key))
+ }
+ }
+
+ def getList(key: String): List[String] = {
+ args.m.get(key) match {
+ case Some(v) => v
+ case None => sys.error(String.format("Argument [%s] - NOT FOUND", key))
+ }
+ }
+
+ def getJobConf(): Configuration = {
+ AppConfig.jobConfig
+ }
+
+
+ val appConfig = ConfigFactory.parseFile(new java.io.File(getString("app.conf.path")))
+
+ val log = LoggerFactory.getLogger(getOrElseString("app.log.name", this.getClass().getName()))
+
+ def modeString(): String = {
+ Mode.mode match {
+ case x:HadoopMode => "--hdfs"
+ case _ => "--local"
+ }
+ }
+
+ // Execute at instantiation
+ Mode.mode match {
+ case x:HadoopMode => {
+ log.info("In Hadoop Mode")
+ JobLibLoader.loadJars(getString("job.lib.path"), AppConfig.jobConfig);
+ }
+ case _ => {
+ log.info("In Local Mode")
+ }
+ }
+
+ def registerNullSourceSinkTaps(): Unit = {
+ val expectedSampleEndToEndOutput = List(("", ""),("", ""),("", ""))
+ val sourceTap = NullSource
+ .writeFrom(expectedSampleEndToEndOutput)
+ }
+}
+
+object AppConfig {
+ implicit var jobConfig : Configuration = new Configuration()
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala
new file mode 100644
index 0000000..b3a9af0
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala
@@ -0,0 +1,23 @@
+package parallelai.spyglass.base
+
+import org.apache.hadoop.conf.Configuration
+import com.twitter.scalding.Tool
+import org.apache.hadoop
+
+object JobRunner {
+ def main(args : Array[String]) {
+ val conf: Configuration = new Configuration
+
+ // TODO replace println with logging
+ if (args.contains("--heapInc")) {
+ println("Setting JVM Memory/Heap Size for every child mapper and reducer.");
+ val jvmOpts = "-Xmx4096m -XX:+PrintGCDetails -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=50"
+ println("**** JVM Options : " + jvmOpts )
+ conf.set("mapred.child.java.opts", jvmOpts);
+ }
+
+ AppConfig.jobConfig = conf
+
+ hadoop.util.ToolRunner.run(conf, new Tool, args);
+ }
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
new file mode 100644
index 0000000..e46ef50
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -0,0 +1,82 @@
+package parallelai.spyglass.hbase
+
+import java.io.IOException
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import com.twitter.scalding.AccessMode
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Mode
+import com.twitter.scalding.Read
+import com.twitter.scalding.Source
+import com.twitter.scalding.Write
+import cascading.scheme.Scheme
+import cascading.tap.SinkMode
+import cascading.tap.Tap
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.RecordReader
+import scala.compat.Platform
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.JobConf
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+object Conversions {
+ implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
+ implicit def bytesToLong(bytes: Array[Byte]): Long = augmentString(bytesToString(bytes)).toLong
+ implicit def ibwToString(ibw: ImmutableBytesWritable): String = bytesToString(ibw.get())
+ implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s))
+}
+
+class HBaseSource(
+ tableName: String = null,
+ quorumNames: String = "localhost",
+ keyFields: Fields = null,
+ familyNames: Array[String] = null,
+ valueFields: Array[Fields] = null,
+ timestamp: Long = Platform.currentTime,
+ sourceMode: SourceMode = SourceMode.SCAN_ALL,
+ startKey: String = null,
+ stopKey: String = null,
+ keyList: List[String] = null
+ ) extends Source {
+
+ override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields)
+ .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+ override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+ val hBaseScheme = hdfsScheme match {
+ case hbase: HBaseScheme => hbase
+ case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
+ }
+ mode match {
+ case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Read => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
+
+ sourceMode match {
+ case SourceMode.SCAN_RANGE => {
+ hbt.setHBaseRangeParms(startKey, stopKey)
+ }
+ case SourceMode.SCAN_ALL => {
+ hbt.setHBaseScanAllParms()
+ }
+ case SourceMode.GET_LIST => {
+ if( keyList == null )
+ throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
+
+ hbt.setHBaseListParms(keyList.toArray[String])
+ }
+ case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode))
+ }
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ case Write => {
+ val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
+
+ hbt.asInstanceOf[Tap[_,_,_]]
+ }
+ }
+ case _ => super.createTap(readOrWrite)(mode)
+ }
+ }
+}
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
new file mode 100644
index 0000000..4c86b07
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -0,0 +1,99 @@
+package parallelai.spyglass.hbase.example
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.client.HConnectionManager
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+
+import com.twitter.scalding._
+import com.twitter.scalding.Args
+
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class HBaseExample(args: Args) extends JobBase(args) {
+
+ val isDebug: Boolean = args("debug").toBoolean
+
+ if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG)
+
+ val output = args("output")
+
+ println(output)
+
+ val jobConf = getJobConf
+
+ val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181"
+
+ case class HBaseTableStore(
+ conf: Configuration,
+ quorum: String,
+ tableName: String) {
+
+ val tableBytes = Bytes.toBytes(tableName)
+ val connection = HConnectionManager.getConnection(conf)
+ val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+ conf.set("hbase.zookeeper.quorum", quorumNames);
+
+ val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+ }
+
+ val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet")
+
+ val hbs2 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
+ .read
+ .write(Tsv(output.format("get_list")))
+
+ val hbs3 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
+ .read
+ .write(Tsv(output.format("scan_all")))
+
+ val hbs4 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
+ .read
+ .write(Tsv(output.format("scan_range_to_end")))
+
+ val hbs5 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
+ .read
+ .write(Tsv(output.format("scan_range_from_start")))
+
+ val hbs6 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
+ .read
+ .write(Tsv(output.format("scan_range_between")))
+
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
new file mode 100644
index 0000000..d6b762e
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -0,0 +1,34 @@
+package parallelai.spyglass.hbase.example
+
+import com.twitter.scalding.Tool
+import org.joda.time.format.DateTimeFormat
+import java.util.Formatter.DateTime
+
+object HBaseExampleRunner extends App {
+ val appPath = System.getenv("BIGDATA_APPCONF_PATH")
+ assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"})
+ println( "Application Path is [%s]".format(appPath) )
+
+ val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match {
+ case "hdfs" => "--hdfs"
+ case _ => "--local"
+ }}
+
+ println(modeString)
+
+ val jobLibPath = modeString match {
+ case "--hdfs" => {
+ val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
+ assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"})
+ println( "Job Library Path Path is [%s]".format(jobLibPath) )
+ jobLibPath
+ }
+ case _ => ""
+ }
+
+ val output = "HBaseTest.%s.tsv"
+
+ Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath,
+ "--output", output, "--debug", "true", "--job.lib.path", jobLibPath ))
+
+} \ No newline at end of file