aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-04-06 21:48:40 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-04-06 21:58:54 -0700
commit9f110393b90d5b9e95a39b4f83d3e864434dd189 (patch)
tree9320c75d5c19148aba7cd3a0ced0fc200988e6ba
parentb0c5db8a2bd2e389f99df1b44120c18fa5bc3e52 (diff)
downloadfatcat-9f110393b90d5b9e95a39b4f83d3e864434dd189.tar.gz
fatcat-9f110393b90d5b9e95a39b4f83d3e864434dd189.zip
container ES index worker: support for querying status
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py37
-rwxr-xr-xpython/fatcat_worker.py5
2 files changed, 37 insertions, 5 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 4850bb0a..13409eb5 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,10 +1,16 @@
import json
+
import requests
+import elasticsearch
from confluent_kafka import Consumer, KafkaException
from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
-from fatcat_tools import *
+from fatcat_tools import (public_api, entity_from_json,
+ release_to_elasticsearch, container_to_elasticsearch,
+ changelog_to_elasticsearch,
+)
+from fatcat_web.search import get_elastic_container_stats
from .worker_common import FatcatWorker
@@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
- batch_size=200, api_host="https://api.fatcat.wiki/v0"):
+ elasticsearch_release_index="fatcat_releases",
+ batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
@@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
+ self.elasticsearch_release_index = elasticsearch_release_index
self.entity_type = ReleaseEntity
self.transform_func = release_to_elasticsearch
self.api_host = api_host
+ self.query_stats = query_stats
def run(self):
ac = ApiClient()
api = public_api(self.api_host)
+ # only used by container indexing query_stats code path
+ es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
+
def fail_fast(err, partitions):
if err is not None:
print("Kafka consumer commit error: {}".format(err))
@@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker):
entity = api.get_changelog_entry(entity.index)
else:
key = entity.ident
+
+ if self.entity_type == ContainerEntity and self.query_stats:
+ stats = get_elastic_container_stats(
+ entity.ident,
+ es_client=es_client,
+ es_index=self.elasticsearch_release_index,
+ merge_shadows=True,
+ )
+ doc_dict = container_to_elasticsearch(entity, stats=stats)
+ else:
+ doc_dict = self.transform_func(entity)
+
# TODO: handle deletions from index
bulk_actions.append(json.dumps({
"index": { "_id": key, },
}))
- bulk_actions.append(json.dumps(
- self.transform_func(entity)))
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type))
+ bulk_actions.append(json.dumps(doc_dict))
+
+ print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__))
elasticsearch_endpoint = "{}/{}/_bulk".format(
self.elasticsearch_backend,
self.elasticsearch_index)
@@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ query_stats=False, elasticsearch_release_index="fatcat_release",
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
@@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
offset=offset,
elasticsearch_backend=elasticsearch_backend,
elasticsearch_index=elasticsearch_index,
+ elasticsearch_release_index=elasticsearch_release_index,
+ query_stats=query_stats,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
self.consumer_group = "elasticsearch-updates3"
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 5e3fb3e9..95f5024a 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -44,6 +44,8 @@ def run_elasticsearch_release(args):
def run_elasticsearch_container(args):
consume_topic = "fatcat-{}.container-updates".format(args.env)
worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic,
+ query_stats=args.query_stats,
+ elasticsearch_release_index="fatcat_release",
elasticsearch_backend=args.elasticsearch_backend,
elasticsearch_index=args.elasticsearch_index)
worker.run()
@@ -99,6 +101,9 @@ def main():
sub_elasticsearch_container.add_argument('--elasticsearch-index',
help="elasticsearch index to push into",
default="fatcat_container")
+ sub_elasticsearch_container.add_argument('--query-stats',
+ action='store_true',
+ help="whether to query release search index for container stats")
sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog',
help="consume changelog kafka feed, transform and push to search")