aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/changelog.py43
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py63
-rw-r--r--python/fatcat_tools/workers/worker_common.py15
3 files changed, 70 insertions, 51 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 1e4cb41d..8f8efdda 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,9 @@
import json
import time
+from typing import Any, Dict, List, Optional
from confluent_kafka import Consumer, KafkaException, Producer
+from fatcat_openapi_client import ApiClient, ReleaseEntity
from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch
@@ -14,12 +16,19 @@ class ChangelogWorker(FatcatWorker):
found, fetch them and push (as JSON) into a Kafka topic.
"""
- def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+ def __init__(
+ self,
+ api: ApiClient,
+ kafka_hosts: str,
+ produce_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ ) -> None:
super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
self.poll_interval = poll_interval
self.offset = offset # the fatcat changelog offset, not the kafka offset
- def run(self):
+ def run(self) -> None:
# On start, try to consume the most recent from the topic, and use
# that as the starting offset. Note that this is a single-partition
@@ -33,7 +42,7 @@ class ChangelogWorker(FatcatWorker):
self.offset = 0
print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
- def fail_fast(err, msg):
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err))
print("Bailing out...")
@@ -79,15 +88,15 @@ class EntityUpdatesWorker(FatcatWorker):
def __init__(
self,
- api,
- kafka_hosts,
- consume_topic,
- release_topic,
- file_topic,
- container_topic,
- ingest_file_request_topic,
- work_ident_topic,
- poll_interval=5.0,
+ api: ApiClient,
+ kafka_hosts: str,
+ consume_topic: str,
+ release_topic: str,
+ file_topic: str,
+ container_topic: str,
+ ingest_file_request_topic: str,
+ work_ident_topic: str,
+ poll_interval: float = 5.0,
):
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
self.release_topic = release_topic
@@ -158,7 +167,7 @@ class EntityUpdatesWorker(FatcatWorker):
"10.17504/",
]
- def want_live_ingest(self, release, ingest_request):
+ def want_live_ingest(self, release: ReleaseEntity, ingest_request: Dict[str, Any]) -> bool:
"""
This function looks at ingest requests and decides whether they are
worth enqueuing for ingest.
@@ -259,15 +268,15 @@ class EntityUpdatesWorker(FatcatWorker):
return True
- def run(self):
- def fail_fast(err, msg):
+ def run(self) -> None:
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err))
print("Bailing out...")
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- def on_commit(err, partitions):
+ def on_commit(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err))
print("Bailing out...")
@@ -284,7 +293,7 @@ class EntityUpdatesWorker(FatcatWorker):
print("Kafka consumer commit successful")
pass
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 989f7f5d..71c4dcf6 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,5 +1,6 @@
import json
import sys
+from typing import Any, Callable, List, Optional
import elasticsearch
import requests
@@ -27,17 +28,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat",
- elasticsearch_release_index="fatcat_releases",
- batch_size=200,
- api_host="https://api.fatcat.wiki/v0",
- query_stats=False,
- ):
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat",
+ elasticsearch_release_index: str = "fatcat_releases",
+ batch_size: int = 200,
+ api_host: str = "https://api.fatcat.wiki/v0",
+ query_stats: bool = False,
+ ) -> None:
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
@@ -46,18 +47,18 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.elasticsearch_index = elasticsearch_index
self.elasticsearch_release_index = elasticsearch_release_index
self.entity_type = ReleaseEntity
- self.transform_func = release_to_elasticsearch
+ self.transform_func: Callable = release_to_elasticsearch
self.api_host = api_host
self.query_stats = query_stats
- def run(self):
+ def run(self) -> None:
ac = ApiClient()
api = public_api(self.api_host)
# only used by container indexing query_stats code path
es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
- def fail_fast(err, partitions):
+ def fail_fast(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -73,7 +74,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
# print("Kafka consumer commit successful")
pass
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)
@@ -205,15 +206,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- query_stats=False,
- elasticsearch_release_index="fatcat_release",
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat",
- batch_size=200,
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ query_stats: bool = False,
+ elasticsearch_release_index: str = "fatcat_release",
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat",
+ batch_size: int = 200,
):
super().__init__(
kafka_hosts=kafka_hosts,
@@ -242,13 +243,13 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat_changelog",
- batch_size=200,
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat_changelog",
+ batch_size: int = 200,
):
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index baec44f4..5239465b 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,7 +1,10 @@
-from confluent_kafka import Consumer, KafkaException, TopicPartition
+from typing import Any, Dict, Optional
+from confluent_kafka import Consumer, KafkaException, Message, TopicPartition
+from fatcat_openapi_client import ApiClient
-def most_recent_message(topic, kafka_config):
+
+def most_recent_message(topic: str, kafka_config: Dict[str, Any]) -> Message:
"""
Tries to fetch the most recent message from a given topic.
@@ -50,7 +53,13 @@ class FatcatWorker:
Common code for Kafka producers and consumers.
"""
- def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
+ def __init__(
+ self,
+ kafka_hosts: str,
+ produce_topic: Optional[str] = None,
+ consume_topic: Optional[str] = None,
+ api: Optional[ApiClient] = None,
+ ) -> None:
if api:
self.api = api
self.kafka_config = {