diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2019-09-19 20:00:24 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-09-20 11:21:31 -0700 |
commit | 90b5cb354d7d73c920288394aa9fd8d58e752157 (patch) | |
tree | 204085442b967fde2b8ad7ad46f521f3e3b834eb /python/fatcat_tools/harvest/oaipmh.py | |
parent | 80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769 (diff) | |
download | fatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.tar.gz fatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.zip |
review/fix all confluent-kafka produce code
Diffstat (limited to 'python/fatcat_tools/harvest/oaipmh.py')
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 3e3bea03..f908ba83 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -39,10 +39,7 @@ class HarvestOaiPmhWorker: self.state_topic = state_topic self.kafka_config = { 'bootstrap.servers': kafka_hosts, - 'delivery.report.only.error': True, 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes - 'default.topic.config': - {'request.required.acks': 'all'}, } self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks @@ -62,7 +59,14 @@ class HarvestOaiPmhWorker: # TODO: should it be sys.exit(-1)? raise KafkaException(err) - producer = Producer(self.kafka_config) + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) api = sickle.Sickle(self.endpoint_url) date_str = date.isoformat() @@ -88,7 +92,6 @@ class HarvestOaiPmhWorker: item.raw.encode('utf-8'), key=item.header.identifier.encode('utf-8'), on_delivery=fail_fast) - producer.poll(0) producer.flush() def run(self, continuous=False): |