diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-04 18:52:48 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-04 18:52:48 -0800 |
commit | 53d91dbefeb598539b02d18fad33f79babe2bb94 (patch) | |
tree | df60b647975b903223928a276cffb8a77ac78f71 | |
parent | e5486378d8d7adf8974b1f1ebaf0400445ba8791 (diff) | |
download | fatcat-53d91dbefeb598539b02d18fad33f79babe2bb94.tar.gz fatcat-53d91dbefeb598539b02d18fad33f79babe2bb94.zip |
switch entity update worker to use balanced/manager consumer
-rw-r--r-- | python/fatcat/changelog_workers.py | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/python/fatcat/changelog_workers.py b/python/fatcat/changelog_workers.py index 5f8621cf..e341ea32 100644 --- a/python/fatcat/changelog_workers.py +++ b/python/fatcat/changelog_workers.py @@ -97,8 +97,9 @@ class FatcatEntityUpdatesWorker(FatcatWorker): changelog_topic = self.kafka.topics[self.consume_topic] release_topic = self.kafka.topics[self.release_topic] - consumer = changelog_topic.get_simple_consumer( + consumer = changelog_topic.get_balanced_consumer( consumer_group=self.consumer_group, + managed=True, auto_offset_reset=OffsetType.LATEST, reset_offset_on_start=False, ) @@ -117,4 +118,5 @@ class FatcatEntityUpdatesWorker(FatcatWorker): partition_key=ident.encode('utf-8'), timestamp=None, ) + consumer.commit_offsets() |