Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/cleanups/common.py        | 19
-rw-r--r--  python/fatcat_tools/cleanups/files.py          |  8
-rw-r--r--  python/fatcat_tools/harvest/doi_registrars.py  | 60
-rw-r--r--  python/fatcat_tools/harvest/harvest_common.py  | 45
-rw-r--r--  python/fatcat_tools/harvest/oaipmh.py          | 17
-rw-r--r--  python/fatcat_tools/harvest/pubmed.py          | 38
-rw-r--r--  python/fatcat_tools/workers/changelog.py       | 43
-rw-r--r--  python/fatcat_tools/workers/elasticsearch.py   | 63
-rw-r--r--  python/fatcat_tools/workers/worker_common.py   | 15
9 files changed, 186 insertions, 122 deletions
diff --git a/python/fatcat_tools/cleanups/common.py b/python/fatcat_tools/cleanups/common.py
index 26ca7bd6..7ebfc8a0 100644
--- a/python/fatcat_tools/cleanups/common.py
+++ b/python/fatcat_tools/cleanups/common.py
@@ -2,6 +2,7 @@ import copy
 import json
 import subprocess
 from collections import Counter
+from typing import Any, Dict, List
 
 from fatcat_openapi_client import ApiClient, Editgroup
 
@@ -27,7 +28,7 @@ class EntityCleaner:
     This class is pretty similar to EntityImporter, but isn't subclassed.
     """
 
-    def __init__(self, api, entity_type, **kwargs):
+    def __init__(self, api: ApiClient, entity_type: Any, **kwargs) -> None:
 
         eg_extra = kwargs.get("editgroup_extra", dict())
         eg_extra["git_rev"] = eg_extra.get(
@@ -49,14 +50,14 @@ class EntityCleaner:
         if self.dry_run_mode:
             print("Running in dry-run mode!")
 
-    def reset(self):
+    def reset(self) -> None:
         self.counts = Counter({"lines": 0, "cleaned": 0, "updated": 0})
         self._edit_count = 0
         self._editgroup_id = None
-        self._entity_queue = []
-        self._idents_inflight = []
+        self._entity_queue: List[Any] = []
+        self._idents_inflight: List[str] = []
 
-    def push_record(self, record):
+    def push_record(self, record: Dict[str, Any]) -> None:
         """
         Intended to be called by "pusher" class (which could be pulling from
         JSON file, Kafka, whatever).
@@ -106,14 +107,14 @@ class EntityCleaner:
             self._idents_inflight = []
         return
 
-    def clean_entity(self, entity):
+    def clean_entity(self, entity: Any) -> Any:
         """
         Mutates entity in-place and returns it
         """
         # implementations should fill this in
         raise NotImplementedError
 
-    def try_update(self, entity):
+    def try_update(self, entity: Any) -> int:
         """
         Returns edit count (number of entities updated).
 
@@ -123,7 +124,7 @@ class EntityCleaner:
         # implementations should fill this in
         raise NotImplementedError
 
-    def finish(self):
+    def finish(self) -> Counter:
         if self._edit_count > 0:
             self.api.accept_editgroup(self._editgroup_id)
             self._editgroup_id = None
@@ -132,7 +133,7 @@ class EntityCleaner:
 
         return self.counts
 
-    def get_editgroup_id(self):
+    def get_editgroup_id(self) -> str:
 
         if not self._editgroup_id:
             eg = self.api.create_editgroup(
diff --git a/python/fatcat_tools/cleanups/files.py b/python/fatcat_tools/cleanups/files.py
index d378a91f..309924e3 100644
--- a/python/fatcat_tools/cleanups/files.py
+++ b/python/fatcat_tools/cleanups/files.py
@@ -1,4 +1,4 @@
-from fatcat_openapi_client.models import FileEntity
+from fatcat_openapi_client import ApiClient, FileEntity
 from fatcat_openapi_client.rest import ApiException
 
 from .common import EntityCleaner
@@ -9,7 +9,7 @@ class FileCleaner(EntityCleaner):
     File fixups!
     """
 
-    def __init__(self, api, **kwargs):
+    def __init__(self, api: ApiClient, **kwargs) -> None:
 
         eg_desc = (
             kwargs.pop("editgroup_description", None)
@@ -25,7 +25,7 @@ class FileCleaner(EntityCleaner):
             **kwargs
         )
 
-    def clean_entity(self, entity):
+    def clean_entity(self, entity: FileEntity) -> FileEntity:
         """
         TODO: mimetype is bogus like (???) => clean mimetype
         """
@@ -54,7 +54,7 @@ class FileCleaner(EntityCleaner):
 
         return entity
 
-    def try_update(self, entity):
+    def try_update(self, entity: FileEntity) -> int:
 
         try:
             existing = self.api.get_file(entity.ident)
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index dd48e256..23b300ba 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -1,6 +1,8 @@
+import datetime
 import json
 import sys
 import time
+from typing import Any, Dict, List, Optional
 from urllib.parse import parse_qs, urlparse
 
 from confluent_kafka import KafkaException, Producer
@@ -60,14 +62,14 @@ class HarvestCrossrefWorker:
 
     def __init__(
         self,
-        kafka_hosts,
-        produce_topic,
-        state_topic,
-        contact_email,
-        api_host_url="https://api.crossref.org/works",
-        start_date=None,
-        end_date=None,
-    ):
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        contact_email: str,
+        api_host_url: str = "https://api.crossref.org/works",
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+    ) -> None:
 
         self.api_host_url = api_host_url
         self.produce_topic = produce_topic
@@ -86,8 +88,8 @@ class HarvestCrossrefWorker:
         self.name = "Crossref"
         self.producer = self._kafka_producer()
 
-    def _kafka_producer(self):
-        def fail_fast(err, msg):
+    def _kafka_producer(self) -> Producer:
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -107,7 +109,7 @@ class HarvestCrossrefWorker:
         )
         return Producer(producer_conf)
 
-    def params(self, date_str):
+    def params(self, date_str: str) -> Dict[str, Any]:
         filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str)
         return {
             "filter": filter_param,
@@ -115,14 +117,14 @@ class HarvestCrossrefWorker:
             "cursor": "*",
         }
 
-    def update_params(self, params, resp):
+    def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
         params["cursor"] = resp["message"]["next-cursor"]
         return params
 
-    def extract_key(self, obj):
+    def extract_key(self, obj: Dict[str, Any]) -> bytes:
         return obj["DOI"].encode("utf-8")
 
-    def fetch_date(self, date):
+    def fetch_date(self, date: datetime.date) -> None:
 
         date_str = date.isoformat()
         params = self.params(date_str)
@@ -182,13 +184,13 @@ class HarvestCrossrefWorker:
             params = self.update_params(params, resp)
         self.producer.flush()
 
-    def extract_items(self, resp):
+    def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
         return resp["message"]["items"]
 
-    def extract_total(self, resp):
+    def extract_total(self, resp: Dict[str, Any]) -> int:
         return resp["message"]["total-results"]
 
-    def run(self, continuous=False):
+    def run(self, continuous: bool = False) -> None:
 
         while True:
             current = self.state.next_span(continuous)
@@ -222,13 +224,13 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
 
     def __init__(
         self,
-        kafka_hosts,
-        produce_topic,
-        state_topic,
-        contact_email,
-        api_host_url="https://api.datacite.org/dois",
-        start_date=None,
-        end_date=None,
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        contact_email: str,
+        api_host_url: str = "https://api.datacite.org/dois",
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
     ):
         super().__init__(
             kafka_hosts=kafka_hosts,
@@ -243,7 +245,7 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
         # for datecite, it's "from-update-date"
         self.name = "Datacite"
 
-    def params(self, date_str):
+    def params(self, date_str: str) -> Dict[str, Any]:
         """
         Dates have to be supplied in 2018-10-27T22:36:30.000Z format.
         """
@@ -255,16 +257,16 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
             "page[cursor]": 1,
         }
 
-    def extract_items(self, resp):
+    def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
         return resp["data"]
 
-    def extract_total(self, resp):
+    def extract_total(self, resp: Dict[str, Any]) -> int:
         return resp["meta"]["total"]
 
-    def extract_key(self, obj):
+    def extract_key(self, obj: Dict[str, Any]) -> bytes:
         return obj["attributes"]["doi"].encode("utf-8")
 
-    def update_params(self, params, resp):
+    def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
         """
         Using cursor mechanism (https://support.datacite.org/docs/pagination#section-cursor).
 
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index fda0dc62..4004600b 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -1,6 +1,7 @@
 import datetime
 import json
 import sys
+from typing import Any, Dict, Optional, Sequence, Set
 
 import requests
 from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
@@ -15,8 +16,11 @@ DATE_FMT = "%Y-%m-%d"
 
 
 def requests_retry_session(
-    retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
-):
+    retries: int = 10,
+    backoff_factor: int = 3,
+    status_forcelist: Sequence[int] = (500, 502, 504),
+    session: requests.Session = None,
+) -> requests.Session:
     """
     From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
     """
@@ -51,19 +55,29 @@ class HarvestState:
     NOTE: should this class manage the state topic as well? Hrm.
     """
 
-    def __init__(self, start_date=None, end_date=None, catchup_days=14):
-        self.to_process = set()
-        self.completed = set()
+    def __init__(
+        self,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+        catchup_days: int = 14,
+    ):
+        self.to_process: Set[datetime.date] = set()
+        self.completed: Set[datetime.date] = set()
 
         if catchup_days or start_date or end_date:
             self.enqueue_period(start_date, end_date, catchup_days)
 
-    def __str__(self):
+    def __str__(self) -> str:
         return "<HarvestState to_process={}, completed={}>".format(
             len(self.to_process), len(self.completed)
         )
 
-    def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
+    def enqueue_period(
+        self,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+        catchup_days: int = 14,
+    ) -> None:
         """
         This function adds a time period to the "TODO" list, unless the dates
         have already been processed.
@@ -85,7 +99,7 @@ class HarvestState:
                 self.to_process.add(current)
             current += datetime.timedelta(days=1)
 
-    def next_span(self, continuous=False):
+    def next_span(self, continuous: bool = False) -> Optional[datetime.date]:
         """
         Gets next timespan (date) to be processed, or returns None if completed.
 
@@ -102,7 +116,7 @@ class HarvestState:
             return None
         return sorted(list(self.to_process))[0]
 
-    def update(self, state_json):
+    def update(self, state_json: str) -> None:
         """
         Merges a state JSON object into the current state.
 
@@ -114,7 +128,12 @@ class HarvestState:
             date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
             self.complete(date)
 
-    def complete(self, date, kafka_topic=None, kafka_config=None):
+    def complete(
+        self,
+        date: datetime.date,
+        kafka_topic: Optional[str] = None,
+        kafka_config: Optional[Dict] = None,
+    ) -> bytes:
         """
         Records that a date has been processed successfully.
 
@@ -137,7 +156,7 @@ class HarvestState:
         if kafka_topic:
             assert kafka_config
 
-            def fail_fast(err, msg):
+            def fail_fast(err: Any, _msg: Any) -> None:
                 if err:
                     raise KafkaException(err)
 
@@ -156,7 +175,7 @@ class HarvestState:
             producer.flush()
         return state_json
 
-    def initialize_from_kafka(self, kafka_topic, kafka_config):
+    def initialize_from_kafka(self, kafka_topic: str, kafka_config: Dict[str, Any]) -> None:
         """
         kafka_topic should have type str
 
@@ -167,7 +186,7 @@ class HarvestState:
 
         print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)
 
-        def fail_fast(err, msg):
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err:
                 raise KafkaException(err)
 
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 40d1c853..19eb6897 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -1,5 +1,7 @@
+import datetime
 import sys
 import time
+from typing import Any, Optional
 
 import sickle
 from confluent_kafka import KafkaException, Producer
@@ -24,7 +26,14 @@ class HarvestOaiPmhWorker:
     would want something similar operationally. Oh well!
     """
 
-    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+    def __init__(
+        self,
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+    ):
 
         self.produce_topic = produce_topic
         self.state_topic = state_topic
@@ -42,8 +51,8 @@ class HarvestOaiPmhWorker:
         self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
         print(self.state, file=sys.stderr)
 
-    def fetch_date(self, date):
-        def fail_fast(err, msg):
+    def fetch_date(self, date: datetime.date) -> None:
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -93,7 +102,7 @@ class HarvestOaiPmhWorker:
             )
         producer.flush()
 
-    def run(self, continuous=False):
+    def run(self, continuous: bool = False) -> None:
 
         while True:
             current = self.state.next_span(continuous)
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index 0f33f334..a1b4da0e 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -9,6 +9,7 @@ Assumptions:
 """
 
 import collections
+import datetime
 import ftplib
 import gzip
 import io
@@ -22,6 +23,7 @@ import tempfile
 import time
 import xml.etree.ElementTree as ET
 import zlib
+from typing import Any, Dict, Generator, Optional
 from urllib.parse import urlparse
 
 import dateparser
@@ -61,7 +63,14 @@ class PubmedFTPWorker:
 
     """
 
-    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+    def __init__(
+        self,
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+    ):
         self.name = "Pubmed"
         self.host = "ftp.ncbi.nlm.nih.gov"
         self.produce_topic = produce_topic
@@ -74,10 +83,10 @@ class PubmedFTPWorker:
         self.state = HarvestState(start_date, end_date)
         self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
         self.producer = self._kafka_producer()
-        self.date_file_map = None
+        self.date_file_map: Optional[Dict[str, Any]] = None
 
-    def _kafka_producer(self):
-        def fail_fast(err, msg):
+    def _kafka_producer(self) -> Producer:
+        def fail_fast(err: Any, _msg: None) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -97,7 +106,7 @@ class PubmedFTPWorker:
         )
         return Producer(producer_conf)
 
-    def fetch_date(self, date):
+    def fetch_date(self, date: datetime.date) -> bool:
         """
         Fetch file or files for a given date and feed Kafka one article per
         message. If the fetched XML does not contain a PMID an exception is
@@ -163,7 +172,7 @@ class PubmedFTPWorker:
 
         return True
 
-    def run(self, continuous=False):
+    def run(self, continuous: bool = False) -> None:
         while True:
             self.date_file_map = generate_date_file_map(host=self.host)
             if len(self.date_file_map) == 0:
@@ -188,7 +197,7 @@ class PubmedFTPWorker:
         print("{} FTP ingest caught up".format(self.name))
 
 
-def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
+def generate_date_file_map(host: str = "ftp.ncbi.nlm.nih.gov") -> Dict[str, Any]:
     """
     Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
     the server (mostly we have one file, but sometimes more).
@@ -259,7 +268,9 @@ def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
     return mapping
 
 
-def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
+def ftpretr(
+    url: str, max_retries: int = 10, retry_delay: int = 1, proxy_hostport: Optional[str] = None
+) -> str:
     """
     Note: This might move into a generic place in the future.
 
@@ -305,8 +316,11 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
 
 
 def ftpretr_via_http_proxy(
-    url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1
-):
+    url: str,
+    proxy_hostport: str = "ftp.ncbi.nlm.nih.gov",
+    max_retries: int = 10,
+    retry_delay: int = 1,
+) -> str:
     """
     Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c would
     be retrievable via proxy.com/a/b/c; (in 09/2021 we used
@@ -335,7 +349,7 @@ def ftpretr_via_http_proxy(
             time.sleep(retry_delay)
 
 
-def xmlstream(filename, tag, encoding="utf-8"):
+def xmlstream(filename: str, tag: str, encoding: str = "utf-8") -> Generator[Any, Any, Any]:
     """
     Note: This might move into a generic place in the future.
 
@@ -348,7 +362,7 @@ def xmlstream(filename, tag, encoding="utf-8"):
     Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
     """
 
-    def strip_ns(tag):
+    def strip_ns(tag: str) -> str:
        if "}" not in tag:
            return tag
        return tag.split("}")[1]
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 1e4cb41d..8f8efdda 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,9 @@
 import json
 import time
+from typing import Any, Dict, List, Optional
 
 from confluent_kafka import Consumer, KafkaException, Producer
+from fatcat_openapi_client import ApiClient, ReleaseEntity
 
 from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch
 
@@ -14,12 +16,19 @@ class ChangelogWorker(FatcatWorker):
     found, fetch them and push (as JSON) into a Kafka topic.
     """
 
-    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+    def __init__(
+        self,
+        api: ApiClient,
+        kafka_hosts: str,
+        produce_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+    ) -> None:
         super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
         self.poll_interval = poll_interval
         self.offset = offset  # the fatcat changelog offset, not the kafka offset
 
-    def run(self):
+    def run(self) -> None:
 
         # On start, try to consume the most recent from the topic, and using
         # that as the starting offset. Note that this is a single-partition
@@ -33,7 +42,7 @@ class ChangelogWorker(FatcatWorker):
                 self.offset = 0
             print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
 
-        def fail_fast(err, msg):
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err))
                 print("Bailing out...")
@@ -79,15 +88,15 @@ class EntityUpdatesWorker(FatcatWorker):
 
     def __init__(
         self,
-        api,
-        kafka_hosts,
-        consume_topic,
-        release_topic,
-        file_topic,
-        container_topic,
-        ingest_file_request_topic,
-        work_ident_topic,
-        poll_interval=5.0,
+        api: ApiClient,
+        kafka_hosts: str,
+        consume_topic: str,
+        release_topic: str,
+        file_topic: str,
+        container_topic: str,
+        ingest_file_request_topic: str,
+        work_ident_topic: str,
+        poll_interval: float = 5.0,
     ):
         super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
         self.release_topic = release_topic
@@ -158,7 +167,7 @@ class EntityUpdatesWorker(FatcatWorker):
             "10.17504/",
         ]
 
-    def want_live_ingest(self, release, ingest_request):
+    def want_live_ingest(self, release: ReleaseEntity, ingest_request: Dict[str, Any]) -> bool:
         """
         This function looks at ingest requests and decides whether they are
         worth enqueing for ingest.
@@ -259,15 +268,15 @@ class EntityUpdatesWorker(FatcatWorker):
 
         return True
 
-    def run(self):
-        def fail_fast(err, msg):
+    def run(self) -> None:
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err))
                 print("Bailing out...")
                 # TODO: should it be sys.exit(-1)?
                 raise KafkaException(err)
 
-        def on_commit(err, partitions):
+        def on_commit(err: Any, partitions: List[Any]) -> None:
             if err is not None:
                 print("Kafka consumer commit error: {}".format(err))
                 print("Bailing out...")
@@ -284,7 +293,7 @@ class EntityUpdatesWorker(FatcatWorker):
             print("Kafka consumer commit successful")
             pass
 
-        def on_rebalance(consumer, partitions):
+        def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
             for p in partitions:
                 if p.error:
                     raise KafkaException(p.error)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 989f7f5d..71c4dcf6 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,5 +1,6 @@
 import json
 import sys
+from typing import Any, Callable, List, Optional
 
 import elasticsearch
 import requests
@@ -27,17 +28,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
 
     def __init__(
         self,
-        kafka_hosts,
-        consume_topic,
-        poll_interval=10.0,
-        offset=None,
-        elasticsearch_backend="http://localhost:9200",
-        elasticsearch_index="fatcat",
-        elasticsearch_release_index="fatcat_releases",
-        batch_size=200,
-        api_host="https://api.fatcat.wiki/v0",
-        query_stats=False,
-    ):
+        kafka_hosts: str,
+        consume_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+        elasticsearch_backend: str = "http://localhost:9200",
+        elasticsearch_index: str = "fatcat",
+        elasticsearch_release_index: str = "fatcat_releases",
+        batch_size: int = 200,
+        api_host: str = "https://api.fatcat.wiki/v0",
+        query_stats: bool = False,
+    ) -> None:
         super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
         self.batch_size = batch_size
@@ -46,18 +47,18 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         self.elasticsearch_index = elasticsearch_index
         self.elasticsearch_release_index = elasticsearch_release_index
         self.entity_type = ReleaseEntity
-        self.transform_func = release_to_elasticsearch
+        self.transform_func: Callable = release_to_elasticsearch
         self.api_host = api_host
         self.query_stats = query_stats
 
-    def run(self):
+    def run(self) -> None:
         ac = ApiClient()
         api = public_api(self.api_host)
 
         # only used by container indexing query_stats code path
         es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
 
-        def fail_fast(err, partitions):
+        def fail_fast(err: Any, partitions: List[Any]) -> None:
             if err is not None:
                 print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -73,7 +74,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
             # print("Kafka consumer commit successful")
             pass
 
-        def on_rebalance(consumer, partitions):
+        def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
             for p in partitions:
                 if p.error:
                     raise KafkaException(p.error)
@@ -205,15 +206,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
 class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
     def __init__(
         self,
-        kafka_hosts,
-        consume_topic,
-        poll_interval=10.0,
-        offset=None,
-        query_stats=False,
-        elasticsearch_release_index="fatcat_release",
-        elasticsearch_backend="http://localhost:9200",
-        elasticsearch_index="fatcat",
-        batch_size=200,
+        kafka_hosts: str,
+        consume_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+        query_stats: bool = False,
+        elasticsearch_release_index: str = "fatcat_release",
+        elasticsearch_backend: str = "http://localhost:9200",
+        elasticsearch_index: str = "fatcat",
+        batch_size: int = 200,
     ):
         super().__init__(
             kafka_hosts=kafka_hosts,
@@ -242,13 +243,13 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
 
     def __init__(
         self,
-        kafka_hosts,
-        consume_topic,
-        poll_interval=10.0,
-        offset=None,
-        elasticsearch_backend="http://localhost:9200",
-        elasticsearch_index="fatcat_changelog",
-        batch_size=200,
+        kafka_hosts: str,
+        consume_topic: str,
+        poll_interval: float = 10.0,
+        offset: Optional[int] = None,
+        elasticsearch_backend: str = "http://localhost:9200",
+        elasticsearch_index: str = "fatcat_changelog",
+        batch_size: int = 200,
     ):
         super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index baec44f4..5239465b 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,7 +1,10 @@
-from confluent_kafka import Consumer, KafkaException, TopicPartition
+from typing import Any, Dict, Optional
 
+from confluent_kafka import Consumer, KafkaException, Message, TopicPartition
+from fatcat_openapi_client import ApiClient
 
-def most_recent_message(topic, kafka_config):
+
+def most_recent_message(topic: str, kafka_config: Dict[str, Any]) -> Message:
     """
     Tries to fetch the most recent message from a given topic.
 
@@ -50,7 +53,13 @@ class FatcatWorker:
     Common code for for Kafka producers and consumers.
     """
 
-    def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
+    def __init__(
+        self,
+        kafka_hosts: str,
+        produce_topic: Optional[str] = None,
+        consume_topic: Optional[str] = None,
+        api: Optional[ApiClient] = None,
+    ) -> None:
         if api:
             self.api = api
         self.kafka_config = {
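
For orientation, a minimal sketch (not part of the patch) of how the freshly annotated HarvestState API reads from a caller's point of view; the dates below are hypothetical, and the Kafka-backed state topic is deliberately left out, which the signatures in this diff allow since kafka_topic/kafka_config default to None:

    import datetime

    from fatcat_tools.harvest.harvest_common import HarvestState

    # Enqueue a two-day window; the constructor calls enqueue_period() internally.
    state = HarvestState(
        start_date=datetime.date(2021, 9, 1),
        end_date=datetime.date(2021, 9, 2),
    )

    while True:
        span = state.next_span()  # Optional[datetime.date]
        if span is None:
            break
        # ... harvest records for `span` here ...
        # With no kafka_topic/kafka_config given, complete() only updates the
        # in-memory state and returns the serialized state JSON (bytes).
        state.complete(span)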
