diff options
-rwxr-xr-x | python/kafka_grobid_hbase.py | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py index 5241920..e6a53a1 100755 --- a/python/kafka_grobid_hbase.py +++ b/python/kafka_grobid_hbase.py @@ -42,6 +42,9 @@ class KafkaGrobidHbaseWorker: self.consume_topic = consume_topic self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert') self.kafka_hosts = kafka_hosts or 'localhost:9092' + self.hbase_host = kwargs['hbase_host'] + self.hbase_table = kwargs['hbase_table'] + self.hb_table = None # connection initialized in run() def do_work(self, raw_line): """ @@ -123,6 +126,15 @@ class KafkaGrobidHbaseWorker: # way... print? log? # 3. repeat! + try: + host = self.hbase_host + hb_conn = happybase.Connection(host=host, transport="framed", + protocol="compact") + except Exception: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + + self.hb_table = hb_conn.table(self.options.hbase_table) kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0") consume_topic = kafka.topics[self.consume_topic] @@ -170,7 +182,7 @@ def main(): args = parser.parse_args() if args.consume_topic is None: - args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env) + args.consume_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env) worker = KafkaGrobidHbaseWorker(**args.__dict__) worker.run() |