Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--   python/fatcat_tools/workers/changelog.py       | 43
-rw-r--r--   python/fatcat_tools/workers/elasticsearch.py   | 63
-rw-r--r--   python/fatcat_tools/workers/worker_common.py   | 15
3 files changed, 70 insertions, 51 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 1e4cb41d..8f8efdda 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,9 @@
 import json
 import time
+from typing import Any, Dict, List, Optional
 
 from confluent_kafka import Consumer, KafkaException, Producer
+from fatcat_openapi_client import ApiClient, ReleaseEntity
 
 from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch
 
@@ -14,12 +16,19 @@ class ChangelogWorker(FatcatWorker):
     found, fetch them and push (as JSON) into a Kafka topic.
     """
 
-    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+    def __init__(
+        self,
+        api: ApiClient,
+        kafka_hosts: str,
+        produce_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+    ) -> None:
         super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
         self.poll_interval = poll_interval
         self.offset = offset  # the fatcat changelog offset, not the kafka offset
 
-    def run(self):
+    def run(self) -> None:
 
         # On start, try to consume the most recent from the topic, and using
         # that as the starting offset. Note that this is a single-partition
@@ -33,7 +42,7 @@ class ChangelogWorker(FatcatWorker):
                 self.offset = 0
             print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
 
-        def fail_fast(err, msg):
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err))
                 print("Bailing out...")
@@ -79,15 +88,15 @@ class EntityUpdatesWorker(FatcatWorker):
 
     def __init__(
         self,
-        api,
-        kafka_hosts,
-        consume_topic,
-        release_topic,
-        file_topic,
-        container_topic,
-        ingest_file_request_topic,
-        work_ident_topic,
-        poll_interval=5.0,
+        api: ApiClient,
+        kafka_hosts: str,
+        consume_topic: str,
+        release_topic: str,
+        file_topic: str,
+        container_topic: str,
+        ingest_file_request_topic: str,
+        work_ident_topic: str,
+        poll_interval: float = 5.0,
     ):
         super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
         self.release_topic = release_topic
@@ -158,7 +167,7 @@ class EntityUpdatesWorker(FatcatWorker):
             "10.17504/",
         ]
 
-    def want_live_ingest(self, release, ingest_request):
+    def want_live_ingest(self, release: ReleaseEntity, ingest_request: Dict[str, Any]) -> bool:
         """
         This function looks at ingest requests and decides whether they are
         worth enqueing for ingest.
@@ -259,15 +268,15 @@ class EntityUpdatesWorker(FatcatWorker):
 
         return True
 
-    def run(self):
-        def fail_fast(err, msg):
+    def run(self) -> None:
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err))
                 print("Bailing out...")
                 # TODO: should it be sys.exit(-1)?
                 raise KafkaException(err)
 
-        def on_commit(err, partitions):
+        def on_commit(err: Any, partitions: List[Any]) -> None:
             if err is not None:
                 print("Kafka consumer commit error: {}".format(err))
                 print("Bailing out...")
@@ -284,7 +293,7 @@ class EntityUpdatesWorker(FatcatWorker):
             print("Kafka consumer commit successful")
             pass
 
-        def on_rebalance(consumer, partitions):
+        def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
             for p in partitions:
                 if p.error:
                     raise KafkaException(p.error)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 989f7f5d..71c4dcf6 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,5 +1,6 @@
 import json
 import sys
+from typing import Any, Callable, List, Optional
 
 import elasticsearch
 import requests
@@ -27,17 +28,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
 
     def __init__(
        self,
-        kafka_hosts,
-        consume_topic,
-        poll_interval=10.0,
-        offset=None,
-        elasticsearch_backend="http://localhost:9200",
-        elasticsearch_index="fatcat",
-        elasticsearch_release_index="fatcat_releases",
-        batch_size=200,
-        api_host="https://api.fatcat.wiki/v0",
-        query_stats=False,
-    ):
+        kafka_hosts: str,
+        consume_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+        elasticsearch_backend: str = "http://localhost:9200",
+        elasticsearch_index: str = "fatcat",
+        elasticsearch_release_index: str = "fatcat_releases",
+        batch_size: int = 200,
+        api_host: str = "https://api.fatcat.wiki/v0",
+        query_stats: bool = False,
+    ) -> None:
         super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
         self.batch_size = batch_size
@@ -46,18 +47,18 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         self.elasticsearch_index = elasticsearch_index
         self.elasticsearch_release_index = elasticsearch_release_index
         self.entity_type = ReleaseEntity
-        self.transform_func = release_to_elasticsearch
+        self.transform_func: Callable = release_to_elasticsearch
         self.api_host = api_host
         self.query_stats = query_stats
 
-    def run(self):
+    def run(self) -> None:
         ac = ApiClient()
         api = public_api(self.api_host)
 
         # only used by container indexing query_stats code path
         es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
 
-        def fail_fast(err, partitions):
+        def fail_fast(err: Any, partitions: List[Any]) -> None:
             if err is not None:
                 print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -73,7 +74,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
             # print("Kafka consumer commit successful")
             pass
 
-        def on_rebalance(consumer, partitions):
+        def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
             for p in partitions:
                 if p.error:
                     raise KafkaException(p.error)
@@ -205,15 +206,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
 class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
     def __init__(
         self,
-        kafka_hosts,
-        consume_topic,
-        poll_interval=10.0,
-        offset=None,
-        query_stats=False,
-        elasticsearch_release_index="fatcat_release",
-        elasticsearch_backend="http://localhost:9200",
-        elasticsearch_index="fatcat",
-        batch_size=200,
+        kafka_hosts: str,
+        consume_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+        query_stats: bool = False,
+        elasticsearch_release_index: str = "fatcat_release",
+        elasticsearch_backend: str = "http://localhost:9200",
+        elasticsearch_index: str = "fatcat",
+        batch_size: int = 200,
     ):
         super().__init__(
             kafka_hosts=kafka_hosts,
@@ -242,13 +243,13 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
 
     def __init__(
         self,
-        kafka_hosts,
-        consume_topic,
-        poll_interval=10.0,
-        offset=None,
-        elasticsearch_backend="http://localhost:9200",
-        elasticsearch_index="fatcat_changelog",
-        batch_size=200,
+        kafka_hosts: str,
+        consume_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+        elasticsearch_backend: str = "http://localhost:9200",
+        elasticsearch_index: str = "fatcat_changelog",
+        batch_size: int = 200,
     ):
         super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index baec44f4..5239465b 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,7 +1,10 @@
-from confluent_kafka import Consumer, KafkaException, TopicPartition
+from typing import Any, Dict, Optional
 
+from confluent_kafka import Consumer, KafkaException, Message, TopicPartition
+from fatcat_openapi_client import ApiClient
 
-def most_recent_message(topic, kafka_config):
+
+def most_recent_message(topic: str, kafka_config: Dict[str, Any]) -> Message:
     """
     Tries to fetch the most recent message from a given topic.
 
@@ -50,7 +53,13 @@ class FatcatWorker:
     Common code for for Kafka producers and consumers.
     """
 
-    def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
+    def __init__(
+        self,
+        kafka_hosts: str,
+        produce_topic: Optional[str] = None,
+        consume_topic: Optional[str] = None,
+        api: Optional[ApiClient] = None,
+    ) -> None:
         if api:
             self.api = api
         self.kafka_config = {
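
For context on how the annotated constructors read at a call site, here is a minimal usage sketch. It assumes ChangelogWorker is exported from the fatcat_tools.workers package and that public_api() (called in elasticsearch.py above) returns an API client compatible with the api parameter; the API host string is the default shown in the diff, while the Kafka broker address and topic name are hypothetical placeholders, not values from this commit.

    from fatcat_tools import public_api
    from fatcat_tools.workers import ChangelogWorker

    # Build a client for the public fatcat API; this host is the default
    # string that appears in the diff above.
    api = public_api("https://api.fatcat.wiki/v0")

    # kafka_hosts and produce_topic are illustrative example values only.
    worker = ChangelogWorker(
        api,
        kafka_hosts="localhost:9092",
        produce_topic="fatcat-prod.changelog",
        poll_interval=10.0,
        offset=None,  # resume from the most recent changelog index found in Kafka
    )
    worker.run()  # poll for new changelog entries and produce them as JSON

With signatures annotated this way, a static checker such as mypy can flag call-site mistakes (for example, passing a number where produce_topic expects a string) that the previous untyped signatures would only have surfaced at runtime.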