diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/workers/__init__.py | 1 | ||||
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 39 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 28 |
4 files changed, 66 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py index 18b35b2b..d3c0978f 100644 --- a/python/fatcat_tools/workers/__init__.py +++ b/python/fatcat_tools/workers/__init__.py @@ -2,6 +2,7 @@ from .changelog import ChangelogWorker, EntityUpdatesWorker from .elasticsearch import ( ElasticsearchChangelogWorker, ElasticsearchContainerWorker, + ElasticsearchFileWorker, ElasticsearchReleaseWorker, ) from .worker_common import FatcatWorker, most_recent_message diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index ff358c66..1af47d4b 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -418,7 +418,7 @@ class EntityUpdatesWorker(FatcatWorker): for ident in set(release_ids): release = self.api.get_release( - ident, expand="files,filesets,webcaptures,container" + ident, expand="files,filesets,webcaptures,container,creators" ) if release.work_id: work_ids.append(release.work_id) diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 71c4dcf6..bfadea64 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -5,12 +5,19 @@ from typing import Any, Callable, List, Optional import elasticsearch import requests from confluent_kafka import Consumer, KafkaException -from fatcat_openapi_client import ApiClient, ChangelogEntry, ContainerEntity, ReleaseEntity +from fatcat_openapi_client import ( + ApiClient, + ChangelogEntry, + ContainerEntity, + FileEntity, + ReleaseEntity, +) from fatcat_tools import entity_from_json, public_api from fatcat_tools.transforms import ( changelog_to_elasticsearch, container_to_elasticsearch, + file_to_elasticsearch, release_to_elasticsearch, ) from fatcat_web.search import get_elastic_container_stats @@ -34,7 +41,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): offset: Optional[int] = None, elasticsearch_backend: str = "http://localhost:9200", elasticsearch_index: str = "fatcat", - elasticsearch_release_index: str = "fatcat_releases", + elasticsearch_release_index: str = "fatcat_release", batch_size: int = 200, api_host: str = "https://api.fatcat.wiki/v0", query_stats: bool = False, @@ -213,7 +220,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): query_stats: bool = False, elasticsearch_release_index: str = "fatcat_release", elasticsearch_backend: str = "http://localhost:9200", - elasticsearch_index: str = "fatcat", + elasticsearch_index: str = "fatcat_container", batch_size: int = 200, ): super().__init__( @@ -233,6 +240,32 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): self.transform_func = container_to_elasticsearch +class ElasticsearchFileWorker(ElasticsearchReleaseWorker): + def __init__( + self, + kafka_hosts: str, + consume_topic: str, + poll_interval: float = 10.0, + offset: Optional[int] = None, + elasticsearch_backend: str = "http://localhost:9200", + elasticsearch_index: str = "fatcat_file", + batch_size: int = 200, + ): + super().__init__( + kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + poll_interval=poll_interval, + offset=offset, + elasticsearch_backend=elasticsearch_backend, + elasticsearch_index=elasticsearch_index, + batch_size=batch_size, + ) + # previous group got corrupted (by pykafka library?) + self.consumer_group = "elasticsearch-updates3" + self.entity_type = FileEntity + self.transform_func = file_to_elasticsearch + + class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker): """ Pulls changelog messages from Kafka, runs transformations and indexes them. diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index a49263ac..a7dcf755 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -10,6 +10,7 @@ from fatcat_tools.workers import ( ChangelogWorker, ElasticsearchChangelogWorker, ElasticsearchContainerWorker, + ElasticsearchFileWorker, ElasticsearchReleaseWorker, EntityUpdatesWorker, ) @@ -70,6 +71,17 @@ def run_elasticsearch_container(args: argparse.Namespace) -> None: worker.run() +def run_elasticsearch_file(args: argparse.Namespace) -> None: + consume_topic = "fatcat-{}.file-updates".format(args.env) + worker = ElasticsearchFileWorker( + args.kafka_hosts, + consume_topic, + elasticsearch_backend=args.elasticsearch_backend, + elasticsearch_index=args.elasticsearch_index, + ) + worker.run() + + def run_elasticsearch_changelog(args: argparse.Namespace) -> None: consume_topic = "fatcat-{}.changelog".format(args.env) worker = ElasticsearchChangelogWorker( @@ -150,6 +162,22 @@ def main() -> None: help="whether to query release search index for container stats", ) + sub_elasticsearch_file = subparsers.add_parser( + "elasticsearch-file", + help="consume kafka feed of new/updated files, transform and push to search", + ) + sub_elasticsearch_file.set_defaults(func=run_elasticsearch_file) + sub_elasticsearch_file.add_argument( + "--elasticsearch-backend", + help="elasticsearch backend to connect to", + default="http://localhost:9200", + ) + sub_elasticsearch_file.add_argument( + "--elasticsearch-index", + help="elasticsearch index to push into", + default="fatcat_file", + ) + sub_elasticsearch_changelog = subparsers.add_parser( "elasticsearch-changelog", help="consume changelog kafka feed, transform and push to search", |