From b61e2e020a9249ab04be7484f4159a9dc1ca4a83 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 12 Apr 2021 10:35:53 -0700 Subject: ES indexing: skip 'wip' entities with a warning --- python/fatcat_tools/workers/elasticsearch.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 13409eb5..3e4753fb 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -1,4 +1,5 @@ +import sys import json import requests @@ -48,15 +49,15 @@ class ElasticsearchReleaseWorker(FatcatWorker): def fail_fast(err, partitions): if err is not None: - print("Kafka consumer commit error: {}".format(err)) - print("Bailing out...") + print("Kafka consumer commit error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(err) for p in partitions: # check for partition-specific commit errors if p.error: - print("Kafka consumer commit error: {}".format(p.error)) - print("Bailing out...") + print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr) + print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(p.error) #print("Kafka consumer commit successful") @@ -67,7 +68,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): if p.error: raise KafkaException(p.error) print("Kafka partitions rebalanced: {} / {}".format( - consumer, partitions)) + consumer, partitions), file=sys.stderr) consumer_conf = self.kafka_config.copy() consumer_conf.update({ @@ -96,10 +97,10 @@ class ElasticsearchReleaseWorker(FatcatWorker): timeout=self.poll_interval) if not batch: if not consumer.assignment(): - print("... no Kafka consumer partitions assigned yet") - print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval)) + print("... no Kafka consumer partitions assigned yet", file=sys.stderr) + print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval), file=sys.stderr) continue - print("... got {} kafka messages".format(len(batch))) + print("... got {} kafka messages".format(len(batch)), file=sys.stderr) # first check errors on entire batch... for msg in batch: if msg.error(): @@ -117,6 +118,10 @@ class ElasticsearchReleaseWorker(FatcatWorker): else: key = entity.ident + if entity.state == 'wip': + print(f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}", file=sys.stderr) + continue + if self.entity_type == ContainerEntity and self.query_stats: stats = get_elastic_container_stats( entity.ident, @@ -134,7 +139,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): })) bulk_actions.append(json.dumps(doc_dict)) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__)) + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__), file=sys.stderr) elasticsearch_endpoint = "{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index) @@ -144,8 +149,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): resp.raise_for_status() if resp.json()['errors']: desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint) - print(desc) - print(resp.content) + print(desc, file=sys.stderr) + print(resp.content, file=sys.stderr) raise Exception(desc) for msg in batch: # offsets are *committed* (to brokers) automatically, but need -- cgit v1.2.3