diff options
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index c134bde2..8b1ba5e9 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -40,7 +40,14 @@ class ChangelogWorker(FatcatWorker): # TODO: should it be sys.exit(-1)? raise KafkaException(err) - producer = Producer(self.kafka_config) + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) while True: latest = int(self.api.get_changelog(limit=1)[0].index) @@ -58,7 +65,7 @@ class ChangelogWorker(FatcatWorker): #NOTE timestamp could be timestamp=cle.timestamp (?) ) self.offset = i - producer.poll(0) + producer.flush() print("Sleeping {} seconds...".format(self.poll_interval)) time.sleep(self.poll_interval) @@ -118,7 +125,6 @@ class EntityUpdatesWorker(FatcatWorker): consumer_conf.update({ 'group.id': self.consumer_group, 'on_commit': fail_fast, - 'delivery.report.only.error': True, # messages don't have offset marked as stored until pushed to # elastic, but we do auto-commit stored offsets to broker 'enable.auto.commit': True, @@ -134,8 +140,9 @@ class EntityUpdatesWorker(FatcatWorker): producer_conf = self.kafka_config.copy() producer_conf.update({ + 'delivery.report.only.error': True, 'default.topic.config': { - 'request.required.acks': -1, + 'request.required.acks': -1, # all brokers must confirm }, }) producer = Producer(producer_conf) @@ -207,6 +214,7 @@ class EntityUpdatesWorker(FatcatWorker): key=ident.encode('utf-8'), on_delivery=fail_fast, ) + producer.flush() # TODO: actually update works consumer.store_offsets(message=msg) |