aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-04-12 15:27:34 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-04-12 15:27:34 -0700
commit83ae1d85d093f1f37bba74283b1d689bb1f1c346 (patch)
tree0a737da7c1d0658bc4d57a4309419518239bd367 /python/fatcat_tools/workers/elasticsearch.py
parent389f391f42442c5df1f91d7cc3243e7c7d561909 (diff)
downloadfatcat-83ae1d85d093f1f37bba74283b1d689bb1f1c346.tar.gz
fatcat-83ae1d85d093f1f37bba74283b1d689bb1f1c346.zip
es worker: ensure kafka messages get cleared
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py2
1 files changed, 2 insertions, 0 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index d9edb276..b8735a37 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -141,6 +141,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
# if only WIP entities, then skip
if not bulk_actions:
+ for msg in batch:
+ consumer.store_offsets(message=msg)
continue
print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__), file=sys.stderr)