summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/fatcat/changelog_workers.py4
1 files changed, 3 insertions, 1 deletions
diff --git a/python/fatcat/changelog_workers.py b/python/fatcat/changelog_workers.py
index 5f8621cf..e341ea32 100644
--- a/python/fatcat/changelog_workers.py
+++ b/python/fatcat/changelog_workers.py
@@ -97,8 +97,9 @@ class FatcatEntityUpdatesWorker(FatcatWorker):
changelog_topic = self.kafka.topics[self.consume_topic]
release_topic = self.kafka.topics[self.release_topic]
- consumer = changelog_topic.get_simple_consumer(
+ consumer = changelog_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
+ managed=True,
auto_offset_reset=OffsetType.LATEST,
reset_offset_on_start=False,
)
@@ -117,4 +118,5 @@ class FatcatEntityUpdatesWorker(FatcatWorker):
partition_key=ident.encode('utf-8'),
timestamp=None,
)
+ consumer.commit_offsets()