From 3054e1ce5395c70d75c7750a77ba8a49648bc504 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 21 Nov 2018 17:36:26 -0800 Subject: fixes to hbase worker --- python/kafka_grobid_hbase.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py index 5241920..e6a53a1 100755 --- a/python/kafka_grobid_hbase.py +++ b/python/kafka_grobid_hbase.py @@ -42,6 +42,9 @@ class KafkaGrobidHbaseWorker: self.consume_topic = consume_topic self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert') self.kafka_hosts = kafka_hosts or 'localhost:9092' + self.hbase_host = kwargs['hbase_host'] + self.hbase_table = kwargs['hbase_table'] + self.hb_table = None # connection initialized in run() def do_work(self, raw_line): """ @@ -123,6 +126,15 @@ class KafkaGrobidHbaseWorker: # way... print? log? # 3. repeat! + try: + host = self.hbase_host + hb_conn = happybase.Connection(host=host, transport="framed", + protocol="compact") + except Exception: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + + self.hb_table = hb_conn.table(self.options.hbase_table) kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0") consume_topic = kafka.topics[self.consume_topic] @@ -170,7 +182,7 @@ def main(): args = parser.parse_args() if args.consume_topic is None: - args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env) + args.consume_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env) worker = KafkaGrobidHbaseWorker(**args.__dict__) worker.run() -- cgit v1.2.3