From a4b84445a96c0fbe6133331b02f96cf570c59149 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 30 May 2019 11:59:27 -0700 Subject: add container update elastic worker --- python/fatcat_tools/workers/__init__.py | 2 +- python/fatcat_tools/workers/elasticsearch.py | 30 +++++++++++++++++++++++----- python/fatcat_worker.py | 16 +++++++++++++++ 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py index e8973bc3..8bea7cdc 100644 --- a/python/fatcat_tools/workers/__init__.py +++ b/python/fatcat_tools/workers/__init__.py @@ -1,4 +1,4 @@ from .changelog import ChangelogWorker, EntityUpdatesWorker -from .elasticsearch import ElasticsearchReleaseWorker +from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker from .worker_common import most_recent_message, FatcatWorker diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 83310284..6e336e52 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -24,6 +24,9 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.consumer_group = "elasticsearch-updates" self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index + self.entity_type = ReleaseEntity + self.elasticsearch_document_name = "release" + self.transform_func = release_to_elasticsearch def run(self): consume_topic = self.kafka.topics[self.consume_topic] @@ -40,13 +43,30 @@ class ElasticsearchReleaseWorker(FatcatWorker): for msg in consumer: json_str = msg.value.decode('utf-8') - release = entity_from_json(json_str, ReleaseEntity, api_client=ac) - #print(release) - elasticsearch_endpoint = "{}/{}/release/{}".format( + entity = entity_from_json(json_str, self.entity_type, api_client=ac) + #print(entity) + elasticsearch_endpoint = "{}/{}/{}/{}".format( self.elasticsearch_backend, self.elasticsearch_index, - release.ident) + self.elasticsearch_document_name, + entity.ident) print("Updating document: {}".format(elasticsearch_endpoint)) - resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release)) + resp = requests.post(elasticsearch_endpoint, json=self.transform_func(entity)) resp.raise_for_status() #consumer.commit_offsets() + + +class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): + + def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + poll_interval, + offset, + elasticsearch_backend, + elasticsearch_index) + self.entity_type = ContainerEntity + self.elasticsearch_document_name = "container" + self.transform_func = container_to_elasticsearch + diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 267779ff..4455d29e 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -38,6 +38,13 @@ def run_elasticsearch_release(args): elasticsearch_index=args.elasticsearch_index) worker.run() +def run_elasticsearch_container(args): + consume_topic = "fatcat-{}.container-updates".format(args.env) + worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic, + elasticsearch_backend=args.elasticsearch_backend, + elasticsearch_index=args.elasticsearch_index) + worker.run() + def main(): parser = argparse.ArgumentParser() parser.add_argument('--debug', @@ -72,6 +79,15 @@ def main(): help="elasticsearch index to push into", default="fatcat_release_v03") + sub_elasticsearch_container = subparsers.add_parser('elasticsearch-container') + sub_elasticsearch_container.set_defaults(func=run_elasticsearch_container) + sub_elasticsearch_container.add_argument('--elasticsearch-backend', + help="elasticsearch backend to connect to", + default="http://localhost:9200") + sub_elasticsearch_container.add_argument('--elasticsearch-index', + help="elasticsearch index to push into", + default="fatcat_container") + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do!") -- cgit v1.2.3