diff options
author | bnewbold <bnewbold@archive.org> | 2020-04-17 18:13:14 +0000 |
---|---|---|
committer | bnewbold <bnewbold@archive.org> | 2020-04-17 18:13:14 +0000 |
commit | 963faf6cf6e7e5c6685ffe89e080134c7590957f (patch) | |
tree | bfac98e8f646f571ee34b8f6bb59e411fbac074d /python/fatcat_tools | |
parent | 68d5b259888b0cf22dce6894063f77d1ac2bccf4 (diff) | |
parent | 89db8df9eef40b92454ed9bd64830ebe5b726b9a (diff) | |
download | fatcat-963faf6cf6e7e5c6685ffe89e080134c7590957f.tar.gz fatcat-963faf6cf6e7e5c6685ffe89e080134c7590957f.zip |
Merge branch 'martin-changelog-to-es' into 'master'
derive changelog worker from release worker
See merge request webgroup/fatcat!43
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r-- | python/fatcat_tools/workers/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 23 |
2 files changed, 23 insertions, 2 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py
index 8bea7cdc..32fd330d 100644
--- a/python/fatcat_tools/workers/__init__.py
+++ b/python/fatcat_tools/workers/__init__.py
@@ -1,4 +1,4 @@
 from .changelog import ChangelogWorker, EntityUpdatesWorker
-from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker
+from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker
 from .worker_common import most_recent_message, FatcatWorker
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 68d6c304..525f372b 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -4,7 +4,7 @@ import time
 import requests
 from confluent_kafka import Consumer, KafkaException
-from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
+from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
 from fatcat_tools import *
 from .worker_common import FatcatWorker
@@ -148,3 +148,24 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
         self.elasticsearch_document_name = "container"
         self.transform_func = container_to_elasticsearch
+
+class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
+    """
+    Pulls changelog messages from Kafka, runs transformations and indexes them.
+
+    Note: Very early versions of changelog entries did not contain details
+    about the editor or extra fields.
+    """
+    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog",
+            batch_size=200):
+        super().__init__(kafka_hosts=kafka_hosts,
+                         consume_topic=consume_topic)
+        self.consumer_group = "elasticsearch-updates3"
+        self.batch_size = batch_size
+        self.poll_interval = poll_interval
+        self.elasticsearch_backend = elasticsearch_backend
+        self.elasticsearch_index = elasticsearch_index
+        self.entity_type = ChangelogEntry
+        self.elasticsearch_document_name = "changelog"
+        self.transform_func = changelog_to_elasticsearch