diff options
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 45 |
1 files changed, 32 insertions, 13 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index fda0dc62..4004600b 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -1,6 +1,7 @@ import datetime import json import sys +from typing import Any, Dict, Optional, Sequence, Set import requests from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition @@ -15,8 +16,11 @@ DATE_FMT = "%Y-%m-%d" def requests_retry_session( - retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None -): + retries: int = 10, + backoff_factor: int = 3, + status_forcelist: Sequence[int] = (500, 502, 504), + session: requests.Session = None, +) -> requests.Session: """ From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests """ @@ -51,19 +55,29 @@ class HarvestState: NOTE: should this class manage the state topic as well? Hrm. """ - def __init__(self, start_date=None, end_date=None, catchup_days=14): - self.to_process = set() - self.completed = set() + def __init__( + self, + start_date: Optional[datetime.date] = None, + end_date: Optional[datetime.date] = None, + catchup_days: int = 14, + ): + self.to_process: Set[datetime.date] = set() + self.completed: Set[datetime.date] = set() if catchup_days or start_date or end_date: self.enqueue_period(start_date, end_date, catchup_days) - def __str__(self): + def __str__(self) -> str: return "<HarvestState to_process={}, completed={}>".format( len(self.to_process), len(self.completed) ) - def enqueue_period(self, start_date=None, end_date=None, catchup_days=14): + def enqueue_period( + self, + start_date: Optional[datetime.date] = None, + end_date: Optional[datetime.date] = None, + catchup_days: int = 14, + ) -> None: """ This function adds a time period to the "TODO" list, unless the dates have already been processed. @@ -85,7 +99,7 @@ class HarvestState: self.to_process.add(current) current += datetime.timedelta(days=1) - def next_span(self, continuous=False): + def next_span(self, continuous: bool = False) -> Optional[datetime.date]: """ Gets next timespan (date) to be processed, or returns None if completed. @@ -102,7 +116,7 @@ class HarvestState: return None return sorted(list(self.to_process))[0] - def update(self, state_json): + def update(self, state_json: str) -> None: """ Merges a state JSON object into the current state. @@ -114,7 +128,12 @@ class HarvestState: date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date() self.complete(date) - def complete(self, date, kafka_topic=None, kafka_config=None): + def complete( + self, + date: datetime.date, + kafka_topic: Optional[str] = None, + kafka_config: Optional[Dict] = None, + ) -> bytes: """ Records that a date has been processed successfully. @@ -137,7 +156,7 @@ class HarvestState: if kafka_topic: assert kafka_config - def fail_fast(err, msg): + def fail_fast(err: Any, _msg: Any) -> None: if err: raise KafkaException(err) @@ -156,7 +175,7 @@ class HarvestState: producer.flush() return state_json - def initialize_from_kafka(self, kafka_topic, kafka_config): + def initialize_from_kafka(self, kafka_topic: str, kafka_config: Dict[str, Any]) -> None: """ kafka_topic should have type str @@ -167,7 +186,7 @@ class HarvestState: print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr) - def fail_fast(err, msg): + def fail_fast(err: Any, _msg: Any) -> None: if err: raise KafkaException(err) |