summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2020-04-17 14:30:57 +0200
committerMartin Czygan <martin.czygan@gmail.com>2020-04-17 14:43:31 +0200
commit89db8df9eef40b92454ed9bd64830ebe5b726b9a (patch)
tree7a3f920c4bc903aeb7298e80038574400f2c5579
parentabb92d6913f32697528f296baea524d81a505999 (diff)
downloadfatcat-89db8df9eef40b92454ed9bd64830ebe5b726b9a.tar.gz
fatcat-89db8df9eef40b92454ed9bd64830ebe5b726b9a.zip
derive changelog worker from release worker
Early versions of changelog entries may not have all the fields required for the current transform.
-rw-r--r--python/fatcat_tools/workers/__init__.py2
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py23
-rwxr-xr-xpython/fatcat_worker.py19
3 files changed, 41 insertions, 3 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py
index 8bea7cdc..32fd330d 100644
--- a/python/fatcat_tools/workers/__init__.py
+++ b/python/fatcat_tools/workers/__init__.py
@@ -1,4 +1,4 @@
from .changelog import ChangelogWorker, EntityUpdatesWorker
-from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker
+from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker
from .worker_common import most_recent_message, FatcatWorker
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 68d6c304..525f372b 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -4,7 +4,7 @@ import time
import requests
from confluent_kafka import Consumer, KafkaException
-from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
+from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
from fatcat_tools import *
from .worker_common import FatcatWorker
@@ -148,3 +148,24 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch
+
+class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
+ """
+ Pulls changelog messages from Kafka, runs transformations and indexes them.
+
+ Note: Very early versions of changelog entries did not contain details
+ about the editor or extra fields.
+ """
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog",
+ batch_size=200):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic)
+ self.consumer_group = "elasticsearch-updates3"
+ self.batch_size = batch_size
+ self.poll_interval = poll_interval
+ self.elasticsearch_backend = elasticsearch_backend
+ self.elasticsearch_index = elasticsearch_index
+ self.entity_type = ChangelogEntry
+ self.elasticsearch_document_name = "changelog"
+ self.transform_func = changelog_to_elasticsearch
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index bfb87a72..03167a3a 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -6,7 +6,7 @@ import datetime
import raven
from fatcat_tools import public_api
-from fatcat_tools.workers import ChangelogWorker, EntityUpdatesWorker, ElasticsearchReleaseWorker, ElasticsearchContainerWorker
+from fatcat_tools.workers import ChangelogWorker, EntityUpdatesWorker, ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker
# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
sentry_client = raven.Client()
@@ -47,6 +47,13 @@ def run_elasticsearch_container(args):
elasticsearch_index=args.elasticsearch_index)
worker.run()
+def run_elasticsearch_changelog(args):
+ consume_topic = "fatcat-{}.changelog".format(args.env)
+ worker = ElasticsearchChangelogWorker(args.kafka_hosts, consume_topic,
+ elasticsearch_backend=args.elasticsearch_backend,
+ elasticsearch_index=args.elasticsearch_index)
+ worker.run()
+
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -92,6 +99,16 @@ def main():
help="elasticsearch index to push into",
default="fatcat_container")
+ sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog',
+ help="consume changelog kafka feed, transform and push to search")
+ sub_elasticsearch_changelog.set_defaults(func=run_elasticsearch_changelog)
+ sub_elasticsearch_changelog.add_argument('--elasticsearch-backend',
+ help="elasticsearch backend to connect to",
+ default="http://localhost:9200")
+ sub_elasticsearch_changelog.add_argument('--elasticsearch-index',
+ help="elasticsearch index to push into",
+ default="fatcat_changelog")
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")