summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/doi_registrars.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py60
1 files changed, 31 insertions, 29 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index dd48e256..23b300ba 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -1,6 +1,8 @@
+import datetime
import json
import sys
import time
+from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse
from confluent_kafka import KafkaException, Producer
@@ -60,14 +62,14 @@ class HarvestCrossrefWorker:
def __init__(
self,
- kafka_hosts,
- produce_topic,
- state_topic,
- contact_email,
- api_host_url="https://api.crossref.org/works",
- start_date=None,
- end_date=None,
- ):
+ kafka_hosts: str,
+ produce_topic: str,
+ state_topic: str,
+ contact_email: str,
+ api_host_url: str = "https://api.crossref.org/works",
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ ) -> None:
self.api_host_url = api_host_url
self.produce_topic = produce_topic
@@ -86,8 +88,8 @@ class HarvestCrossrefWorker:
self.name = "Crossref"
self.producer = self._kafka_producer()
- def _kafka_producer(self):
- def fail_fast(err, msg):
+ def _kafka_producer(self) -> Producer:
+ def fail_fast(err: Any, _msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -107,7 +109,7 @@ class HarvestCrossrefWorker:
)
return Producer(producer_conf)
- def params(self, date_str):
+ def params(self, date_str: str) -> Dict[str, Any]:
filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str)
return {
"filter": filter_param,
@@ -115,14 +117,14 @@ class HarvestCrossrefWorker:
"cursor": "*",
}
- def update_params(self, params, resp):
+ def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
params["cursor"] = resp["message"]["next-cursor"]
return params
- def extract_key(self, obj):
+ def extract_key(self, obj: Dict[str, Any]) -> bytes:
return obj["DOI"].encode("utf-8")
- def fetch_date(self, date):
+ def fetch_date(self, date: datetime.date) -> None:
date_str = date.isoformat()
params = self.params(date_str)
@@ -182,13 +184,13 @@ class HarvestCrossrefWorker:
params = self.update_params(params, resp)
self.producer.flush()
- def extract_items(self, resp):
+ def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
return resp["message"]["items"]
- def extract_total(self, resp):
+ def extract_total(self, resp: Dict[str, Any]) -> int:
return resp["message"]["total-results"]
- def run(self, continuous=False):
+ def run(self, continuous: bool = False) -> None:
while True:
current = self.state.next_span(continuous)
@@ -222,13 +224,13 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
def __init__(
self,
- kafka_hosts,
- produce_topic,
- state_topic,
- contact_email,
- api_host_url="https://api.datacite.org/dois",
- start_date=None,
- end_date=None,
+ kafka_hosts: str,
+ produce_topic: str,
+ state_topic: str,
+ contact_email: str,
+ api_host_url: str = "https://api.datacite.org/dois",
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
):
super().__init__(
kafka_hosts=kafka_hosts,
@@ -243,7 +245,7 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
# for datecite, it's "from-update-date"
self.name = "Datacite"
- def params(self, date_str):
+ def params(self, date_str: str) -> Dict[str, Any]:
"""
Dates have to be supplied in 2018-10-27T22:36:30.000Z format.
"""
@@ -255,16 +257,16 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
"page[cursor]": 1,
}
- def extract_items(self, resp):
+ def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
return resp["data"]
- def extract_total(self, resp):
+ def extract_total(self, resp: Dict[str, Any]) -> int:
return resp["meta"]["total"]
- def extract_key(self, obj):
+ def extract_key(self, obj: Dict[str, Any]) -> bytes:
return obj["attributes"]["doi"].encode("utf-8")
- def update_params(self, params, resp):
+ def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
"""
Using cursor mechanism (https://support.datacite.org/docs/pagination#section-cursor).