diff options
Diffstat (limited to 'python/kafka_grobid_hbase.py')
-rwxr-xr-x | python/kafka_grobid_hbase.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py index a7f2a5e..7c5c4a2 100755 --- a/python/kafka_grobid_hbase.py +++ b/python/kafka_grobid_hbase.py @@ -43,7 +43,7 @@ class KafkaGrobidHbaseWorker: self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2') self.kafka_hosts = kafka_hosts or 'localhost:9092' self.hbase_host = kwargs['hbase_host'] - self.hbase_table = kwargs['hbase_table'] + self.hbase_table_name = kwargs['hbase_table'] self.hb_table = None # connection initialized in run() def convert_tei(self, info): @@ -127,34 +127,39 @@ class KafkaGrobidHbaseWorker: # way... print? log? # 3. repeat! + print("Starting grobid-hbase-worker...") try: host = self.hbase_host hb_conn = happybase.Connection(host=host, transport="framed", protocol="compact") except Exception: raise Exception("Couldn't connect to HBase using host: {}".format(host)) - self.hb_table = hb_conn.table(self.hbase_table) + self.hb_table = hb_conn.table(self.hbase_table_name) + print("HBase inserting into {}".format(self.hbase_table_name)) - kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0") + kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0") consume_topic = kafka.topics[self.consume_topic] - print("starting up...") sequential_failures = 0 consumer = consume_topic.get_balanced_consumer( consumer_group=self.consumer_group, managed=True, auto_commit_enable=True, compacted_topic=True) + print("Kafka consuming {} in group {}".format( + self.consume_topic, + self.consumer_group)) + for msg in consumer: #print("got a line! ") grobid_output, status = self.do_work(msg.value.decode('utf-8')) if grobid_output: sequential_failures = 0 else: - print("failed to process: {}".format(status)) + sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status)) sequential_failures += 1 if sequential_failures > 20: - print("too many failures in a row, bailing out") + sys.stderr.write("too many failures in a row, bailing out\n") sys.exit(-1) |