Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r--  python/fatcat_tools/workers/changelog.py  |  43 ++++++++++++++++++++++++++-----------------
1 file changed, 26 insertions(+), 17 deletions(-)
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 1e4cb41d..8f8efdda 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,9 @@
import json
import time
+from typing import Any, Dict, List, Optional
from confluent_kafka import Consumer, KafkaException, Producer
+from fatcat_openapi_client import ApiClient, ReleaseEntity
from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch
@@ -14,12 +16,19 @@ class ChangelogWorker(FatcatWorker):
found, fetch them and push (as JSON) into a Kafka topic.
"""
- def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+ def __init__(
+ self,
+ api: ApiClient,
+ kafka_hosts: str,
+ produce_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ ) -> None:
super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
self.poll_interval = poll_interval
self.offset = offset # the fatcat changelog offset, not the kafka offset
- def run(self):
+ def run(self) -> None:
# On start, try to consume the most recent from the topic, and use
# that as the starting offset. Note that this is a single-partition
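The offset-recovery step described in the comment above can be sketched against a single-partition topic roughly as follows. This is a minimal illustration, not the worker's actual implementation; the broker address, group id, and topic name are hypothetical placeholders:

    from confluent_kafka import Consumer, TopicPartition

    def latest_kafka_offset(kafka_hosts: str, topic: str) -> int:
        # Probe the high watermark of the topic's single partition; the
        # most recently produced message sits at offset high - 1. The
        # worker would then parse that message's JSON payload for the
        # fatcat changelog index (not the Kafka offset itself).
        consumer = Consumer({
            "bootstrap.servers": kafka_hosts,      # hypothetical, e.g. "localhost:9092"
            "group.id": "changelog-offset-probe",  # hypothetical throwaway group
            "enable.auto.commit": False,
        })
        try:
            _low, high = consumer.get_watermark_offsets(TopicPartition(topic, 0))
            return high - 1 if high > 0 else 0
        finally:
            consumer.close()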
@@ -33,7 +42,7 @@ class ChangelogWorker(FatcatWorker):
self.offset = 0
print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
- def fail_fast(err, msg):
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err))
print("Bailing out...")
@@ -79,15 +88,15 @@ class EntityUpdatesWorker(FatcatWorker):
def __init__(
self,
- api,
- kafka_hosts,
- consume_topic,
- release_topic,
- file_topic,
- container_topic,
- ingest_file_request_topic,
- work_ident_topic,
- poll_interval=5.0,
+ api: ApiClient,
+ kafka_hosts: str,
+ consume_topic: str,
+ release_topic: str,
+ file_topic: str,
+ container_topic: str,
+ ingest_file_request_topic: str,
+ work_ident_topic: str,
+ poll_interval: float = 5.0,
):
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
self.release_topic = release_topic
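With the constructor now fully annotated, a call site might look like the sketch below. Every broker, topic, and host value is a hypothetical placeholder; the client construction follows the usual OpenAPI-generated pattern and is an assumption, not something this diff confirms:

    import fatcat_openapi_client

    conf = fatcat_openapi_client.Configuration()
    conf.host = "https://api.fatcat.wiki/v0"     # hypothetical API endpoint
    api = fatcat_openapi_client.ApiClient(conf)  # matches the ApiClient annotation above

    worker = EntityUpdatesWorker(
        api=api,
        kafka_hosts="localhost:9092",                  # hypothetical
        consume_topic="fatcat-changelog",              # hypothetical
        release_topic="fatcat-release-updates",        # hypothetical
        file_topic="fatcat-file-updates",              # hypothetical
        container_topic="fatcat-container-updates",    # hypothetical
        ingest_file_request_topic="sandcrawler-ingest-file-requests",  # hypothetical
        work_ident_topic="fatcat-work-ident-updates",  # hypothetical
        poll_interval=5.0,
    )
    worker.run()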
@@ -158,7 +167,7 @@ class EntityUpdatesWorker(FatcatWorker):
"10.17504/",
]
- def want_live_ingest(self, release, ingest_request):
+ def want_live_ingest(self, release: ReleaseEntity, ingest_request: Dict[str, Any]) -> bool:
"""
This function looks at ingest requests and decides whether they are
worth enqueueing for ingest.
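The annotated signature makes the expected shapes explicit: a ReleaseEntity plus a plain request dict, returning a bool. A hypothetical call, purely to illustrate the types (all field values here are invented):

    from fatcat_openapi_client import ReleaseEntity, ReleaseExtIds

    release = ReleaseEntity(
        ident="hypotheticalreleaseident00",            # hypothetical fatcat ident
        ext_ids=ReleaseExtIds(doi="10.1234/example"),  # hypothetical DOI
    )
    ingest_request = {
        "ingest_type": "pdf",                          # hypothetical request fields
        "base_url": "https://example.com/paper.pdf",
    }
    if worker.want_live_ingest(release, ingest_request):
        pass  # would be enqueued to the ingest_file_request_topic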
@@ -259,15 +268,15 @@ class EntityUpdatesWorker(FatcatWorker):
return True
- def run(self):
- def fail_fast(err, msg):
+ def run(self) -> None:
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err))
print("Bailing out...")
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- def on_commit(err, partitions):
+ def on_commit(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err))
print("Bailing out...")
@@ -284,7 +293,7 @@ class EntityUpdatesWorker(FatcatWorker):
print("Kafka consumer commit successful")
pass
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)
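The rebalance handler, by contrast, is attached at subscribe time; confluent_kafka takes separate on_assign and on_revoke callables, and a single function can serve both roles. Continuing the hypothetical consumer from the sketch above:

    def on_rebalance(consumer, partitions):
        # Abort on partition-level errors; otherwise just report the assignment.
        for p in partitions:
            if p.error:
                raise KafkaException(p.error)
        print("Kafka partitions rebalanced: {}".format(partitions))

    consumer.subscribe(
        ["fatcat-changelog"],  # hypothetical topic
        on_assign=on_rebalance,
        on_revoke=on_rebalance,
    )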