Diffstat (limited to 'python')
-rw-r--r--  python/fatcat_tools/workers/elasticsearch.py | 37
-rwxr-xr-x  python/fatcat_worker.py                      |  5
2 files changed, 37 insertions, 5 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 4850bb0a..13409eb5 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,10 +1,16 @@
 
 import json
+
 import requests
+import elasticsearch
 from confluent_kafka import Consumer, KafkaException
 from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
-from fatcat_tools import *
+from fatcat_tools import (public_api, entity_from_json,
+    release_to_elasticsearch, container_to_elasticsearch,
+    changelog_to_elasticsearch,
+)
+from fatcat_web.search import get_elastic_container_stats
 
 from .worker_common import FatcatWorker
 
 
@@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
 
     def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
             elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
-            batch_size=200, api_host="https://api.fatcat.wiki/v0"):
+            elasticsearch_release_index="fatcat_releases",
+            batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
@@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         self.poll_interval = poll_interval
         self.elasticsearch_backend = elasticsearch_backend
         self.elasticsearch_index = elasticsearch_index
+        self.elasticsearch_release_index = elasticsearch_release_index
         self.entity_type = ReleaseEntity
         self.transform_func = release_to_elasticsearch
         self.api_host = api_host
+        self.query_stats = query_stats
 
     def run(self):
         ac = ApiClient()
         api = public_api(self.api_host)
 
+        # only used by container indexing query_stats code path
+        es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
+
         def fail_fast(err, partitions):
             if err is not None:
                 print("Kafka consumer commit error: {}".format(err))
@@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker):
                         entity = api.get_changelog_entry(entity.index)
                 else:
                     key = entity.ident
+
+                if self.entity_type == ContainerEntity and self.query_stats:
+                    stats = get_elastic_container_stats(
+                        entity.ident,
+                        es_client=es_client,
+                        es_index=self.elasticsearch_release_index,
+                        merge_shadows=True,
+                    )
+                    doc_dict = container_to_elasticsearch(entity, stats=stats)
+                else:
+                    doc_dict = self.transform_func(entity)
+
                 # TODO: handle deletions from index
                 bulk_actions.append(json.dumps({
                     "index": { "_id": key, },
                 }))
-                bulk_actions.append(json.dumps(
-                    self.transform_func(entity)))
-            print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type))
+                bulk_actions.append(json.dumps(doc_dict))
+
+            print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__))
             elasticsearch_endpoint = "{}/{}/_bulk".format(
                 self.elasticsearch_backend,
                 self.elasticsearch_index)
@@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
 
 class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
     def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+            query_stats=False, elasticsearch_release_index="fatcat_release",
             elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
             batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
@@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
                          offset=offset,
                          elasticsearch_backend=elasticsearch_backend,
                          elasticsearch_index=elasticsearch_index,
+                         elasticsearch_release_index=elasticsearch_release_index,
+                         query_stats=query_stats,
                          batch_size=batch_size)
         # previous group got corrupted (by pykafka library?)
         self.consumer_group = "elasticsearch-updates3"
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 5e3fb3e9..95f5024a 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -44,6 +44,8 @@ def run_elasticsearch_release(args):
 def run_elasticsearch_container(args):
     consume_topic = "fatcat-{}.container-updates".format(args.env)
     worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic,
+        query_stats=args.query_stats,
+        elasticsearch_release_index="fatcat_release",
         elasticsearch_backend=args.elasticsearch_backend,
         elasticsearch_index=args.elasticsearch_index)
     worker.run()
@@ -99,6 +101,9 @@ def main():
     sub_elasticsearch_container.add_argument('--elasticsearch-index',
         help="elasticsearch index to push into",
         default="fatcat_container")
+    sub_elasticsearch_container.add_argument('--query-stats',
+        action='store_true',
+        help="whether to query release search index for container stats")
 
     sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog',
         help="consume changelog kafka feed, transform and push to search")
