aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/worker_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/worker_common.py')
-rw-r--r--python/fatcat_tools/workers/worker_common.py15
1 files changed, 12 insertions, 3 deletions
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index baec44f4..5239465b 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,7 +1,10 @@
-from confluent_kafka import Consumer, KafkaException, TopicPartition
+from typing import Any, Dict, Optional
+from confluent_kafka import Consumer, KafkaException, Message, TopicPartition
+from fatcat_openapi_client import ApiClient
-def most_recent_message(topic, kafka_config):
+
+def most_recent_message(topic: str, kafka_config: Dict[str, Any]) -> Message:
"""
Tries to fetch the most recent message from a given topic.
@@ -50,7 +53,13 @@ class FatcatWorker:
Common code for for Kafka producers and consumers.
"""
- def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
+ def __init__(
+ self,
+ kafka_hosts: str,
+ produce_topic: Optional[str] = None,
+ consume_topic: Optional[str] = None,
+ api: Optional[ApiClient] = None,
+ ) -> None:
if api:
self.api = api
self.kafka_config = {