aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/changelog.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-09-19 20:00:24 -0700
committerBryan Newbold <bnewbold@robocracy.org>2019-09-20 11:21:31 -0700
commit90b5cb354d7d73c920288394aa9fd8d58e752157 (patch)
tree204085442b967fde2b8ad7ad46f521f3e3b834eb /python/fatcat_tools/workers/changelog.py
parent80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769 (diff)
downloadfatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.tar.gz
fatcat-90b5cb354d7d73c920288394aa9fd8d58e752157.zip
review/fix all confluent-kafka produce code
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r--python/fatcat_tools/workers/changelog.py16
1 files changed, 12 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index c134bde2..8b1ba5e9 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -40,7 +40,14 @@ class ChangelogWorker(FatcatWorker):
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- producer = Producer(self.kafka_config)
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
while True:
latest = int(self.api.get_changelog(limit=1)[0].index)
@@ -58,7 +65,7 @@ class ChangelogWorker(FatcatWorker):
#NOTE timestamp could be timestamp=cle.timestamp (?)
)
self.offset = i
- producer.poll(0)
+ producer.flush()
print("Sleeping {} seconds...".format(self.poll_interval))
time.sleep(self.poll_interval)
@@ -118,7 +125,6 @@ class EntityUpdatesWorker(FatcatWorker):
consumer_conf.update({
'group.id': self.consumer_group,
'on_commit': fail_fast,
- 'delivery.report.only.error': True,
# messages don't have offset marked as stored until pushed to
# elastic, but we do auto-commit stored offsets to broker
'enable.auto.commit': True,
@@ -134,8 +140,9 @@ class EntityUpdatesWorker(FatcatWorker):
producer_conf = self.kafka_config.copy()
producer_conf.update({
+ 'delivery.report.only.error': True,
'default.topic.config': {
- 'request.required.acks': -1,
+ 'request.required.acks': -1, # all brokers must confirm
},
})
producer = Producer(producer_conf)
@@ -207,6 +214,7 @@ class EntityUpdatesWorker(FatcatWorker):
key=ident.encode('utf-8'),
on_delivery=fail_fast,
)
+ producer.flush()
# TODO: actually update works
consumer.store_offsets(message=msg)