Diffstat (limited to 'src/main/scala')
3 files changed, 126 insertions, 117 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index b6d5742..31ed3ea 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -10,15 +10,14 @@ import org.apache.hadoop.hbase.util.Bytes
 import cascading.tuple.TupleEntry
 
 class HBasePipeWrapper (pipe: Pipe) {
-   def toBytesWritable(f: Fields): Pipe = {
+    def toBytesWritable(f: Fields): Pipe = {
 	  asList(f)
-     .foldLeft(pipe){ (p, f) => {
-	    p.map(f.toString -> f.toString){ from: String => {
-	      new ImmutableBytesWritable(Bytes.toBytes(
-	          if (from == null) "" else from))
-	    }}
-	  }}
-	}
+        .foldLeft(pipe){ (p, f) => {
+	      p.map(f.toString -> f.toString){ from: String =>
+            Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+          }}
+      }
+    }
 
 //   def toBytesWritable : Pipe = {
 //	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
@@ -30,13 +29,12 @@ class HBasePipeWrapper (pipe: Pipe) {
 
 	def fromBytesWritable(f: Fields): Pipe = {
 	  asList(f)
-	  .foldLeft(pipe) { (p, fld) =>
-	    p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
-	    	Bytes.toString(from.get)
-	      }
-	    }
-	  }
-	}
+	    .foldLeft(pipe) { (p, fld) => {
+	      p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable =>
+            Option(from).map(x => Bytes.toString(x.get)).getOrElse(null)
+          }
+        }}
+    }
 
 //	def fromBytesWritable : Pipe = {
 //	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
index eccd653..2aa5342 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -7,93 +7,101 @@ import org.apache.hadoop.hbase.client.HTable
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.log4j.Level
 import org.apache.log4j.Logger
-
 import com.twitter.scalding._
 import com.twitter.scalding.Args
