aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/doi_registrars.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py13
1 files changed, 8 insertions, 5 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 3362df35..7e791745 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -56,11 +56,7 @@ class HarvestCrossrefWorker:
self.is_update_filter = is_update_filter
self.kafka_config = {
'bootstrap.servers': kafka_hosts,
- 'delivery.report.only.error': True,
'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
- 'default.topic.config': {
- 'request.required.acks': 'all',
- },
}
self.state = HarvestState(start_date, end_date)
@@ -97,7 +93,14 @@ class HarvestCrossrefWorker:
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- producer = Producer(self.kafka_config)
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
date_str = date.isoformat()
params = self.params(date_str)