about | summary | refs | log | tree | commit | diff | stats
path: root/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala')
-rw-r--r-- src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala 28
-rw-r--r-- src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala 152
-rw-r--r-- src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala 63
3 files changed, 126 insertions, 117 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index b6d5742..31ed3ea 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -10,15 +10,14 @@ import org.apache.hadoop.hbase.util.Bytes
import cascading.tuple.TupleEntry
class HBasePipeWrapper (pipe: Pipe) {
- def toBytesWritable(f: Fields): Pipe = {
+ def toBytesWritable(f: Fields): Pipe = {
asList(f)
- .foldLeft(pipe){ (p, f) => {
- p.map(f.toString -> f.toString){ from: String => {
- new ImmutableBytesWritable(Bytes.toBytes(
- if (from == null) "" else from))
- }}
- }}
- }
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
+ }
// def toBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
@@ -30,13 +29,12 @@ class HBasePipeWrapper (pipe: Pipe) {
def fromBytesWritable(f: Fields): Pipe = {
asList(f)
- .foldLeft(pipe) { (p, fld) =>
- p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
- Bytes.toString(from.get)
- }
- }
- }
- }
+ .foldLeft(pipe) { (p, fld) => {
+ p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable =>
+ Option(from).map(x => Bytes.toString(x.get)).getOrElse(null)
+ }
+ }}
+ }
// def fromBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
index eccd653..2aa5342 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -7,93 +7,101 @@ import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.log4j.Logger
-
import com.twitter.scalding._
import com.twitter.scalding.Args
-
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import org.apache.hadoop.hbase.client.Put
+import parallelai.spyglass.hbase.HBaseSalter
class HBaseExample(args: Args) extends JobBase(args) {
- val isDebug: Boolean = args("debug").toBoolean
+ val isDebug: Boolean = args("debug").toBoolean
- if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
+ if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
- val output = args("output")
+ val output = args("output")
- println(output)
+ val jobConf = getJobConf()
- val jobConf = getJobConf()
+ val quorumNames = args("quorum")
- val quorumNames = args("quorum")
+ println("Output : " + output)
+ println("Quorum : " + quorumNames)
- case class HBaseTableStore(
+ case class HBaseTableStore(
conf: Configuration,
quorum: String,
tableName: String) {
- val tableBytes = Bytes.toBytes(tableName)
- val connection = HConnectionManager.getConnection(conf)
- val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
-
- conf.set("hbase.zookeeper.quorum", quorumNames)
-
- val htable = new HTable(HBaseConfiguration.create(conf), tableName)
-
- }
-
- val hTableStore = HBaseTableStore(getJobConf(), quorumNames, "skybet.test.tbet")
-
- val hbs2 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
- .read
- .write(Tsv(output.format("get_list")))
-
- val hbs3 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
- .read
- .write(Tsv(output.format("scan_all")))
-
- val hbs4 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_to_end")))
-
- val hbs5 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_from_start")))
-
- val hbs6 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- List("column_family"),
- List('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
- .read
- .write(Tsv(output.format("scan_range_between")))
-
-} \ No newline at end of file
+ val tableBytes = Bytes.toBytes(tableName)
+ val connection = HConnectionManager.getConnection(conf)
+ val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+ conf.set("hbase.zookeeper.quorum", quorumNames)
+
+ val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+ def makeN(n: Int) {
+ (0 to n - 1).map(x => "%015d".format(x.toLong)).foreach(x => {
+ val put = new Put(HBaseSalter.addSaltPrefix(Bytes.toBytes(x)))
+ put.add(Bytes.toBytes("data"), Bytes.toBytes("data"), Bytes.toBytes(x))
+ })
+ }
+
+ }
+
+ HBaseTableStore(jobConf, quorumNames, "_TEST.SALT.01").makeN(100000)
+
+ val hbs2 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.GET_LIST, keyList = List("13914", "10687", "14897").map(x => "%015d".format(x.toLong)), useSalt = true)
+ .read
+ .write(Tsv(output.format("get_list")))
+
+ val hbs3 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
+ .read
+ .write(Tsv(output.format("scan_all")))
+
+ val hbs4 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_RANGE, stopKey = "%015d".format("13914".toLong), useSalt = true)
+ .read
+ .write(Tsv(output.format("scan_range_to_end")))
+
+ val hbs5 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), useSalt = true)
+ .read
+ .write(Tsv(output.format("scan_range_from_start")))
+
+ val hbs6 = new HBaseSource(
+ "_TEST.SALT.01",
+ quorumNames,
+ 'key,
+ List("data"),
+ List('data),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "%015d".format("13914".toLong), stopKey = "%015d".format("16897".toLong), useSalt = true)
+ .read
+ .write(Tsv(output.format("scan_range_between")))
+
+} \ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
index 890d2be..920f17d 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -3,36 +3,39 @@ package parallelai.spyglass.hbase.example
import com.twitter.scalding.Tool
import org.joda.time.format.DateTimeFormat
import java.util.Formatter.DateTime
+import parallelai.spyglass.base.JobRunner
object HBaseExampleRunner extends App {
- val appPath = System.getenv("BIGDATA_APPCONF_PATH")
- assert (appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or Null"})
- println( "Application Path is [%s]".format(appPath) )
-
- val modeString = if( args.length == 0 ) { "--hdfs" } else { args(0) match {
- case "hdfs" => "--hdfs"
- case _ => "--local"
- }}
-
- println(modeString)
-
- val jobLibPath = modeString match {
- case "--hdfs" => {
- val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
- assert (jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null"})
- println( "Job Library Path Path is [%s]".format(jobLibPath) )
- jobLibPath
- }
- case _ => ""
- }
-
- val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
- assert (quorum != null, {"Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null"})
- println( "Quorum is [%s]".format(quorum) )
-
- val output = "HBaseTest.%s.tsv"
-
- Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath,
- "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum ))
-
+ val appPath = System.getenv("BIGDATA_APPCONF_PATH")
+ assert(appPath != null, { "Environment Variable BIGDATA_APPCONF_PATH is undefined or Null" })
+ println("Application Path is [%s]".format(appPath))
+
+ val modeString = if (args.length == 0) { "--hdfs" } else {
+ args(0) match {
+ case "hdfs" => "--hdfs"
+ case _ => "--hdfs"
+ }
+ }
+
+ println(modeString)
+
+ val jobLibPath = modeString match {
+ case "--hdfs" => {
+ val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
+ assert(jobLibPath != null, { "Environment Variable BIGDATA_JOB_LIB_PATH is undefined or Null" })
+ println("Job Library Path Path is [%s]".format(jobLibPath))
+ jobLibPath
+ }
+ case _ => ""
+ }
+
+ val quorum = System.getenv("BIGDATA_QUORUM_NAMES")
+ assert(quorum != null, { "Environment Variable BIGDATA_QUORUM_NAMES is undefined or Null" })
+ println("Quorum is [%s]".format(quorum))
+
+ val output = "HBaseTest.%s"
+
+ JobRunner.main(Array(classOf[HBaseExample].getName, "--hdfs", "--app.conf.path", appPath,
+ "--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum))
+
} \ No newline at end of file