From 89db8df9eef40b92454ed9bd64830ebe5b726b9a Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Fri, 17 Apr 2020 14:30:57 +0200 Subject: derive changelog worker from release worker Early versions of changelog entries may not have all the fields required for the current transform. --- python/fatcat_tools/workers/__init__.py | 2 +- python/fatcat_tools/workers/elasticsearch.py | 23 ++++++++++++++++++++++- python/fatcat_worker.py | 19 ++++++++++++++++++- 3 files changed, 41 insertions(+), 3 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py index 8bea7cdc..32fd330d 100644 --- a/python/fatcat_tools/workers/__init__.py +++ b/python/fatcat_tools/workers/__init__.py @@ -1,4 +1,4 @@ from .changelog import ChangelogWorker, EntityUpdatesWorker -from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker +from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker from .worker_common import most_recent_message, FatcatWorker diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 68d6c304..525f372b 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -4,7 +4,7 @@ import time import requests from confluent_kafka import Consumer, KafkaException -from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient +from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry from fatcat_tools import * from .worker_common import FatcatWorker @@ -148,3 +148,24 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): self.elasticsearch_document_name = "container" self.transform_func = container_to_elasticsearch + +class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker): + """ + Pulls changelog messages from Kafka, runs transformations and indexes them. + + Note: Very early versions of changelog entries did not contain details + about the editor or extra fields. + """ + def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog", + batch_size=200): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic) + self.consumer_group = "elasticsearch-updates3" + self.batch_size = batch_size + self.poll_interval = poll_interval + self.elasticsearch_backend = elasticsearch_backend + self.elasticsearch_index = elasticsearch_index + self.entity_type = ChangelogEntry + self.elasticsearch_document_name = "changelog" + self.transform_func = changelog_to_elasticsearch diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index bfb87a72..03167a3a 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -6,7 +6,7 @@ import datetime import raven from fatcat_tools import public_api -from fatcat_tools.workers import ChangelogWorker, EntityUpdatesWorker, ElasticsearchReleaseWorker, ElasticsearchContainerWorker +from fatcat_tools.workers import ChangelogWorker, EntityUpdatesWorker, ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable sentry_client = raven.Client() @@ -47,6 +47,13 @@ def run_elasticsearch_container(args): elasticsearch_index=args.elasticsearch_index) worker.run() +def run_elasticsearch_changelog(args): + consume_topic = "fatcat-{}.changelog".format(args.env) + worker = ElasticsearchChangelogWorker(args.kafka_hosts, consume_topic, + elasticsearch_backend=args.elasticsearch_backend, + elasticsearch_index=args.elasticsearch_index) + worker.run() + def main(): parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -92,6 +99,16 @@ def main(): help="elasticsearch index to push into", default="fatcat_container") + sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog', + help="consume changelog kafka feed, transform and push to search") + sub_elasticsearch_changelog.set_defaults(func=run_elasticsearch_changelog) + sub_elasticsearch_changelog.add_argument('--elasticsearch-backend', + help="elasticsearch backend to connect to", + default="http://localhost:9200") + sub_elasticsearch_changelog.add_argument('--elasticsearch-index', + help="elasticsearch index to push into", + default="fatcat_changelog") + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do!") -- cgit v1.2.3