From b4f423438e9726d9c5efdb295552e8e737e2ca8f Mon Sep 17 00:00:00 2001
From: Saad Rashid
Date: Thu, 20 Feb 2014 16:21:06 +0000
Subject: Update Scalding and Cascading to their latest releases and bump the POM version.

SpyGlass now supports Scalding 0.9.0rc4 and Cascading 2.2.1. This is a
release candidate.
---
 .../scala/parallelai/spyglass/base/JobBase.scala   |  7 +--
 .../parallelai/spyglass/hbase/HBaseSource.scala    | 20 ++++--
 .../hbase/example/SimpleHBaseSourceExample.scala   | 53 +++++++++++++++++-----
 .../hbase/example/SimpleHBaseSourceRunner.scala    | 26 +++++++++++
 .../parallelai/spyglass/jdbc/JDBCSource.scala      | 14 ++++--
 .../example/SimpleHBaseSourceExampleTest.scala     |  4 +-
 6 files changed, 99 insertions(+), 25 deletions(-)
 create mode 100644 src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
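[Editor's note] The core change in this patch is the move from Scalding
0.8's global Mode.mode to the 0.9 style, in which the mode is recovered
from the job's Args. A minimal sketch of the new pattern, mirroring the
JobBase change below (the Mode.getMode signature is assumed from the
diff's own usage, not independently verified):

    import com.twitter.scalding.{Args, HadoopMode, Mode}

    // Report which flag the job was effectively launched with.
    // Before (Scalding 0.8.x) this read the global: Mode.mode match { ... }
    def modeString(args: Args): String =
      Mode.getMode(args) match {
        case x: HadoopMode => "--hdfs"  // any Hadoop-backed mode
        case _             => "--local" // local and test modes
      }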
diff --git a/src/main/scala/parallelai/spyglass/base/JobBase.scala b/src/main/scala/parallelai/spyglass/base/JobBase.scala
index 7040fa7..6cb79cd 100644
--- a/src/main/scala/parallelai/spyglass/base/JobBase.scala
+++ b/src/main/scala/parallelai/spyglass/base/JobBase.scala
@@ -6,12 +6,13 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.filecache.DistributedCache
-import com.twitter.scalding.Mode
 import com.twitter.scalding.HadoopMode
 import com.typesafe.config.ConfigFactory
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import com.twitter.scalding.NullSource
+import parallelai.spyglass.base._
+import com.twitter.scalding.Mode
 
 class JobBase(args: Args) extends Job(args) {
   def getOrElseString(key: String, default: String): String = {
@@ -46,14 +47,14 @@ class JobBase(args: Args) extends Job(args) {
   val log = LoggerFactory.getLogger(getOrElseString("app.log.name", this.getClass().getName()))
 
   def modeString(): String = {
-    Mode.mode match {
+    Mode.getMode(args) match {
       case x:HadoopMode => "--hdfs"
       case _ => "--local"
     }
   }
 
   // Execute at instantiation
-  Mode.mode match {
+  Mode.getMode(args) match {
     case x:HadoopMode => {
       log.info("In Hadoop Mode")
       JobLibLoader.loadJars(getString("job.lib.path"), AppConfig.jobConfig);
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index c214e99..bbae8f6 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -7,9 +7,7 @@ import com.twitter.scalding.AccessMode
 import com.twitter.scalding.Hdfs
 import com.twitter.scalding.Mode
 import com.twitter.scalding.Read
-import com.twitter.scalding.Source
 import com.twitter.scalding.Write
-
 import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}
 import cascading.scheme.{NullScheme, Scheme}
 import cascading.tap.SinkMode
@@ -18,6 +16,8 @@ import cascading.tuple.Fields
 import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.OutputCollector
 import org.apache.hadoop.mapred.JobConf
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import com.twitter.scalding.Source
 
 object Conversions {
   implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
@@ -47,11 +47,11 @@ case class HBaseSource(
   val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)
   internalScheme.setInputSplitTye(inputSplitType)
 
-  override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+  val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
 
   // To enable local mode testing
   val allFields = keyFields.append(valueFields.toArray)
-  override def localScheme = new NullScheme(allFields, allFields)
+  def localScheme = new NullScheme(allFields, allFields)
 
   override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
     val hBaseScheme = hdfsScheme match {
@@ -92,7 +92,17 @@ case class HBaseSource(
       hbt.asInstanceOf[Tap[_,_,_]]
     }
   }
-    case _ => super.createTap(readOrWrite)(mode)
+    case _ => createEmptyTap(readOrWrite)(mode)
   }
 }
+
+
+
+  def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
+    mode match {
+      case _ => {
+        throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+      }
+    }
+  }
 }
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
index 7ba2788..5d844cb 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
@@ -6,27 +6,56 @@ import org.apache.log4j.{Level, Logger}
 import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 import cascading.tuple.Fields
+import com.twitter.scalding.IterableSource
 
 /**
  * Simple example of HBaseSource usage
  */
 class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions {
 
-  val isDebug: Boolean = args("debug").toBoolean
-
-  if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
+  //val isDebug: Boolean = args("debug").toBoolean
+  //if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
 
   val output = args("output")
 
-  val hbs = new HBaseSource(
-    "table_name",
-    "quorum_name:2181",
+  val hbsOut = new HBaseSource(
+    "spyglass.hbase.test",
+    "cldmgr.prod.bigdata.bskyb.com:2181",
     new Fields("key"),
-    List("column_family"),
-    List(new Fields("column_name1", "column_name2")),
-    sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
-    .read
-    .fromBytesWritable(new Fields("key", "column_name1", "column_name2"))
-    .write(Tsv(output format "get_list"))
+    List("data", "data"),
+    List(new Fields("test1", "test2")))
+
+  val data = List(
+    ("100", 1, "A"),
+    ("101", 2, "B"),
+    ("102" , 3 , "C"),
+    ("103" , 4 , "D"),
+    ("104" , 5 , "E"),
+    ("104" , 6 , "F"))
+
+  val testDataPipe =
+    IterableSource[(String, Int, String)](data, ('key, 'test1, 'test2))
+      .debug
+      .toBytesWritable(List('key, 'test1, 'test2))
+
+  val writer = testDataPipe
+  writer.write(hbsOut)
+
+  val hbs = new HBaseSource(
+    "spyglass.hbase.test",
+    "cldmgr.prod.bigdata.bskyb.com:2181",
+    new Fields("key"),
+    List("data", "data"),
+    List(new Fields("test1", "test2")),
+    sourceMode = SourceMode.SCAN_ALL)
+    .read
+    .fromBytesWritable(new Fields("key", "test1", "test2"))
+
+  val fileWriter = hbs
+  fileWriter.write(Tsv("scan_all.txt"))
+
+
+
+
 }
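[Editor's note] The reworked example above writes six rows into the test
table through hbsOut and then scans them back. A condensed read-side
sketch under the same assumptions as the example (table, quorum, and
column names are the example's own; SCAN_ALL reads the entire table):

    import cascading.tuple.Fields
    import com.twitter.scalding.{Args, Tsv}
    import parallelai.spyglass.base.JobBase
    import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
    import parallelai.spyglass.hbase.HBaseConstants.SourceMode

    class ScanAllSketch(args: Args) extends JobBase(args) with HBasePipeConversions {
      new HBaseSource(
        "spyglass.hbase.test",                // table name
        "cldmgr.prod.bigdata.bskyb.com:2181", // ZooKeeper quorum
        new Fields("key"),
        List("data", "data"),                 // one column family per value field
        List(new Fields("test1", "test2")),
        sourceMode = SourceMode.SCAN_ALL)
        .read
        .fromBytesWritable(new Fields("key", "test1", "test2")) // bytes -> usable fields
        .write(Tsv("scan_all.txt"))
    }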
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
new file mode 100644
index 0000000..bbcf96d
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
@@ -0,0 +1,26 @@
+package parallelai.spyglass.hbase.example
+
+import com.twitter.scalding.Args
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.base.JobRunner
+
+object SimpleHBaseSourceRunner extends App {
+
+  val mArgs = Args(args)
+
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+
+
+  log.info("Starting HBaseSource Import Process Test...")
+
+  val start1 = System.currentTimeMillis
+
+  JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray)
+
+  val end = System.currentTimeMillis
+
+  log.info("HBaseSource Import process finished successfully.")
+  log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete")
+
+}
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index beb66be..09f4579 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -28,11 +28,11 @@ case class JDBCSource(
   updateByFields: Fields = null
 ) extends Source {
 
-  override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
+  val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)
     .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
 
   // To enable local mode testing
-  override def localScheme = new NullScheme(fields, fields)
+  def localScheme = new NullScheme(fields, fields)
 
   override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
     val jdbcScheme = hdfsScheme match {
@@ -53,7 +53,15 @@ case class JDBCSource(
       jdbcTap.asInstanceOf[Tap[_,_,_]]
     }
   }
-    case _ => super.createTap(readOrWrite)(mode)
+    case _ => createEmptyTap(readOrWrite)(mode)
   }
 }
+
+  def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
+    mode match {
+      case _ => {
+        throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+      }
+    }
+  }
 }
diff --git a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
index e5b2cca..93ee129 100644
--- a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
+++ b/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
@@ -7,11 +7,11 @@ import org.scalatest.junit.JUnitRunner
 import org.slf4j.LoggerFactory
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import cascading.tuple.{Tuple, Fields}
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 import org.apache.hadoop.hbase.util.Bytes
 import scala._
-import parallelai.spyglass.hbase.HBaseSource
 import com.twitter.scalding.Tsv
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 
 /**
  * Example of how to define tests for HBaseSource
-- 
cgit v1.2.3
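[Editor's note] A hedged sketch of how the new runner might be invoked.
The flag names are the ones JobBase and the example actually read
("output" and, in Hadoop mode, "job.lib.path"); the values shown are
placeholders, not paths from this patch:

    import parallelai.spyglass.hbase.example.SimpleHBaseSourceRunner

    object RunSimpleHBaseSourceExample {
      def main(unused: Array[String]): Unit =
        // App supplies main(); the runner prepends the job class name
        // and delegates the rest of the argument list to JobRunner.
        SimpleHBaseSourceRunner.main(Array(
          "--hdfs",                             // run in Hadoop mode
          "--output", "hbase_example_out",      // consumed via args("output")
          "--job.lib.path", "/path/to/job/jars" // jars shipped by JobLibLoader
        ))
    }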