diff options
Diffstat (limited to 'src/main')
5 files changed, 97 insertions, 23 deletions
| diff --git a/src/main/scala/parallelai/spyglass/base/JobBase.scala b/src/main/scala/parallelai/spyglass/base/JobBase.scala index 7040fa7..6cb79cd 100644 --- a/src/main/scala/parallelai/spyglass/base/JobBase.scala +++ b/src/main/scala/parallelai/spyglass/base/JobBase.scala @@ -6,12 +6,13 @@ import org.apache.hadoop.fs.Path  import org.apache.hadoop.conf.Configuration  import org.apache.hadoop.fs.FileSystem  import org.apache.hadoop.filecache.DistributedCache -import com.twitter.scalding.Mode  import com.twitter.scalding.HadoopMode  import com.typesafe.config.ConfigFactory  import org.slf4j.Logger  import org.slf4j.LoggerFactory  import com.twitter.scalding.NullSource +import parallelai.spyglass.base._ +import com.twitter.scalding.Mode  class JobBase(args: Args) extends Job(args) {    def getOrElseString(key: String, default: String): String = { @@ -46,14 +47,14 @@ class JobBase(args: Args) extends Job(args) {    val log = LoggerFactory.getLogger(getOrElseString("app.log.name", this.getClass().getName()))    def modeString(): String = { -      Mode.mode match { +      Mode.getMode(args) match {          case x:HadoopMode  => "--hdfs"          case _ => "--local"      }    }    // Execute at instantiation -  Mode.mode match { +  Mode.getMode(args) match {      case x:HadoopMode => {        log.info("In Hadoop Mode")        JobLibLoader.loadJars(getString("job.lib.path"), AppConfig.jobConfig); diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index c214e99..bbae8f6 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -7,9 +7,7 @@ import com.twitter.scalding.AccessMode  import com.twitter.scalding.Hdfs  import com.twitter.scalding.Mode  import com.twitter.scalding.Read -import com.twitter.scalding.Source  import com.twitter.scalding.Write -  import parallelai.spyglass.hbase.HBaseConstants.{SplitType, SourceMode}  import cascading.scheme.{NullScheme, Scheme}  import cascading.tap.SinkMode @@ -18,6 +16,8 @@ import cascading.tuple.Fields  import org.apache.hadoop.mapred.RecordReader  import org.apache.hadoop.mapred.OutputCollector  import org.apache.hadoop.mapred.JobConf +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import com.twitter.scalding.Source  object Conversions {    implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) @@ -47,11 +47,11 @@ case class HBaseSource(    val internalScheme = new HBaseScheme(keyFields, timestamp, familyNames.toArray, valueFields.toArray)    internalScheme.setInputSplitTye(inputSplitType) -  override val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] +  val hdfsScheme = internalScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]    // To enable local mode testing    val allFields = keyFields.append(valueFields.toArray) -  override def localScheme = new NullScheme(allFields, allFields) +  def localScheme = new NullScheme(allFields, allFields)    override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {      val hBaseScheme = hdfsScheme match { @@ -92,7 +92,17 @@ case class HBaseSource(            hbt.asInstanceOf[Tap[_,_,_]]          }        } -      case _ => super.createTap(readOrWrite)(mode) +      case _ => createEmptyTap(readOrWrite)(mode)      }    } +   +   +   +  def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = { +    mode match { +      case _ => { +        throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) +      } +    } +  }    } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala index 7ba2788..5d844cb 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala @@ -6,27 +6,56 @@ import org.apache.log4j.{Level, Logger}  import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import cascading.tuple.Fields +import com.twitter.scalding.IterableSource  /**    * Simple example of HBaseSource usage    */  class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions { -   val isDebug: Boolean = args("debug").toBoolean - -   if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) +   //val isDebug: Boolean = args("debug").toBoolean +   //if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)     val output = args("output") -   val hbs = new HBaseSource( -     "table_name", -     "quorum_name:2181", +   val hbsOut = new HBaseSource( +     "spyglass.hbase.test", +     "cldmgr.prod.bigdata.bskyb.com:2181",       new Fields("key"), -     List("column_family"), -     List(new Fields("column_name1", "column_name2")), -     sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")) -     .read -     .fromBytesWritable(new Fields("key", "column_name1", "column_name2")) -     .write(Tsv(output format "get_list")) +     List("data", "data"), +     List(new Fields("test1", "test2")))    +    +  val data = List( +    ("100", 1, "A"), +    ("101", 2,  "B"), +    ("102" , 3 , "C"), +    ("103" , 4 , "D"), +    ("104" , 5 , "E"), +    ("104" , 6 , "F")) +    +  val testDataPipe = +    IterableSource[(String, Int, String)](data, ('key, 'test1, 'test2)) +      .debug +      .toBytesWritable(List('key, 'test1, 'test2)) +       +   val writer = testDataPipe    +   writer.write(hbsOut)     +     +   val hbs = new HBaseSource( +	     "spyglass.hbase.test", +	     "cldmgr.prod.bigdata.bskyb.com:2181", +	     new Fields("key"), +	     List("data", "data"), +	     List(new Fields("test1", "test2")), +	     sourceMode = SourceMode.SCAN_ALL) +	     .read +	     .fromBytesWritable(new Fields("key", "test1", "test2")) + +	val fileWriter = hbs      +	fileWriter.write(Tsv("scan_all.txt")) +   +   +	 +      } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala new file mode 100644 index 0000000..bbcf96d --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala @@ -0,0 +1,26 @@ +package parallelai.spyglass.hbase.example + +import com.twitter.scalding.Args +import org.slf4j.LoggerFactory +import parallelai.spyglass.base.JobRunner + +object SimpleHBaseSourceRunner extends App { + +  val mArgs = Args(args) + +  val log = LoggerFactory.getLogger(this.getClass.getName) + + + +  log.info("Starting HBaseSource Import Process Test...") + +  val start1 = System.currentTimeMillis + +  JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray) + +  val end = System.currentTimeMillis + +  log.info("HBaseSource Import process finished successfully.") +  log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete") +   +}
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index beb66be..09f4579 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -28,11 +28,11 @@ case class JDBCSource(      updateByFields: Fields = null    ) extends Source { -  override val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray) +  val hdfsScheme = new JDBCScheme(fields, columnNames.toArray, orderBy.toArray, updateByFields, updateBy.toArray)      .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]    // To enable local mode testing -  override def localScheme = new NullScheme(fields, fields) +  def localScheme = new NullScheme(fields, fields)    override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {      val jdbcScheme = hdfsScheme match { @@ -53,7 +53,15 @@ case class JDBCSource(            jdbcTap.asInstanceOf[Tap[_,_,_]]          }        } -      case _ => super.createTap(readOrWrite)(mode) +      case _ => createEmptyTap(readOrWrite)(mode)      }    } +   +  def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = { +    mode match { +      case _ => { +        throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) +      } +    } +  }      } | 
