aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py23
1 files changed, 22 insertions, 1 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 68d6c304..525f372b 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -4,7 +4,7 @@ import time
import requests
from confluent_kafka import Consumer, KafkaException
-from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
+from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
from fatcat_tools import *
from .worker_common import FatcatWorker
@@ -148,3 +148,24 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch
+
+class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
+ """
+ Pulls changelog messages from Kafka, runs transformations and indexes them.
+
+ Note: Very early versions of changelog entries did not contain details
+ about the editor or extra fields.
+ """
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog",
+ batch_size=200):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic)
+ self.consumer_group = "elasticsearch-updates3"
+ self.batch_size = batch_size
+ self.poll_interval = poll_interval
+ self.elasticsearch_backend = elasticsearch_backend
+ self.elasticsearch_index = elasticsearch_index
+ self.entity_type = ChangelogEntry
+ self.elasticsearch_document_name = "changelog"
+ self.transform_func = changelog_to_elasticsearch