aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/scala/parallelai/spyglass
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
-rw-r--r--src/main/scala/parallelai/spyglass/base/JobBase.scala7
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala20
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala53
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala26
-rw-r--r--src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala14
5 files changed, 97 insertions, 23 deletions
diff --git a/src/main/scala/parallelai/spyglass/base/JobBase.scala b/src/main/scala/parallelai/spyglass/base/JobBase.scala
index 7040fa7..6cb79cd 100644
--- a/src/main/scala/parallelai/spyglass/base/JobBase.scala
+++ b/src/main/scala/parallelai/spyglass/base/JobBase.scala
@@ -6,12 +6,13 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.filecache.DistributedCache
-import com.twitter.scalding.Mode
import com.twitter.scalding.HadoopMode
import com.typesafe.config.ConfigFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import com.twitter.scalding.NullSource
+import parallelai.spyglass.base._
+import com.twitter.scalding.Mode
class JobBase(args: Args) extends Job(args) {
def getOrElseString(key: String, default: String): String = {
@@ -46,14 +47,14 @@ class JobBase(args: Args) extends Job(args) {
val log = LoggerFactory.getLogger(getOrElseString("app.log.name", this.getClass().getName()))
def modeString(): String = {
- Mode.mode match {
+ Mode.getMode(args) match {
case x:HadoopMode => "--hdfs"
case _ => "--local"
}
}
// Execute at instantiation
- Mode.mode match {
+ Mode.getMode(args) match {
case x:HadoopMode => {
log.info("In Hadoop Mode")
JobLibLoader.loadJars(getString("job.lib.path"), AppConfig.jobConfig);
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index c214e99..bbae8f6 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -7,9 +7,7 @@ import com.twitter.scalding.AccessMode
import com.twitter.scalding.Hdfs
import com.twitter.scalding.Mode
import com.twitter.scalding.Read
-import com.twitter.scalding.Source
import com.twitter.scalding.Write
-
import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
import cascading.scheme.{NullScheme, Scheme}
import cascading.tap.SinkMode
@@ -18,6 +16,8 @@ import cascading.tuple.Fields
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.JobConf
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import com.twitter.scalding.Source
object Conversions {
implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -47,11 +47,11 @@ case class HBaseSource(
val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
internalScheme.setInputSplitTye(inputSplitType)
- override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+ val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
- override def localScheme = new NullScheme(allFields, allFields)
+ def localScheme = new NullScheme(allFields, allFields)
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val hBaseScheme = hdfsScheme match {
@@ -92,7 +92,17 @@ case class HBaseSource(
hbt.asInstanceOf[Tap[_,_,_]]
}
}
- case _ => super.createTap(readOrWrite)(mode)
+ case _ => createEmptyTap(readOrWrite)(mode)
}
}
+
+
+
+ def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
+ mode match {
+ case _ => {
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+ }
+ }
+ }
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
index 7ba2788..5d844cb 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
@@ -6,27 +6,56 @@ import org.apache.log4j.{Level, Logger}
import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import cascading.tuple.Fields
+import com.twitter.scalding.IterableSource
/**
* Simple example of HBaseSource usage
*/
class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions {
- val isDebug: Boolean = args("debug").toBoolean
-
- if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
+ //val isDebug: Boolean = args("debug").toBoolean
+ //if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
val output = args("output")
- val hbs = new HBaseSource(
- "table_name",
- "quorum_name:2181",
+ val hbsOut = new HBaseSource(
+ "spyglass.hbase.test",
+ "cldmgr.prod.bigdata.bskyb.com:2181",
new Fields("key"),
- List("column_family"),
- List(new Fields("column_name1", "column_name2")),
- sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
- .read
- .fromBytesWritable(new Fields("key", "column_name1", "column_name2"))
- .write(Tsv(output format "get_list"))
+ List("data", "data"),
+ List(new Fields("test1", "test2")))
+
+ val data = List(
+ ("100", 1, "A"),
+ ("101", 2, "B"),
+ ("102" , 3 , "C"),
+ ("103" , 4 , "D"),
+ ("104" , 5 , "E"),
+ ("104" , 6 , "F"))
+
+ val testDataPipe =
+ IterableSource[(String, Int, String)](data, ('key, 'test1, 'test2))
+ .debug
+ .toBytesWritable(List('key, 'test1, 'test2))
+
+ val writer = testDataPipe
+ writer.write(hbsOut)
+
+ val hbs = new HBaseSource(
+ "spyglass.hbase.test",
+ "cldmgr.prod.bigdata.bskyb.com:2181",
+ new Fields("key"),
+ List("data", "data"),
+ List(new Fields("test1", "test2")),
+ sourceMode = SourceMode.SCAN_ALL)
+ .read
+ .fromBytesWritable(new Fields("key", "test1", "test2"))
+
+ val fileWriter = hbs
+ fileWriter.write(Tsv("scan_all.txt"))
+
+
+
+
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
new file mode 100644
index 0000000..bbcf96d
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
@@ -0,0 +1,26 @@
+package parallelai.spyglass.hbase.example
+
+import com.twitter.scalding.Args
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.base.JobRunner
+
+object SimpleHBaseSourceRunner extends App {
+
+ val mArgs = Args(args)
+
+ val log = LoggerFactory.getLogger(this.getClass.getName)
+
+
+
+ log.info("Starting HBaseSource Import Process Test...")
+
+ val start1 = System.currentTimeMillis
+
+ JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray)
+
+ val end = System.currentTimeMillis
+
+ log.info("HBaseSource Import process finished successfully.")
+ log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete")
+
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index beb66be..09f4579 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -28,11 +28,11 @@ case class JDBCSource(
updateByFields: Fields = null
) extends Source {
- override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
+ val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
// To enable local mode testing
- override def localScheme = new NullScheme(fields, fields)
+ def localScheme = new NullScheme(fields, fields)
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
val jdbcScheme = hdfsScheme match {
@@ -53,7 +53,15 @@ case class JDBCSource(
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
}
- case _ => super.createTap(readOrWrite)(mode)
+ case _ => createEmptyTap(readOrWrite)(mode)
}
}
+
+ def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
+ mode match {
+ case _ => {
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+ }
+ }
+ }
}