aboutsummaryrefslogtreecommitdiffstats
path: root/README.md
blob: 7f65d56299aba3f827c49472eee4d40ea065a6c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
SpyGlass
========

Cascading and Scalding wrapper for HBase with advanced read features

Building
========

	$ mvn clean install -U
	
Requires Maven 3.x.x.


Example
=======

	package parallelai.spyglass.hbase.example
	
	import org.apache.hadoop.conf.Configuration
	import org.apache.hadoop.hbase.HBaseConfiguration
	import org.apache.hadoop.hbase.client.HConnectionManager
	import org.apache.hadoop.hbase.client.HTable
	import org.apache.hadoop.hbase.util.Bytes
	import org.apache.log4j.Level
	import org.apache.log4j.Logger
	
	import com.twitter.scalding._
	import com.twitter.scalding.Args
	
	import parallelai.spyglass.base.JobBase
	import parallelai.spyglass.hbase.HBaseSource
	import parallelai.spyglass.hbase.HBaseConstants.SourceMode
	
	/**
	 * Example Scalding job demonstrating the read modes of [[HBaseSource]].
	 *
	 * Each `hbsN` pipeline builds an `HBaseSource` with a different `sourceMode`,
	 * reads it, and writes the result as TSV. The `output` argument is used as a
	 * format string: a per-mode label (e.g. "get_list") is substituted into it,
	 * so it is expected to contain a `%s` placeholder.
	 *
	 * Arguments:
	 *  - `--output` (required): output path pattern.
	 *  - `--debug` (optional, treated as "false" when absent): when true, the
	 *    root log4j logger is switched to DEBUG.
	 */
	class HBaseExample(args: Args) extends JobBase(args) {
	
	  // Optional flag; a missing --debug is treated as false instead of failing the job.
	  val isDebug: Boolean = args.getOrElse("debug", "false").toBoolean
	
	  if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG)
	
	  val output = args("output")
	
	  println(output)
	
	  val jobConf = getJobConf
	
	  // ZooKeeper quorum handed to HBaseTableStore below; replace with your own cluster.
	  val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181"
	
	  /**
	   * Holds a connection and an `HTable` handle for a single table.
	   *
	   * NOTE: `conf` is mutated: `quorum` is written to `hbase.zookeeper.quorum`.
	   * NOTE(review): neither `connection` nor `htable` is closed anywhere in this
	   * example; real code should release them when done.
	   *
	   * @param conf      configuration used to look up the connection and create the table handle
	   * @param quorum    value stored under `hbase.zookeeper.quorum`
	   * @param tableName name of the HBase table to open
	   */
	  case class HBaseTableStore(
	      conf: Configuration,
	      quorum: String,
	      tableName: String) {
	
	    // Set the quorum first so that everything created from `conf` below sees it,
	    // and use the constructor argument rather than the enclosing job's value.
	    conf.set("hbase.zookeeper.quorum", quorum)
	
	    val tableBytes = Bytes.toBytes(tableName)
	    val connection = HConnectionManager.getConnection(conf)
	    val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
	
	    val htable = new HTable(HBaseConfiguration.create(conf), tableName)
	
	  }
	
	  // Reuse the configuration obtained above instead of calling getJobConf again.
	  val hTableStore = HBaseTableStore(jobConf, quorumNames, "skybet.test.tbet")
	
	  // GET_LIST: read using an explicit list of keys.
	  val hbs2 = new HBaseSource(
	    "table_name",
	    "quorum_name:2181",
	    'key,
	    Array("column_family"),
	    Array('column_name),
	    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
	    .read
	    .write(Tsv(output.format("get_list")))
	
	  // SCAN_ALL: no start or stop key supplied.
	  val hbs3 = new HBaseSource(
	    "table_name",
	    "quorum_name:2181",
	    'key,
	    Array("column_family"),
	    Array('column_name),
	    sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
	    .read
	    .write(Tsv(output.format("scan_all")))
	
	  // SCAN_RANGE with only a stop key.
	  val hbs4 = new HBaseSource(
	    "table_name",
	    "quorum_name:2181",
	    'key,
	    Array("column_family"),
	    Array('column_name),
	    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
	    .read
	    .write(Tsv(output.format("scan_range_to_end")))
	
	  // SCAN_RANGE with only a start key.
	  val hbs5 = new HBaseSource(
	    "table_name",
	    "quorum_name:2181",
	    'key,
	    Array("column_family"),
	    Array('column_name),
	    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
	    .read
	    .write(Tsv(output.format("scan_range_from_start")))
	
	  // SCAN_RANGE with both a start and a stop key.
	  val hbs6 = new HBaseSource(
	    "table_name",
	    "quorum_name:2181",
	    'key,
	    Array("column_family"),
	    Array('column_name),
	    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
	    .read
	    .write(Tsv(output.format("scan_range_between")))
	
	}