Diffstat (limited to 'src/main/scala/parallelai/spyglass/hbase')
3 files changed, 213 insertions, 0 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
new file mode 100644
index 0000000..e46ef50
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -0,0 +1,82 @@
+package parallelai.spyglass.hbase
+
+import java.io.IOException
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import com.twitter.scalding.AccessMode
+import com.twitter.scalding.Hdfs
+import com.twitter.scalding.Mode
+import com.twitter.scalding.Read
+import com.twitter.scalding.Source
+import com.twitter.scalding.Write
+import cascading.scheme.Scheme
+import cascading.tap.SinkMode
+import cascading.tap.Tap
+import cascading.tuple.Fields
+import org.apache.hadoop.mapred.RecordReader
+import scala.compat.Platform
+import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.JobConf
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+object Conversions {
+  implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes)
+  implicit def bytesToLong(bytes: Array[Byte]): Long = augmentString(bytesToString(bytes)).toLong
+  implicit def ibwToString(ibw: ImmutableBytesWritable): String = bytesToString(ibw.get())
+  implicit def stringToibw(s: String): ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s))
+}
+
+class HBaseSource(
+    tableName: String = null,
+    quorumNames: String = "localhost",
+    keyFields: Fields = null,
+    familyNames: Array[String] = null,
+    valueFields: Array[Fields] = null,
+    timestamp: Long = Platform.currentTime,
+    sourceMode: SourceMode = SourceMode.SCAN_ALL,
+    startKey: String = null,
+    stopKey: String = null,
+    keyList: List[String] = null
+  ) extends Source {
+
+  override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields)
+    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+
+  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
+    val hBaseScheme = hdfsScheme match {
+      case hbase: HBaseScheme => hbase
+      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
+    }
+    mode match {
+      case Hdfs(_, _) => readOrWrite match {
+        case Read => {
+          val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
+
+          sourceMode match {
+            case SourceMode.SCAN_RANGE => {
+              hbt.setHBaseRangeParms(startKey, stopKey)
+            }
+            case SourceMode.SCAN_ALL => {
+              hbt.setHBaseScanAllParms()
+            }
+            case SourceMode.GET_LIST => {
+              if (keyList == null)
+                throw new IOException("Key list cannot be null when Source Mode is " + sourceMode)
+
+              hbt.setHBaseListParms(keyList.toArray[String])
+            }
+            case _ => throw new IOException("Unknown Source Mode (%s)".format(sourceMode))
+          }
+
+          hbt.asInstanceOf[Tap[_, _, _]]
+        }
+        case Write => {
+          val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE)
+
+          hbt.asInstanceOf[Tap[_, _, _]]
+        }
+      }
+      case _ => super.createTap(readOrWrite)(mode)
+    }
+  }
+}
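The Conversions implicits above are defined but never exercised in this change. A minimal sketch of how they can be used once imported; the sample key is a placeholder:

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import parallelai.spyglass.hbase.Conversions._

    // With the implicits in scope, HBase byte-oriented types convert transparently.
    val ibw = new ImmutableBytesWritable(Bytes.toBytes("5003914"))
    val key: String = ibw                         // via ibwToString
    val id: Long = Bytes.toBytes("5003914")       // via bytesToLong (parses the decimal string)
    val back: ImmutableBytesWritable = "5003914"  // via stringToibw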
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
new file mode 100644
index 0000000..4c86b07
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala
@@ -0,0 +1,99 @@
+package parallelai.spyglass.hbase.example
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.client.HConnectionManager
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+
+import com.twitter.scalding._
+import com.twitter.scalding.Args
+
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class HBaseExample(args: Args) extends JobBase(args) {
+
+  val isDebug: Boolean = args("debug").toBoolean
+
+  if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG)
+
+  val output = args("output")
+
+  println(output)
+
+  val jobConf = getJobConf
+
+  val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181"
+
+  case class HBaseTableStore(
+      conf: Configuration,
+      quorum: String,
+      tableName: String) {
+
+    conf.set("hbase.zookeeper.quorum", quorum)
+
+    val tableBytes = Bytes.toBytes(tableName)
+    val connection = HConnectionManager.getConnection(conf)
+    val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+    val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+  }
+
+  val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet")
+
+  val hbs2 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
+    .read
+    .write(Tsv(output.format("get_list")))
+
+  val hbs3 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_ALL)
+    .read
+    .write(Tsv(output.format("scan_all")))
+
+  val hbs4 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
+    .read
+    .write(Tsv(output.format("scan_range_to_stop")))
+
+  val hbs5 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
+    .read
+    .write(Tsv(output.format("scan_range_from_start")))
+
+  val hbs6 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
+    .read
+    .write(Tsv(output.format("scan_range_between")))
+
+}
\ No newline at end of file
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
new file mode 100644
index 0000000..d6b762e
--- /dev/null
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -0,0 +1,32 @@
+package parallelai.spyglass.hbase.example
+
+import com.twitter.scalding.Tool
+
+object HBaseExampleRunner extends App {
+  val appPath = System.getenv("BIGDATA_APPCONF_PATH")
+  assert(appPath != null, "Environment Variable BIGDATA_APPCONF_PATH is undefined or null")
+  println("Application Path is [%s]".format(appPath))
+
+  val modeString = if (args.length == 0) { "--hdfs" } else { args(0) match {
+    case "hdfs" => "--hdfs"
+    case _ => "--local"
+  }}
+
+  println(modeString)
+
+  val jobLibPath = modeString match {
+    case "--hdfs" => {
+      val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH")
+      assert(jobLibPath != null, "Environment Variable BIGDATA_JOB_LIB_PATH is undefined or null")
+      println("Job Library Path is [%s]".format(jobLibPath))
+      jobLibPath
+    }
+    case _ => ""
+  }
+
+  val output = "HBaseTest.%s.tsv"
+
+  Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath,
+    "--output", output, "--debug", "true", "--job.lib.path", jobLibPath))
+
+}
\ No newline at end of file
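For local experimentation, the environment-variable plumbing above can be bypassed by invoking scalding's Tool directly. A minimal sketch; the launcher object, config path, and library path are placeholders, not values from this commit:

    package parallelai.spyglass.hbase.example

    import com.twitter.scalding.Tool

    object DirectRunner extends App {
      // Hypothetical launcher, equivalent to HBaseExampleRunner in "hdfs" mode.
      Tool.main(Array(
        classOf[HBaseExample].getName,
        "--hdfs",
        "--app.conf.path", "/path/to/app.conf",
        "--output", "HBaseTest.%s.tsv",
        "--debug", "false",
        "--job.lib.path", "/path/to/job/libs"))
    }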
