aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/harvest_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py37
1 files changed, 33 insertions, 4 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index aa7a69f5..78830a1c 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -130,30 +130,56 @@ class HarvestState:
if err:
raise KafkaException(err)
print("Commiting status to Kafka: {}".format(kafka_topic))
- producer = Producer(kafka_config)
- producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
+ producer_conf = kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
+ producer.produce(
+ kafka_topic,
+ state_json,
+ on_delivery=fail_fast)
producer.flush()
return state_json
def initialize_from_kafka(self, kafka_topic, kafka_config):
"""
kafka_topic should have type str
+
+ TODO: this method does not fail if client can't connect to host.
"""
if not kafka_topic:
return
print("Fetching state from kafka topic: {}".format(kafka_topic))
+ def fail_fast(err, msg):
+ if err:
+ raise KafkaException(err)
conf = kafka_config.copy()
conf.update({
+ 'group.id': 'dummy_init_group', # should never be commited
+ 'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
'session.timeout.ms': 10000,
- 'group.id': kafka_topic + "-init",
})
consumer = Consumer(conf)
+
+ # this watermark fetch is mostly to ensure we are connected to broker and
+ # fail fast if not, but we also confirm that we read to end below.
+ hwm = consumer.get_watermark_offsets(
+ TopicPartition(kafka_topic, 0),
+ timeout=5.0,
+ cached=False)
+ if not hwm:
+ raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic))
+
consumer.assign([TopicPartition(kafka_topic, 0, 0)])
c = 0
while True:
- msg = consumer.poll(timeout=1.0)
+ msg = consumer.poll(timeout=2.0)
if not msg:
break
if msg.error():
@@ -162,4 +188,7 @@ class HarvestState:
self.update(msg.value().decode('utf-8'))
c += 1
consumer.close()
+
+ # verify that we got at least to HWM
+ assert c >= hwm[1]
print("... got {} state update messages, done".format(c))