diff options
author | Bryan Newbold <bnewbold@archive.org> | 2018-11-21 17:36:26 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2018-11-21 17:36:26 -0800 |
commit | 3054e1ce5395c70d75c7750a77ba8a49648bc504 (patch) | |
tree | 02d5d9353890ee4261a0230beac684815322c572 | |
parent | 83cb9717d4790e7540308d179f59723633157d07 (diff) | |
download | sandcrawler-3054e1ce5395c70d75c7750a77ba8a49648bc504.tar.gz sandcrawler-3054e1ce5395c70d75c7750a77ba8a49648bc504.zip |
fixes to hbase worker
-rwxr-xr-x | python/kafka_grobid_hbase.py | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py index 5241920..e6a53a1 100755 --- a/python/kafka_grobid_hbase.py +++ b/python/kafka_grobid_hbase.py @@ -42,6 +42,9 @@ class KafkaGrobidHbaseWorker: self.consume_topic = consume_topic self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert') self.kafka_hosts = kafka_hosts or 'localhost:9092' + self.hbase_host = kwargs['hbase_host'] + self.hbase_table = kwargs['hbase_table'] + self.hb_table = None # connection initialized in run() def do_work(self, raw_line): """ @@ -123,6 +126,15 @@ class KafkaGrobidHbaseWorker: # way... print? log? # 3. repeat! + try: + host = self.hbase_host + hb_conn = happybase.Connection(host=host, transport="framed", + protocol="compact") + except Exception: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + + self.hb_table = hb_conn.table(self.options.hbase_table) kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0") consume_topic = kafka.topics[self.consume_topic] @@ -170,7 +182,7 @@ def main(): args = parser.parse_args() if args.consume_topic is None: - args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env) + args.consume_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env) worker = KafkaGrobidHbaseWorker(**args.__dict__) worker.run() |