aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-12-15 11:58:23 -0800
committerBryan Newbold <bnewbold@robocracy.org>2021-12-15 11:58:23 -0800
commit73e6d75ec5c7f60315f0eed38c117e718eae0310 (patch)
tree5afa1839f43cf0ed4cd674b8efc7f0f62c314f75
parent07b9dfe28b4732f0ca52dc4fd571ea238d953279 (diff)
downloadfatcat-73e6d75ec5c7f60315f0eed38c117e718eae0310.tar.gz
fatcat-73e6d75ec5c7f60315f0eed38c117e718eae0310.zip
file elasticsearch index worker
-rw-r--r--python/fatcat_tools/workers/__init__.py1
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py35
-rwxr-xr-xpython/fatcat_worker.py28
3 files changed, 63 insertions, 1 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py
index 18b35b2b..d3c0978f 100644
--- a/python/fatcat_tools/workers/__init__.py
+++ b/python/fatcat_tools/workers/__init__.py
@@ -2,6 +2,7 @@ from .changelog import ChangelogWorker, EntityUpdatesWorker
from .elasticsearch import (
ElasticsearchChangelogWorker,
ElasticsearchContainerWorker,
+ ElasticsearchFileWorker,
ElasticsearchReleaseWorker,
)
from .worker_common import FatcatWorker, most_recent_message
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 71c4dcf6..21e1f41b 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -5,12 +5,19 @@ from typing import Any, Callable, List, Optional
import elasticsearch
import requests
from confluent_kafka import Consumer, KafkaException
-from fatcat_openapi_client import ApiClient, ChangelogEntry, ContainerEntity, ReleaseEntity
+from fatcat_openapi_client import (
+ ApiClient,
+ ChangelogEntry,
+ ContainerEntity,
+ FileEntity,
+ ReleaseEntity,
+)
from fatcat_tools import entity_from_json, public_api
from fatcat_tools.transforms import (
changelog_to_elasticsearch,
container_to_elasticsearch,
+ file_to_elasticsearch,
release_to_elasticsearch,
)
from fatcat_web.search import get_elastic_container_stats
@@ -233,6 +240,32 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
self.transform_func = container_to_elasticsearch
+class ElasticsearchFileWorker(ElasticsearchReleaseWorker):
+ def __init__(
+ self,
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat_file",
+ batch_size: int = 200,
+ ):
+ super().__init__(
+ kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ poll_interval=poll_interval,
+ offset=offset,
+ elasticsearch_backend=elasticsearch_backend,
+ elasticsearch_index=elasticsearch_index,
+ batch_size=batch_size,
+ )
+ # previous group got corrupted (by pykafka library?)
+ self.consumer_group = "elasticsearch-updates3"
+ self.entity_type = FileEntity
+ self.transform_func = file_to_elasticsearch
+
+
class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
"""
Pulls changelog messages from Kafka, runs transformations and indexes them.
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 0e2d03b5..67ea8451 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -10,6 +10,7 @@ from fatcat_tools.workers import (
ChangelogWorker,
ElasticsearchChangelogWorker,
ElasticsearchContainerWorker,
+ ElasticsearchFileWorker,
ElasticsearchReleaseWorker,
EntityUpdatesWorker,
)
@@ -70,6 +71,17 @@ def run_elasticsearch_container(args: argparse.Namespace) -> None:
worker.run()
+def run_elasticsearch_file(args: argparse.Namespace) -> None:
+ consume_topic = "fatcat-{}.file-updates".format(args.env)
+ worker = ElasticsearchFileWorker(
+ args.kafka_hosts,
+ consume_topic,
+ elasticsearch_backend=args.elasticsearch_backend,
+ elasticsearch_index=args.elasticsearch_index,
+ )
+ worker.run()
+
+
def run_elasticsearch_changelog(args: argparse.Namespace) -> None:
consume_topic = "fatcat-{}.changelog".format(args.env)
worker = ElasticsearchChangelogWorker(
@@ -150,6 +162,22 @@ def main() -> None:
help="whether to query release search index for container stats",
)
+ sub_elasticsearch_file = subparsers.add_parser(
+ "elasticsearch-file",
+ help="consume kafka feed of new/updated files, transform and push to search",
+ )
+ sub_elasticsearch_file.set_defaults(func=run_elasticsearch_file)
+ sub_elasticsearch_file.add_argument(
+ "--elasticsearch-backend",
+ help="elasticsearch backend to connect to",
+ default="http://localhost:9200",
+ )
+ sub_elasticsearch_file.add_argument(
+ "--elasticsearch-index",
+ help="elasticsearch index to push into",
+ default="fatcat_file",
+ )
+
sub_elasticsearch_changelog = subparsers.add_parser(
"elasticsearch-changelog",
help="consume changelog kafka feed, transform and push to search",