aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/changelog.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r--python/fatcat_tools/workers/changelog.py14
1 files changed, 10 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 4a54c649..c134bde2 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -13,7 +13,6 @@ class ChangelogWorker(FatcatWorker):
"""
def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
- # TODO: should be offset=0
super().__init__(kafka_hosts=kafka_hosts,
produce_topic=produce_topic,
api=api)
@@ -118,7 +117,15 @@ class EntityUpdatesWorker(FatcatWorker):
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
'group.id': self.consumer_group,
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ 'enable.auto.commit': True,
'enable.auto.offset.store': False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 180000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
@@ -142,8 +149,7 @@ class EntityUpdatesWorker(FatcatWorker):
while True:
msg = consumer.poll(self.poll_interval)
if not msg:
- print("nothing new from kafka (interval:{})".format(self.poll_interval))
- consumer.commit()
+ print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
continue
if msg.error():
raise KafkaException(msg.error())
@@ -202,5 +208,5 @@ class EntityUpdatesWorker(FatcatWorker):
on_delivery=fail_fast,
)
# TODO: actually update works
- consumer.store_offsets(msg)
+ consumer.store_offsets(message=msg)