From 31d1a6a713d177990609767d508209ced19ca396 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 2 Nov 2021 18:14:59 -0700 Subject: fmt (black): fatcat_tools/ --- python/fatcat_tools/workers/elasticsearch.py | 171 +++++++++++++++++---------- 1 file changed, 110 insertions(+), 61 deletions(-) (limited to 'python/fatcat_tools/workers/elasticsearch.py') diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index f411073d..0d75f964 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -1,4 +1,3 @@ - import json import sys @@ -26,12 +25,20 @@ class ElasticsearchReleaseWorker(FatcatWorker): Uses a consumer group to manage offset. """ - def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, - elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - elasticsearch_release_index="fatcat_releases", - batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False): - super().__init__(kafka_hosts=kafka_hosts, - consume_topic=consume_topic) + def __init__( + self, + kafka_hosts, + consume_topic, + poll_interval=10.0, + offset=None, + elasticsearch_backend="http://localhost:9200", + elasticsearch_index="fatcat", + elasticsearch_release_index="fatcat_releases", + batch_size=200, + api_host="https://api.fatcat.wiki/v0", + query_stats=False, + ): + super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" self.batch_size = batch_size self.poll_interval = poll_interval @@ -63,45 +70,53 @@ class ElasticsearchReleaseWorker(FatcatWorker): print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(p.error) - #print("Kafka consumer commit successful") + # print("Kafka consumer commit successful") pass def on_rebalance(consumer, partitions): for p in partitions: if p.error: raise KafkaException(p.error) - print("Kafka partitions rebalanced: {} / {}".format( - consumer, partitions), file=sys.stderr) + print( + "Kafka partitions rebalanced: {} / {}".format(consumer, partitions), + file=sys.stderr, + ) consumer_conf = self.kafka_config.copy() - consumer_conf.update({ - 'group.id': self.consumer_group, - 'on_commit': fail_fast, - # messages don't have offset marked as stored until pushed to - # elastic, but we do auto-commit stored offsets to broker - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - # user code timeout; if no poll after this long, assume user code - # hung and rebalance (default: 5min) - 'max.poll.interval.ms': 60000, - 'default.topic.config': { - 'auto.offset.reset': 'latest', - }, - }) + consumer_conf.update( + { + "group.id": self.consumer_group, + "on_commit": fail_fast, + # messages don't have offset marked as stored until pushed to + # elastic, but we do auto-commit stored offsets to broker + "enable.auto.commit": True, + "enable.auto.offset.store": False, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 5min) + "max.poll.interval.ms": 60000, + "default.topic.config": { + "auto.offset.reset": "latest", + }, + } + ) consumer = Consumer(consumer_conf) - consumer.subscribe([self.consume_topic], + consumer.subscribe( + [self.consume_topic], on_assign=on_rebalance, on_revoke=on_rebalance, ) while True: - batch = consumer.consume( - num_messages=self.batch_size, - timeout=self.poll_interval) + batch = consumer.consume(num_messages=self.batch_size, timeout=self.poll_interval) if not batch: if not consumer.assignment(): print("... no Kafka consumer partitions assigned yet", file=sys.stderr) - print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval), file=sys.stderr) + print( + "... nothing new from kafka, try again (interval: {}".format( + self.poll_interval + ), + file=sys.stderr, + ) continue print("... got {} kafka messages".format(len(batch)), file=sys.stderr) # first check errors on entire batch... @@ -111,19 +126,24 @@ class ElasticsearchReleaseWorker(FatcatWorker): # ... then process bulk_actions = [] for msg in batch: - json_str = msg.value().decode('utf-8') + json_str = msg.value().decode("utf-8") entity = entity_from_json(json_str, self.entity_type, api_client=ac) assert isinstance(entity, self.entity_type) if self.entity_type == ChangelogEntry: key = entity.index # might need to fetch from API - if not (entity.editgroup and entity.editgroup.editor): # pylint: disable=no-member # (TODO) + if not ( + entity.editgroup and entity.editgroup.editor + ): # pylint: disable=no-member # (TODO) entity = api.get_changelog_entry(entity.index) else: key = entity.ident # pylint: disable=no-member # (TODO) - if self.entity_type != ChangelogEntry and entity.state == 'wip': - print(f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}", file=sys.stderr) + if self.entity_type != ChangelogEntry and entity.state == "wip": + print( + f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}", + file=sys.stderr, + ) continue if self.entity_type == ContainerEntity and self.query_stats: @@ -138,9 +158,15 @@ class ElasticsearchReleaseWorker(FatcatWorker): doc_dict = self.transform_func(entity) # TODO: handle deletions from index - bulk_actions.append(json.dumps({ - "index": { "_id": key, }, - })) + bulk_actions.append( + json.dumps( + { + "index": { + "_id": key, + }, + } + ) + ) bulk_actions.append(json.dumps(doc_dict)) # if only WIP entities, then skip @@ -149,15 +175,22 @@ class ElasticsearchReleaseWorker(FatcatWorker): consumer.store_offsets(message=msg) continue - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__), file=sys.stderr) + print( + "Upserting, eg, {} (of {} {} in elasticsearch)".format( + key, len(batch), self.entity_type.__name__ + ), + file=sys.stderr, + ) elasticsearch_endpoint = "{}/{}/_bulk".format( - self.elasticsearch_backend, - self.elasticsearch_index) - resp = requests.post(elasticsearch_endpoint, + self.elasticsearch_backend, self.elasticsearch_index + ) + resp = requests.post( + elasticsearch_endpoint, headers={"Content-Type": "application/x-ndjson"}, - data="\n".join(bulk_actions) + "\n") + data="\n".join(bulk_actions) + "\n", + ) resp.raise_for_status() - if resp.json()['errors']: + if resp.json()["errors"]: desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint) print(desc, file=sys.stderr) print(resp.content, file=sys.stderr) @@ -169,20 +202,29 @@ class ElasticsearchReleaseWorker(FatcatWorker): class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): - - def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, - query_stats=False, elasticsearch_release_index="fatcat_release", - elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - batch_size=200): - super().__init__(kafka_hosts=kafka_hosts, - consume_topic=consume_topic, - poll_interval=poll_interval, - offset=offset, - elasticsearch_backend=elasticsearch_backend, - elasticsearch_index=elasticsearch_index, - elasticsearch_release_index=elasticsearch_release_index, - query_stats=query_stats, - batch_size=batch_size) + def __init__( + self, + kafka_hosts, + consume_topic, + poll_interval=10.0, + offset=None, + query_stats=False, + elasticsearch_release_index="fatcat_release", + elasticsearch_backend="http://localhost:9200", + elasticsearch_index="fatcat", + batch_size=200, + ): + super().__init__( + kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + poll_interval=poll_interval, + offset=offset, + elasticsearch_backend=elasticsearch_backend, + elasticsearch_index=elasticsearch_index, + elasticsearch_release_index=elasticsearch_release_index, + query_stats=query_stats, + batch_size=batch_size, + ) # previous group got corrupted (by pykafka library?) self.consumer_group = "elasticsearch-updates3" self.entity_type = ContainerEntity @@ -196,11 +238,18 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker): Note: Very early versions of changelog entries did not contain details about the editor or extra fields. """ - def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, - elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog", - batch_size=200): - super().__init__(kafka_hosts=kafka_hosts, - consume_topic=consume_topic) + + def __init__( + self, + kafka_hosts, + consume_topic, + poll_interval=10.0, + offset=None, + elasticsearch_backend="http://localhost:9200", + elasticsearch_index="fatcat_changelog", + batch_size=200, + ): + super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" self.batch_size = batch_size self.poll_interval = poll_interval -- cgit v1.2.3