aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/fatcat_tools/cleanups/common.py19
-rw-r--r--python/fatcat_tools/cleanups/files.py8
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py60
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py45
-rw-r--r--python/fatcat_tools/harvest/oaipmh.py17
-rw-r--r--python/fatcat_tools/harvest/pubmed.py38
-rw-r--r--python/fatcat_tools/workers/changelog.py43
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py63
-rw-r--r--python/fatcat_tools/workers/worker_common.py15
9 files changed, 186 insertions, 122 deletions
diff --git a/python/fatcat_tools/cleanups/common.py b/python/fatcat_tools/cleanups/common.py
index 26ca7bd6..7ebfc8a0 100644
--- a/python/fatcat_tools/cleanups/common.py
+++ b/python/fatcat_tools/cleanups/common.py
@@ -2,6 +2,7 @@ import copy
import json
import subprocess
from collections import Counter
+from typing import Any, Dict, List
from fatcat_openapi_client import ApiClient, Editgroup
@@ -27,7 +28,7 @@ class EntityCleaner:
This class is pretty similar to EntityImporter, but isn't subclassed.
"""
- def __init__(self, api, entity_type, **kwargs):
+ def __init__(self, api: ApiClient, entity_type: Any, **kwargs) -> None:
eg_extra = kwargs.get("editgroup_extra", dict())
eg_extra["git_rev"] = eg_extra.get(
@@ -49,14 +50,14 @@ class EntityCleaner:
if self.dry_run_mode:
print("Running in dry-run mode!")
- def reset(self):
+ def reset(self) -> None:
self.counts = Counter({"lines": 0, "cleaned": 0, "updated": 0})
self._edit_count = 0
self._editgroup_id = None
- self._entity_queue = []
- self._idents_inflight = []
+ self._entity_queue: List[Any] = []
+ self._idents_inflight: List[str] = []
- def push_record(self, record):
+ def push_record(self, record: Dict[str, Any]) -> None:
"""
Intended to be called by "pusher" class (which could be pulling from
JSON file, Kafka, whatever).
@@ -106,14 +107,14 @@ class EntityCleaner:
self._idents_inflight = []
return
- def clean_entity(self, entity):
+ def clean_entity(self, entity: Any) -> Any:
"""
Mutates entity in-place and returns it
"""
# implementations should fill this in
raise NotImplementedError
- def try_update(self, entity):
+ def try_update(self, entity: Any) -> int:
"""
Returns edit count (number of entities updated).
@@ -123,7 +124,7 @@ class EntityCleaner:
# implementations should fill this in
raise NotImplementedError
- def finish(self):
+ def finish(self) -> Counter:
if self._edit_count > 0:
self.api.accept_editgroup(self._editgroup_id)
self._editgroup_id = None
@@ -132,7 +133,7 @@ class EntityCleaner:
return self.counts
- def get_editgroup_id(self):
+ def get_editgroup_id(self) -> str:
if not self._editgroup_id:
eg = self.api.create_editgroup(
diff --git a/python/fatcat_tools/cleanups/files.py b/python/fatcat_tools/cleanups/files.py
index d378a91f..309924e3 100644
--- a/python/fatcat_tools/cleanups/files.py
+++ b/python/fatcat_tools/cleanups/files.py
@@ -1,4 +1,4 @@
-from fatcat_openapi_client.models import FileEntity
+from fatcat_openapi_client import ApiClient, FileEntity
from fatcat_openapi_client.rest import ApiException
from .common import EntityCleaner
@@ -9,7 +9,7 @@ class FileCleaner(EntityCleaner):
File fixups!
"""
- def __init__(self, api, **kwargs):
+ def __init__(self, api: ApiClient, **kwargs) -> None:
eg_desc = (
kwargs.pop("editgroup_description", None)
@@ -25,7 +25,7 @@ class FileCleaner(EntityCleaner):
**kwargs
)
- def clean_entity(self, entity):
+ def clean_entity(self, entity: FileEntity) -> FileEntity:
"""
TODO: mimetype is bogus like (???) => clean mimetype
"""
@@ -54,7 +54,7 @@ class FileCleaner(EntityCleaner):
return entity
- def try_update(self, entity):
+ def try_update(self, entity: FileEntity) -> int:
try:
existing = self.api.get_file(entity.ident)
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index dd48e256..23b300ba 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -1,6 +1,8 @@
+import datetime
import json
import sys
import time
+from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse
from confluent_kafka import KafkaException, Producer
@@ -60,14 +62,14 @@ class HarvestCrossrefWorker:
def __init__(
self,
- kafka_hosts,
- produce_topic,
- state_topic,
- contact_email,
- api_host_url="https://api.crossref.org/works",
- start_date=None,
- end_date=None,
- ):
+ kafka_hosts: str,
+ produce_topic: str,
+ state_topic: str,
+ contact_email: str,
+ api_host_url: str = "https://api.crossref.org/works",
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ ) -> None:
self.api_host_url = api_host_url
self.produce_topic = produce_topic
@@ -86,8 +88,8 @@ class HarvestCrossrefWorker:
self.name = "Crossref"
self.producer = self._kafka_producer()
- def _kafka_producer(self):
- def fail_fast(err, msg):
+ def _kafka_producer(self) -> Producer:
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -107,7 +109,7 @@ class HarvestCrossrefWorker:
)
return Producer(producer_conf)
- def params(self, date_str):
+ def params(self, date_str: str) -> Dict[str, Any]:
filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str)
return {
"filter": filter_param,
@@ -115,14 +117,14 @@ class HarvestCrossrefWorker:
"cursor": "*",
}
- def update_params(self, params, resp):
+ def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
params["cursor"] = resp["message"]["next-cursor"]
return params
- def extract_key(self, obj):
+ def extract_key(self, obj: Dict[str, Any]) -> bytes:
return obj["DOI"].encode("utf-8")
- def fetch_date(self, date):
+ def fetch_date(self, date: datetime.date) -> None:
date_str = date.isoformat()
params = self.params(date_str)
@@ -182,13 +184,13 @@ class HarvestCrossrefWorker:
params = self.update_params(params, resp)
self.producer.flush()
- def extract_items(self, resp):
+ def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
return resp["message"]["items"]
- def extract_total(self, resp):
+ def extract_total(self, resp: Dict[str, Any]) -> int:
return resp["message"]["total-results"]
- def run(self, continuous=False):
+ def run(self, continuous: bool = False) -> None:
while True:
current = self.state.next_span(continuous)
@@ -222,13 +224,13 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
def __init__(
self,
- kafka_hosts,
- produce_topic,
- state_topic,
- contact_email,
- api_host_url="https://api.datacite.org/dois",
- start_date=None,
- end_date=None,
+ kafka_hosts: str,
+ produce_topic: str,
+ state_topic: str,
+ contact_email: str,
+ api_host_url: str = "https://api.datacite.org/dois",
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
):
super().__init__(
kafka_hosts=kafka_hosts,
@@ -243,7 +245,7 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
# for datecite, it's "from-update-date"
self.name = "Datacite"
- def params(self, date_str):
+ def params(self, date_str: str) -> Dict[str, Any]:
"""
Dates have to be supplied in 2018-10-27T22:36:30.000Z format.
"""
@@ -255,16 +257,16 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
"page[cursor]": 1,
}
- def extract_items(self, resp):
+ def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
return resp["data"]
- def extract_total(self, resp):
+ def extract_total(self, resp: Dict[str, Any]) -> int:
return resp["meta"]["total"]
- def extract_key(self, obj):
+ def extract_key(self, obj: Dict[str, Any]) -> bytes:
return obj["attributes"]["doi"].encode("utf-8")
- def update_params(self, params, resp):
+ def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
"""
Using cursor mechanism (https://support.datacite.org/docs/pagination#section-cursor).
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index fda0dc62..4004600b 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -1,6 +1,7 @@
import datetime
import json
import sys
+from typing import Any, Dict, Optional, Sequence, Set
import requests
from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
@@ -15,8 +16,11 @@ DATE_FMT = "%Y-%m-%d"
def requests_retry_session(
- retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
-):
+ retries: int = 10,
+ backoff_factor: int = 3,
+ status_forcelist: Sequence[int] = (500, 502, 504),
+ session: requests.Session = None,
+) -> requests.Session:
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -51,19 +55,29 @@ class HarvestState:
NOTE: should this class manage the state topic as well? Hrm.
"""
- def __init__(self, start_date=None, end_date=None, catchup_days=14):
- self.to_process = set()
- self.completed = set()
+ def __init__(
+ self,
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ catchup_days: int = 14,
+ ):
+ self.to_process: Set[datetime.date] = set()
+ self.completed: Set[datetime.date] = set()
if catchup_days or start_date or end_date:
self.enqueue_period(start_date, end_date, catchup_days)
- def __str__(self):
+ def __str__(self) -> str:
return "<HarvestState to_process={}, completed={}>".format(
len(self.to_process), len(self.completed)
)
- def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
+ def enqueue_period(
+ self,
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ catchup_days: int = 14,
+ ) -> None:
"""
This function adds a time period to the "TODO" list, unless the dates
have already been processed.
@@ -85,7 +99,7 @@ class HarvestState:
self.to_process.add(current)
current += datetime.timedelta(days=1)
- def next_span(self, continuous=False):
+ def next_span(self, continuous: bool = False) -> Optional[datetime.date]:
"""
Gets next timespan (date) to be processed, or returns None if completed.
@@ -102,7 +116,7 @@ class HarvestState:
return None
return sorted(list(self.to_process))[0]
- def update(self, state_json):
+ def update(self, state_json: str) -> None:
"""
Merges a state JSON object into the current state.
@@ -114,7 +128,12 @@ class HarvestState:
date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
self.complete(date)
- def complete(self, date, kafka_topic=None, kafka_config=None):
+ def complete(
+ self,
+ date: datetime.date,
+ kafka_topic: Optional[str] = None,
+ kafka_config: Optional[Dict] = None,
+ ) -> bytes:
"""
Records that a date has been processed successfully.
@@ -137,7 +156,7 @@ class HarvestState:
if kafka_topic:
assert kafka_config
- def fail_fast(err, msg):
+ def fail_fast(err: Any, _msg: Any) -> None:
if err:
raise KafkaException(err)
@@ -156,7 +175,7 @@ class HarvestState:
producer.flush()
return state_json
- def initialize_from_kafka(self, kafka_topic, kafka_config):
+ def initialize_from_kafka(self, kafka_topic: str, kafka_config: Dict[str, Any]) -> None:
"""
kafka_topic should have type str
@@ -167,7 +186,7 @@ class HarvestState:
print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)
- def fail_fast(err, msg):
+ def fail_fast(err: Any, _msg: Any) -> None:
if err:
raise KafkaException(err)
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 40d1c853..19eb6897 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -1,5 +1,7 @@
+import datetime
import sys
import time
+from typing import Any, Optional
import sickle
from confluent_kafka import KafkaException, Producer
@@ -24,7 +26,14 @@ class HarvestOaiPmhWorker:
would want something similar operationally. Oh well!
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+ def __init__(
+ self,
+ kafka_hosts: str,
+ produce_topic: str,
+ state_topic: str,
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ ):
self.produce_topic = produce_topic
self.state_topic = state_topic
@@ -42,8 +51,8 @@ class HarvestOaiPmhWorker:
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
print(self.state, file=sys.stderr)
- def fetch_date(self, date):
- def fail_fast(err, msg):
+ def fetch_date(self, date: datetime.date) -> None:
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -93,7 +102,7 @@ class HarvestOaiPmhWorker:
)
producer.flush()
- def run(self, continuous=False):
+ def run(self, continuous: bool = False) -> None:
while True:
current = self.state.next_span(continuous)
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index 0f33f334..a1b4da0e 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -9,6 +9,7 @@ Assumptions:
"""
import collections
+import datetime
import ftplib
import gzip
import io
@@ -22,6 +23,7 @@ import tempfile
import time
import xml.etree.ElementTree as ET
import zlib
+from typing import Any, Dict, Generator, Optional
from urllib.parse import urlparse
import dateparser
@@ -61,7 +63,14 @@ class PubmedFTPWorker:
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+ def __init__(
+ self,
+ kafka_hosts: str,
+ produce_topic: str,
+ state_topic: str,
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ ):
self.name = "Pubmed"
self.host = "ftp.ncbi.nlm.nih.gov"
self.produce_topic = produce_topic
@@ -74,10 +83,10 @@ class PubmedFTPWorker:
self.state = HarvestState(start_date, end_date)
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
self.producer = self._kafka_producer()
- self.date_file_map = None
+ self.date_file_map: Optional[Dict[str, Any]] = None
- def _kafka_producer(self):
- def fail_fast(err, msg):
+ def _kafka_producer(self) -> Producer:
+ def fail_fast(err: Any, _msg: None) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -97,7 +106,7 @@ class PubmedFTPWorker:
)
return Producer(producer_conf)
- def fetch_date(self, date):
+ def fetch_date(self, date: datetime.date) -> bool:
"""
Fetch file or files for a given date and feed Kafka one article per
message. If the fetched XML does not contain a PMID an exception is
@@ -163,7 +172,7 @@ class PubmedFTPWorker:
return True
- def run(self, continuous=False):
+ def run(self, continuous: bool = False) -> None:
while True:
self.date_file_map = generate_date_file_map(host=self.host)
if len(self.date_file_map) == 0:
@@ -188,7 +197,7 @@ class PubmedFTPWorker:
print("{} FTP ingest caught up".format(self.name))
-def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
+def generate_date_file_map(host: str = "ftp.ncbi.nlm.nih.gov") -> Dict[str, Any]:
"""
Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
the server (mostly we have one file, but sometimes more).
@@ -259,7 +268,9 @@ def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
return mapping
-def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
+def ftpretr(
+ url: str, max_retries: int = 10, retry_delay: int = 1, proxy_hostport: Optional[str] = None
+) -> str:
"""
Note: This might move into a generic place in the future.
@@ -305,8 +316,11 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
def ftpretr_via_http_proxy(
- url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1
-):
+ url: str,
+ proxy_hostport: str = "ftp.ncbi.nlm.nih.gov",
+ max_retries: int = 10,
+ retry_delay: int = 1,
+) -> str:
"""
Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c would
be retrievable via proxy.com/a/b/c; (in 09/2021 we used
@@ -335,7 +349,7 @@ def ftpretr_via_http_proxy(
time.sleep(retry_delay)
-def xmlstream(filename, tag, encoding="utf-8"):
+def xmlstream(filename: str, tag: str, encoding: str = "utf-8") -> Generator[Any, Any, Any]:
"""
Note: This might move into a generic place in the future.
@@ -348,7 +362,7 @@ def xmlstream(filename, tag, encoding="utf-8"):
Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
"""
- def strip_ns(tag):
+ def strip_ns(tag: str) -> str:
if "}" not in tag:
return tag
return tag.split("}")[1]
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 1e4cb41d..8f8efdda 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,9 @@
import json
import time
+from typing import Any, Dict, List, Optional
from confluent_kafka import Consumer, KafkaException, Producer
+from fatcat_openapi_client import ApiClient, ReleaseEntity
from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch
@@ -14,12 +16,19 @@ class ChangelogWorker(FatcatWorker):
found, fetch them and push (as JSON) into a Kafka topic.
"""
- def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+ def __init__(
+ self,
+ api: ApiClient,
+ kafka_hosts: str,
+ produce_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ ) -> None:
super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
self.poll_interval = poll_interval
self.offset = offset # the fatcat changelog offset, not the kafka offset
- def run(self):
+ def run(self) -> None:
# On start, try to consume the most recent from the topic, and using
# that as the starting offset. Note that this is a single-partition
@@ -33,7 +42,7 @@ class ChangelogWorker(FatcatWorker):
self.offset = 0
print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
- def fail_fast(err, msg):
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err))
print("Bailing out...")
@@ -79,15 +88,15 @@ class EntityUpdatesWorker(FatcatWorker):
def __init__(
self,
- api,
- kafka_hosts,
- consume_topic,
- release_topic,
- file_topic,
- container_topic,
- ingest_file_request_topic,
- work_ident_topic,
- poll_interval=5.0,
+ api: ApiClient,
+ kafka_hosts: str,
+ consume_topic: str,
+ release_topic: str,
+ file_topic: str,
+ container_topic: str,
+ ingest_file_request_topic: str,
+ work_ident_topic: str,
+ poll_interval: float = 5.0,
):
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
self.release_topic = release_topic
@@ -158,7 +167,7 @@ class EntityUpdatesWorker(FatcatWorker):
"10.17504/",
]
- def want_live_ingest(self, release, ingest_request):
+ def want_live_ingest(self, release: ReleaseEntity, ingest_request: Dict[str, Any]) -> bool:
"""
This function looks at ingest requests and decides whether they are
worth enqueing for ingest.
@@ -259,15 +268,15 @@ class EntityUpdatesWorker(FatcatWorker):
return True
- def run(self):
- def fail_fast(err, msg):
+ def run(self) -> None:
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err))
print("Bailing out...")
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- def on_commit(err, partitions):
+ def on_commit(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err))
print("Bailing out...")
@@ -284,7 +293,7 @@ class EntityUpdatesWorker(FatcatWorker):
print("Kafka consumer commit successful")
pass
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 989f7f5d..71c4dcf6 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,5 +1,6 @@
import json
import sys
+from typing import Any, Callable, List, Optional
import elasticsearch
import requests
@@ -27,17 +28,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat",
- elasticsearch_release_index="fatcat_releases",
- batch_size=200,
- api_host="https://api.fatcat.wiki/v0",
- query_stats=False,
- ):
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat",
+ elasticsearch_release_index: str = "fatcat_releases",
+ batch_size: int = 200,
+ api_host: str = "https://api.fatcat.wiki/v0",
+ query_stats: bool = False,
+ ) -> None:
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
@@ -46,18 +47,18 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.elasticsearch_index = elasticsearch_index
self.elasticsearch_release_index = elasticsearch_release_index
self.entity_type = ReleaseEntity
- self.transform_func = release_to_elasticsearch
+ self.transform_func: Callable = release_to_elasticsearch
self.api_host = api_host
self.query_stats = query_stats
- def run(self):
+ def run(self) -> None:
ac = ApiClient()
api = public_api(self.api_host)
# only used by container indexing query_stats code path
es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
- def fail_fast(err, partitions):
+ def fail_fast(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -73,7 +74,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
# print("Kafka consumer commit successful")
pass
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)
@@ -205,15 +206,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- query_stats=False,
- elasticsearch_release_index="fatcat_release",
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat",
- batch_size=200,
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ query_stats: bool = False,
+ elasticsearch_release_index: str = "fatcat_release",
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat",
+ batch_size: int = 200,
):
super().__init__(
kafka_hosts=kafka_hosts,
@@ -242,13 +243,13 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
def __init__(
self,
- kafka_hosts,
- consume_topic,
- poll_interval=10.0,
- offset=None,
- elasticsearch_backend="http://localhost:9200",
- elasticsearch_index="fatcat_changelog",
- batch_size=200,
+ kafka_hosts: str,
+ consume_topic: str,
+ poll_interval: float = 10.0,
+ offset: Optional[int] = None,
+ elasticsearch_backend: str = "http://localhost:9200",
+ elasticsearch_index: str = "fatcat_changelog",
+ batch_size: int = 200,
):
super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index baec44f4..5239465b 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,7 +1,10 @@
-from confluent_kafka import Consumer, KafkaException, TopicPartition
+from typing import Any, Dict, Optional
+from confluent_kafka import Consumer, KafkaException, Message, TopicPartition
+from fatcat_openapi_client import ApiClient
-def most_recent_message(topic, kafka_config):
+
+def most_recent_message(topic: str, kafka_config: Dict[str, Any]) -> Message:
"""
Tries to fetch the most recent message from a given topic.
@@ -50,7 +53,13 @@ class FatcatWorker:
Common code for for Kafka producers and consumers.
"""
- def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
+ def __init__(
+ self,
+ kafka_hosts: str,
+ produce_topic: Optional[str] = None,
+ consume_topic: Optional[str] = None,
+ api: Optional[ApiClient] = None,
+ ) -> None:
if api:
self.api = api
self.kafka_config = {