aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-11-03 14:16:05 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-11-03 16:46:07 -0700
commit45612465cc08114fa19588ea1b0aba0ace1e7b03 (patch)
treedf1381b7b99a0be79ddbabc3c0cfae05b4986a47 /python/fatcat_tools/workers/elasticsearch.py
parent924639f7faf8c2fc1a635c0f27a13d98b47fc453 (diff)
downloadfatcat-45612465cc08114fa19588ea1b0aba0ace1e7b03.tar.gz
fatcat-45612465cc08114fa19588ea1b0aba0ace1e7b03.zip
typing: add annotations to remaining fatcat_tools code
Again, these are just annotations, no changes made to get type checks to pass
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py63
1 files changed, 32 insertions, 31 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 989f7f5d..71c4dcf6 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,5 +1,6 @@
import json
import sys
+from typing import Any, Callable, List, Optional
import elasticsearch
import requests
@@ -27,17 +28,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat",
- elasticsearch_release_index="fatcat_releases",
- batch_size=200,
- api_host="https://api.fatcat.wiki/v0",
- query_stats=False,
- ):
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat",
+ elasticsearch_release_index: str = "fatcat_releases",
+ batch_size: int = 200,
+ api_host: str = "https://api.fatcat.wiki/v0",
+ query_stats: bool = False,
+ ) -> None:
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
@@ -46,18 +47,18 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.elasticsearch_index = elasticsearch_index
self.elasticsearch_release_index = elasticsearch_release_index
self.entity_type = ReleaseEntity
- self.transform_func = release_to_elasticsearch
+ self.transform_func: Callable = release_to_elasticsearch
self.api_host = api_host
self.query_stats = query_stats
- def run(self):
+ def run(self) -> None:
ac = ApiClient()
api = public_api(self.api_host)
# only used by container indexing query_stats code path
es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
- def fail_fast(err, partitions):
+ def fail_fast(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -73,7 +74,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
# print("Kafka consumer commit successful")
pass
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)
@@ -205,15 +206,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- query_stats=False,
- elasticsearch_release_index="fatcat_release",
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat",
- batch_size=200,
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ query_stats: bool = False,
+ elasticsearch_release_index: str = "fatcat_release",
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat",
+ batch_size: int = 200,
):
super().__init__(
kafka_hosts=kafka_hosts,
@@ -242,13 +243,13 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat_changelog",
- batch_size=200,
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat_changelog",
+ batch_size: int = 200,
):
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"