about | summary | refs | log | tree | commit | diff | stats
path: root/python/fatcat_tools/harvest/oaipmh.py
diff options
context:
space:
mode:
author Bryan Newbold <bnewbold@robocracy.org> 2019-09-19 20:00:24 -0700
committer Bryan Newbold <bnewbold@robocracy.org> 2019-09-20 11:21:31 -0700
commit 90b5cb354d7d73c920288394aa9fd8d58e752157 (patch)
tree 204085442b967fde2b8ad7ad46f521f3e3b834eb /python/fatcat_tools/harvest/oaipmh.py
parent 80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769 (diff)
download fatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.tar.gz
fatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.zip
review/fix all confluent-kafka produce code
Diffstat (limited to 'python/fatcat_tools/harvest/oaipmh.py')
-rw-r--r-- python/fatcat_tools/harvest/oaipmh.py 13
1 files changed, 8 insertions, 5 deletions
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 3e3bea03..f908ba83 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -39,10 +39,7 @@ class HarvestOaiPmhWorker:
self.state_topic = state_topic
self.kafka_config = {
'bootstrap.servers': kafka_hosts,
- 'delivery.report.only.error': True,
'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
- 'default.topic.config':
- {'request.required.acks': 'all'},
}
self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
@@ -62,7 +59,14 @@ class HarvestOaiPmhWorker:
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- producer = Producer(self.kafka_config)
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
api = sickle.Sickle(self.endpoint_url)
date_str = date.isoformat()
@@ -88,7 +92,6 @@ class HarvestOaiPmhWorker:
item.raw.encode('utf-8'),
key=item.header.identifier.encode('utf-8'),
on_delivery=fail_fast)
- producer.poll(0)
producer.flush()
def run(self, continuous=False):