summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/__init__.py2
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py23
2 files changed, 23 insertions, 2 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py
index 8bea7cdc..32fd330d 100644
--- a/python/fatcat_tools/workers/__init__.py
+++ b/python/fatcat_tools/workers/__init__.py
@@ -1,4 +1,4 @@
from .changelog import ChangelogWorker, EntityUpdatesWorker
-from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker
+from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker
from .worker_common import most_recent_message, FatcatWorker
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 68d6c304..525f372b 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -4,7 +4,7 @@ import time
import requests
from confluent_kafka import Consumer, KafkaException
-from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
+from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
from fatcat_tools import *
from .worker_common import FatcatWorker
@@ -148,3 +148,24 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch
+
+class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
+ """
+ Pulls changelog messages from Kafka, runs transformations and indexes them.
+
+ Note: Very early versions of changelog entries did not contain details
+ about the editor or extra fields.
+ """
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog",
+ batch_size=200):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic)
+ self.consumer_group = "elasticsearch-updates3"
+ self.batch_size = batch_size
+ self.poll_interval = poll_interval
+ self.elasticsearch_backend = elasticsearch_backend
+ self.elasticsearch_index = elasticsearch_index
+ self.entity_type = ChangelogEntry
+ self.elasticsearch_document_name = "changelog"
+ self.transform_func = changelog_to_elasticsearch