diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-04-12 15:27:34 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-04-12 15:27:34 -0700 |
commit | 83ae1d85d093f1f37bba74283b1d689bb1f1c346 (patch) | |
tree | 0a737da7c1d0658bc4d57a4309419518239bd367 | |
parent | 389f391f42442c5df1f91d7cc3243e7c7d561909 (diff) | |
download | fatcat-83ae1d85d093f1f37bba74283b1d689bb1f1c346.tar.gz fatcat-83ae1d85d093f1f37bba74283b1d689bb1f1c346.zip |
es worker: ensure kafka messages get cleared
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 2 |
1 files changed, 2 insertions, 0 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index d9edb276..b8735a37 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -141,6 +141,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): # if only WIP entities, then skip if not bulk_actions: + for msg in batch: + consumer.store_offsets(message=msg) continue print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__), file=sys.stderr) |