diff options
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 37 |
1 files changed, 32 insertions, 5 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 4850bb0a..13409eb5 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -1,10 +1,16 @@ import json + import requests +import elasticsearch from confluent_kafka import Consumer, KafkaException from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry -from fatcat_tools import * +from fatcat_tools import (public_api, entity_from_json, + release_to_elasticsearch, container_to_elasticsearch, + changelog_to_elasticsearch, +) +from fatcat_web.search import get_elastic_container_stats from .worker_common import FatcatWorker @@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - batch_size=200, api_host="https://api.fatcat.wiki/v0"): + elasticsearch_release_index="fatcat_releases", + batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" @@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.poll_interval = poll_interval self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index + self.elasticsearch_release_index = elasticsearch_release_index self.entity_type = ReleaseEntity self.transform_func = release_to_elasticsearch self.api_host = api_host + self.query_stats = query_stats def run(self): ac = ApiClient() api = public_api(self.api_host) + # only used by container indexing query_stats code path + es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend) + def fail_fast(err, partitions): if err is not None: print("Kafka consumer commit error: {}".format(err)) @@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker): entity = api.get_changelog_entry(entity.index) else: key = entity.ident + + if self.entity_type == ContainerEntity and self.query_stats: + stats = get_elastic_container_stats( + entity.ident, + es_client=es_client, + es_index=self.elasticsearch_release_index, + merge_shadows=True, + ) + doc_dict = container_to_elasticsearch(entity, stats=stats) + else: + doc_dict = self.transform_func(entity) + # TODO: handle deletions from index bulk_actions.append(json.dumps({ "index": { "_id": key, }, })) - bulk_actions.append(json.dumps( - self.transform_func(entity))) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type)) + bulk_actions.append(json.dumps(doc_dict)) + + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__)) elasticsearch_endpoint = "{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index) @@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + query_stats=False, elasticsearch_release_index="fatcat_release", elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", batch_size=200): super().__init__(kafka_hosts=kafka_hosts, @@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): offset=offset, elasticsearch_backend=elasticsearch_backend, elasticsearch_index=elasticsearch_index, + elasticsearch_release_index=elasticsearch_release_index, + query_stats=query_stats, batch_size=batch_size) # previous group got corrupted (by pykafka library?) self.consumer_group = "elasticsearch-updates3" |