aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xpython/kafka_grobid_hbase.py14
1 files changed, 13 insertions, 1 deletions
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
index 5241920..e6a53a1 100755
--- a/python/kafka_grobid_hbase.py
+++ b/python/kafka_grobid_hbase.py
@@ -42,6 +42,9 @@ class KafkaGrobidHbaseWorker:
self.consume_topic = consume_topic
self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert')
self.kafka_hosts = kafka_hosts or 'localhost:9092'
+ self.hbase_host = kwargs['hbase_host']
+ self.hbase_table = kwargs['hbase_table']
+ self.hb_table = None # connection initialized in run()
def do_work(self, raw_line):
"""
@@ -123,6 +126,15 @@ class KafkaGrobidHbaseWorker:
# way... print? log?
# 3. repeat!
+ try:
+ host = self.hbase_host
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+
+ self.hb_table = hb_conn.table(self.options.hbase_table)
kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
consume_topic = kafka.topics[self.consume_topic]
@@ -170,7 +182,7 @@ def main():
args = parser.parse_args()
if args.consume_topic is None:
- args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env)
+ args.consume_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env)
worker = KafkaGrobidHbaseWorker(**args.__dict__)
worker.run()