diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2019-09-19 20:00:24 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-09-20 11:21:31 -0700 |
commit | 90b5cb354d7d73c920288394aa9fd8d58e752157 (patch) | |
tree | 204085442b967fde2b8ad7ad46f521f3e3b834eb /python/fatcat_tools/harvest/harvest_common.py | |
parent | 80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769 (diff) | |
download | fatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.tar.gz fatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.zip |
review/fix all confluent-kafka produce code
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 37 |
1 files changed, 33 insertions, 4 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index aa7a69f5..78830a1c 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -130,30 +130,56 @@ class HarvestState: if err: raise KafkaException(err) print("Commiting status to Kafka: {}".format(kafka_topic)) - producer = Producer(kafka_config) - producer.produce(kafka_topic, state_json, on_delivery=fail_fast) + producer_conf = kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) + producer.produce( + kafka_topic, + state_json, + on_delivery=fail_fast) producer.flush() return state_json def initialize_from_kafka(self, kafka_topic, kafka_config): """ kafka_topic should have type str + + TODO: this method does not fail if client can't connect to host. """ if not kafka_topic: return print("Fetching state from kafka topic: {}".format(kafka_topic)) + def fail_fast(err, msg): + if err: + raise KafkaException(err) conf = kafka_config.copy() conf.update({ + 'group.id': 'dummy_init_group', # should never be commited + 'enable.auto.commit': False, 'auto.offset.reset': 'earliest', 'session.timeout.ms': 10000, - 'group.id': kafka_topic + "-init", }) consumer = Consumer(conf) + + # this watermark fetch is mostly to ensure we are connected to broker and + # fail fast if not, but we also confirm that we read to end below. + hwm = consumer.get_watermark_offsets( + TopicPartition(kafka_topic, 0), + timeout=5.0, + cached=False) + if not hwm: + raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)) + consumer.assign([TopicPartition(kafka_topic, 0, 0)]) c = 0 while True: - msg = consumer.poll(timeout=1.0) + msg = consumer.poll(timeout=2.0) if not msg: break if msg.error(): @@ -162,4 +188,7 @@ class HarvestState: self.update(msg.value().decode('utf-8')) c += 1 consumer.close() + + # verify that we got at least to HWM + assert c >= hwm[1] print("... got {} state update messages, done".format(c)) |