aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass/base
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass/base')
-rw-r--r-- src/main/scala/parallelai/spyglass/base/JobBase.scala | 75
-rw-r--r-- src/main/scala/parallelai/spyglass/base/JobRunner.scala | 23
2 files changed, 98 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/base/JobBase.scala b/src/main/scala/parallelai/spyglass/base/JobBase.scala
new file mode 100644
index 0000000..7040fa7
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/base/JobBase.scala
@@ -0,0 +1,75 @@
+package parallelai.spyglass.base
+
+import com.twitter.scalding.Job
+import com.twitter.scalding.Args
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.filecache.DistributedCache
+import com.twitter.scalding.Mode
+import com.twitter.scalding.HadoopMode
+import com.typesafe.config.ConfigFactory
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import com.twitter.scalding.NullSource
+
+/**
+ * Base class for jobs in this package: convenience accessors over the Scalding
+ * command-line arguments plus the Hadoop configuration shared via `AppConfig`.
+ *
+ * Constructing an instance reads the "app.conf.path" argument and, in Hadoop
+ * mode, the "job.lib.path" argument; `sys.error` is raised if either is missing.
+ */
+class JobBase(args: Args) extends Job(args) {
+  /** First value of `key`; `default` when the key is absent or has no values. */
+  def getOrElseString(key: String, default: String): String =
+    // headOption instead of head: a bare "--key" flag maps to an empty list.
+    args.m.get(key).flatMap(_.headOption).getOrElse(default)
+
+  /** All values of `key`; `default` when the key is absent. */
+  def getOrElseList(key: String, default: List[String]): List[String] =
+    args.m.getOrElse[List[String]](key, default)
+
+  /** First value of `key`; `sys.error` when the key is absent or has no values. */
+  def getString(key: String): String =
+    getList(key).headOption.getOrElse(sys.error("Argument [%s] - NO VALUE".format(key)))
+
+  /** All values of `key`; `sys.error` when the key is absent. */
+  def getList(key: String): List[String] =
+    args.m.getOrElse(key, sys.error("Argument [%s] - NOT FOUND".format(key)))
+
+  /** The Hadoop configuration currently stored in `AppConfig.jobConfig`. */
+  def getJobConf(): Configuration = AppConfig.jobConfig
+
+  // Parsed eagerly at construction from the file named by "app.conf.path".
+  val appConfig = ConfigFactory.parseFile(new java.io.File(getString("app.conf.path")))
+
+  // Logger named by "app.log.name", falling back to the concrete class name.
+  val log = LoggerFactory.getLogger(getOrElseString("app.log.name", this.getClass().getName()))
+
+  /** "--hdfs" when the current mode is a `HadoopMode`, "--local" otherwise. */
+  def modeString(): String = Mode.mode match {
+    case _: HadoopMode => "--hdfs"
+    case _ => "--local"
+  }
+
+  // Execute at instantiation: Hadoop mode passes "job.lib.path" to JobLibLoader.
+  Mode.mode match {
+    case _: HadoopMode =>
+      log.info("In Hadoop Mode")
+      JobLibLoader.loadJars(getString("job.lib.path"), AppConfig.jobConfig)
+    case _ =>
+      log.info("In Local Mode")
+  }
+
+  /** Passes three rows of empty-string pairs to `NullSource.writeFrom`. */
+  def registerNullSourceSinkTaps(): Unit = {
+    val placeholderRows = List(("", ""), ("", ""), ("", ""))
+    NullSource.writeFrom(placeholderRows)
+    ()
+  }
+}
+
+object AppConfig { // Shared Hadoop configuration; JobRunner.main replaces jobConfig.
+  implicit var jobConfig: Configuration = new Configuration
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala
new file mode 100644
index 0000000..b3a9af0
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala
@@ -0,0 +1,23 @@
+package parallelai.spyglass.base
+
+import org.apache.hadoop.conf.Configuration
+import com.twitter.scalding.Tool
+import org.apache.hadoop
+
+/** Entry point: stores a fresh Hadoop `Configuration` in `AppConfig.jobConfig` and runs
+ *  Scalding's `Tool` with it; "--heapInc" first sets "mapred.child.java.opts". */
+object JobRunner {
+  def main(args: Array[String]): Unit = {
+    val hadoopConf = new Configuration()
+    // TODO replace println with logging
+    val heapIncRequested = args.exists(_ == "--heapInc")
+    if (heapIncRequested) {
+      println("Setting JVM Memory/Heap Size for every child mapper and reducer.")
+      val childJvmOpts = "-Xmx4096m -XX:+PrintGCDetails -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=50"
+      println("**** JVM Options : " + childJvmOpts)
+      hadoopConf.set("mapred.child.java.opts", childJvmOpts)
+    }
+    AppConfig.jobConfig = hadoopConf
+    hadoop.util.ToolRunner.run(hadoopConf, new Tool, args)
+  }
+} \ No newline at end of file