summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-11-04 18:52:48 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-11-04 18:52:48 -0800
commit53d91dbefeb598539b02d18fad33f79babe2bb94 (patch)
treedf60b647975b903223928a276cffb8a77ac78f71
parente5486378d8d7adf8974b1f1ebaf0400445ba8791 (diff)
downloadfatcat-53d91dbefeb598539b02d18fad33f79babe2bb94.tar.gz
fatcat-53d91dbefeb598539b02d18fad33f79babe2bb94.zip
switch entity update worker to use balanced/manager consumer
-rw-r--r--python/fatcat/changelog_workers.py4
1 files changed, 3 insertions, 1 deletions
diff --git a/python/fatcat/changelog_workers.py b/python/fatcat/changelog_workers.py
index 5f8621cf..e341ea32 100644
--- a/python/fatcat/changelog_workers.py
+++ b/python/fatcat/changelog_workers.py
@@ -97,8 +97,9 @@ class FatcatEntityUpdatesWorker(FatcatWorker):
changelog_topic = self.kafka.topics[self.consume_topic]
release_topic = self.kafka.topics[self.release_topic]
- consumer = changelog_topic.get_simple_consumer(
+ consumer = changelog_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
+ managed=True,
auto_offset_reset=OffsetType.LATEST,
reset_offset_on_start=False,
)
@@ -117,4 +118,5 @@ class FatcatEntityUpdatesWorker(FatcatWorker):
partition_key=ident.encode('utf-8'),
timestamp=None,
)
+ consumer.commit_offsets()