-
 import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBaseSource
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import org.apache.hadoop.hbase.client.Put
+import parallelai.spyglass.hbase.HBaseSalter
 
 class HBaseExample(args: Args) extends JobBase(args) {
 
-  val isDebug: Boolean = args("debug").toBoolean
+   val isDebug: Boolean = args("debug").toBoolean
 
-  if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
+   if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
 
-  val output = args("output")
+   val output = args("output")
 
-  println(output)
+   val jobConf = getJobConf()
 
-  val jobConf = getJobConf()
+   val quorumNames = args("quorum")
 
-  val quorumNames = args("quorum")
+   println("Output : " + output)
+   println("Quorum : " + quorumNames)
 
-  case class HBaseTableStore(
+   case class HBaseTableStore(
       conf: Configuration,
       quorum: String,
       tableName: String) {
 
-    val tableBytes = Bytes.toBytes(tableName)
-    val connection = HConnectionManager.getConnection(conf)
-    val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
-
-    conf.set("hbase.zookeeper.quorum", quorumNames)
-
-    val htable = new HTable(HBaseConfiguration.create(conf), tableName)
-
-  }
-
-  val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet")
-
-  val hbs2 = new HBaseSource(
-    "table_name",
-    "quorum_name:2181",
-    'key,
-    List("column_family"),
-    List('column_name),
-    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
-    .read
-    .write(Tsv(output.format("get_list")))
-
-  val hbs3 = new HBaseSource(
-    "table_name",
-    "quorum_name:2181",
-    'key,
-    List("column_family"),
-    List('column_name),
-    sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
-    .read
-    .write(Tsv(output.format("scan_all")))
-
-  val hbs4 = new HBaseSource(
-    "table_name",
-    "quorum_name:2181",
-    'key,
-    List("column_family"),
-    List('column_name),
-    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
-    .read
-    .write(Tsv(output.format("scan_range_to_end")))
-
-  val hbs5 = new HBaseSource(
-    "table_name",
-    "quorum_name:2181",
-    'key,
-    List("column_family"),
-    List('column_name),
-    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
-    .read
-    .write(Tsv(output.format("scan_range_from_start")))
-
-  val hbs6 = new HBaseSource(
-    "table_name",
-    "quorum_name:2181",
-    'key,
-    List("column_family"),
-    List('column_name),
-    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
-    .read
-    .write(Tsv(output.format("scan_range_between")))
-
-}
\ No newline at end of file
+      val tableBytes = Bytes.toBytes(tableName)
+      val connection = HConnectionManager.getConnection(conf)
+      val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+      conf.set("hbase.zookeeper.quorum", quorumNames)
+
+      val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+      def makeN(n: Int) {
+         (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => {
+            val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x)))
+            put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x))
+         })
+      }
+
+   }
+
+   HBaseTableStore(jobConf, quorumNames, "_TEST.SALT.01").makeN(100000)
+
+   val hbs2 = new HBaseSource(
+      "_TEST.SALT.01",
+      quorumNames,
+      'key,
+      List("data"),
+      List('data),
+      sourceMode = SourceMode.GET_LIST, keyList = List("13914", "10687", "14897").map(x => "%015d".format(x.toLong)), useSalt = true)
+      .read
+      .write(Tsv(output.format("get_list")))
+
+   val hbs3 = new HBaseSource(
+      "_TEST.SALT.01",
+      quorumNames,
+      'key,
+      List("data"),
+      List('data),
+      sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
+      .read
+      .write(Tsv(output.format("scan_all")))
+
+   val hbs4 = new HBaseSource(
+      "_TEST.SALT.01",
+      quorumNames,
+      'key,
+      List("data"),
+      List('data),
+      sourceMode = SourceMode.SCAN_RANGE, stopKey = "%015d".format("13914".toLong), useSalt = true)
+      .read
+      .write(Tsv(output.format("scan_range_to_end")))
+
+   val hbs5 = new HBaseSource(
+      "_TEST.SALT.01",
+      quorumNames,
+      'key,
+      List("data"),
+      List('data),
+      sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), useSalt = true)
+      .read
+      .write(Tsv(output.format("scan_range_from_start")))
+
+   val hbs6 = new HBaseSource(
+      "_TEST.SALT.01",
+      quorumNames,
+      'key,
+      List("data"),
+      List('data),
+      sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), stopKey = "%015d".format("16897".toLong), useSalt = true)
+      .read
+      .write(Tsv(output.format("scan_range_between")))
+
+}
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
index 890d2be..920f17d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -3,36 +3,39 @@ package parallelai.spyglass.hbase.example
 import com.twitter.scalding.Tool
 import org.joda.time.format.DateTimeFormat
 import java.util.Formatter.DateTime
+import parallelai.spyglass.base.JobRunner
 
 object HBaseExampleRunner extends App {
-  val appPath = System.getenv("BIGDATA_APPCONF_PATH")
-  assert  (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"})
-  println( "Application Path is [%s]".format(appPath) )
-
-  val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match {
-    case "hdfs" => "--hdfs"
-    case _ => "--local"
-  }}
-
-  println(modeString)
-
-  val jobLibPath = modeString match {
-    case "--hdfs" => {
-      val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
-      assert  (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"})
-      println( "Job Library Path Path is [%s]".format(jobLibPath) )
-      jobLibPath
-    }
-    case _ => ""
-  }
-
-  val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
-  assert  (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})
-  println( "Quorum is [%s]".format(quorum) )
-
-  val output = "HBaseTest.%s.tsv"
-
-  Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath,
-    "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum ))
-
+   val appPath = System.getenv("BIGDATA_APPCONF_PATH")
+   assert(appPath != null, { "Environment Variable BIGDATA_APPCONF_PATH is undefined or Null" })
+   println("Application Path is [%s]".format(appPath))
+
+   val modeString = if (args.length == 0) { "--hdfs" } else {
+      args(0) match {
+         case "hdfs" => "--hdfs"
+         case _ => "--hdfs"
+      }
+   }
+
+   println(modeString)
+
+   val jobLibPath = modeString match {
+      case "--hdfs" => {
+         val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
+         assert(jobLibPath != null, { "Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null" })
+         println("Job Library Path Path is [%s]".format(jobLibPath))
+         jobLibPath
+      }
+      case _ => ""
+   }
+
+   val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
+   assert(quorum != null, { "Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null" })
+   println("Quorum is [%s]".format(quorum))
+
+   val output = "HBaseTest.%s"
+
+   JobRunner.main(Array(classOf[HBaseExample].getName, "--hdfs", "--app.conf.path", appPath,
+      "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum))
+
 }
\ No newline at end of file
