From 73e6d75ec5c7f60315f0eed38c117e718eae0310 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 15 Dec 2021 11:58:23 -0800 Subject: file elasticsearch index worker --- python/fatcat_tools/workers/elasticsearch.py | 35 +++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) (limited to 'python/fatcat_tools/workers/elasticsearch.py') diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 71c4dcf6..21e1f41b 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -5,12 +5,19 @@ from typing import Any, Callable, List, Optional import elasticsearch import requests from confluent_kafka import Consumer, KafkaException -from fatcat_openapi_client import ApiClient, ChangelogEntry, ContainerEntity, ReleaseEntity +from fatcat_openapi_client import ( + ApiClient, + ChangelogEntry, + ContainerEntity, + FileEntity, + ReleaseEntity, +) from fatcat_tools import entity_from_json, public_api from fatcat_tools.transforms import ( changelog_to_elasticsearch, container_to_elasticsearch, + file_to_elasticsearch, release_to_elasticsearch, ) from fatcat_web.search import get_elastic_container_stats @@ -233,6 +240,32 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): self.transform_func = container_to_elasticsearch +class ElasticsearchFileWorker(ElasticsearchReleaseWorker): + def __init__( + self, + kafka_hosts: str, + consume_topic: str, + poll_interval: float = 10.0, + offset: Optional[int] = None, + elasticsearch_backend: str = "http://localhost:9200", + elasticsearch_index: str = "fatcat_file", + batch_size: int = 200, + ): + super().__init__( + kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + poll_interval=poll_interval, + offset=offset, + elasticsearch_backend=elasticsearch_backend, + elasticsearch_index=elasticsearch_index, + batch_size=batch_size, + ) + # previous group got corrupted (by pykafka library?) + self.consumer_group = "elasticsearch-updates3" + self.entity_type = FileEntity + self.transform_func = file_to_elasticsearch + + class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker): """ Pulls changelog messages from Kafka, runs transformations and indexes them. -- cgit v1.2.3 From 998c0cd2e2096f56954ad5e84f4b5b623bf98211 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 15 Dec 2021 11:58:43 -0800 Subject: small default config typo fixes for elasticsearch workers --- python/fatcat_tools/workers/elasticsearch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'python/fatcat_tools/workers/elasticsearch.py') diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 21e1f41b..bfadea64 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -41,7 +41,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): offset: Optional[int] = None, elasticsearch_backend: str = "http://localhost:9200", elasticsearch_index: str = "fatcat", - elasticsearch_release_index: str = "fatcat_releases", + elasticsearch_release_index: str = "fatcat_release", batch_size: int = 200, api_host: str = "https://api.fatcat.wiki/v0", query_stats: bool = False, @@ -220,7 +220,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): query_stats: bool = False, elasticsearch_release_index: str = "fatcat_release", elasticsearch_backend: str = "http://localhost:9200", - elasticsearch_index: str = "fatcat", + elasticsearch_index: str = "fatcat_container", batch_size: int = 200, ): super().__init__( -- cgit v1.2.3