summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/harvest_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py45
1 files changed, 32 insertions, 13 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index fda0dc62..4004600b 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -1,6 +1,7 @@
import datetime
import json
import sys
+from typing import Any, Dict, Optional, Sequence, Set
import requests
from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
@@ -15,8 +16,11 @@ DATE_FMT = "%Y-%m-%d"
def requests_retry_session(
- retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
-):
+ retries: int = 10,
+ backoff_factor: int = 3,
+ status_forcelist: Sequence[int] = (500, 502, 504),
+ session: requests.Session = None,
+) -> requests.Session:
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -51,19 +55,29 @@ class HarvestState:
NOTE: should this class manage the state topic as well? Hrm.
"""
- def __init__(self, start_date=None, end_date=None, catchup_days=14):
- self.to_process = set()
- self.completed = set()
+ def __init__(
+ self,
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ catchup_days: int = 14,
+ ):
+ self.to_process: Set[datetime.date] = set()
+ self.completed: Set[datetime.date] = set()
if catchup_days or start_date or end_date:
self.enqueue_period(start_date, end_date, catchup_days)
- def __str__(self):
+ def __str__(self) -> str:
return "<HarvestState to_process={}, completed={}>".format(
len(self.to_process), len(self.completed)
)
- def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
+ def enqueue_period(
+ self,
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ catchup_days: int = 14,
+ ) -> None:
"""
This function adds a time period to the "TODO" list, unless the dates
have already been processed.
@@ -85,7 +99,7 @@ class HarvestState:
self.to_process.add(current)
current += datetime.timedelta(days=1)
- def next_span(self, continuous=False):
+ def next_span(self, continuous: bool = False) -> Optional[datetime.date]:
"""
Gets next timespan (date) to be processed, or returns None if completed.
@@ -102,7 +116,7 @@ class HarvestState:
return None
return sorted(list(self.to_process))[0]
- def update(self, state_json):
+ def update(self, state_json: str) -> None:
"""
Merges a state JSON object into the current state.
@@ -114,7 +128,12 @@ class HarvestState:
date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
self.complete(date)
- def complete(self, date, kafka_topic=None, kafka_config=None):
+ def complete(
+ self,
+ date: datetime.date,
+ kafka_topic: Optional[str] = None,
+ kafka_config: Optional[Dict] = None,
+ ) -> bytes:
"""
Records that a date has been processed successfully.
@@ -137,7 +156,7 @@ class HarvestState:
if kafka_topic:
assert kafka_config
- def fail_fast(err, msg):
+ def fail_fast(err: Any, _msg: Any) -> None:
if err:
raise KafkaException(err)
@@ -156,7 +175,7 @@ class HarvestState:
producer.flush()
return state_json
- def initialize_from_kafka(self, kafka_topic, kafka_config):
+ def initialize_from_kafka(self, kafka_topic: str, kafka_config: Dict[str, Any]) -> None:
"""
kafka_topic should have type str
@@ -167,7 +186,7 @@ class HarvestState:
print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)
- def fail_fast(err, msg):
+ def fail_fast(err: Any, _msg: Any) -> None:
if err:
raise KafkaException(err)