diff options
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 60 |
1 files changed, 31 insertions, 29 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index dd48e256..23b300ba 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -1,6 +1,8 @@ +import datetime import json import sys import time +from typing import Any, Dict, List, Optional from urllib.parse import parse_qs, urlparse from confluent_kafka import KafkaException, Producer @@ -60,14 +62,14 @@ class HarvestCrossrefWorker: def __init__( self, - kafka_hosts, - produce_topic, - state_topic, - contact_email, - api_host_url="https://api.crossref.org/works", - start_date=None, - end_date=None, - ): + kafka_hosts: str, + produce_topic: str, + state_topic: str, + contact_email: str, + api_host_url: str = "https://api.crossref.org/works", + start_date: Optional[datetime.date] = None, + end_date: Optional[datetime.date] = None, + ) -> None: self.api_host_url = api_host_url self.produce_topic = produce_topic @@ -86,8 +88,8 @@ class HarvestCrossrefWorker: self.name = "Crossref" self.producer = self._kafka_producer() - def _kafka_producer(self): - def fail_fast(err, msg): + def _kafka_producer(self) -> Producer: + def fail_fast(err: Any, _msg: Any) -> None: if err is not None: print("Kafka producer delivery error: {}".format(err), file=sys.stderr) print("Bailing out...", file=sys.stderr) @@ -107,7 +109,7 @@ class HarvestCrossrefWorker: ) return Producer(producer_conf) - def params(self, date_str): + def params(self, date_str: str) -> Dict[str, Any]: filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str) return { "filter": filter_param, @@ -115,14 +117,14 @@ class HarvestCrossrefWorker: "cursor": "*", } - def update_params(self, params, resp): + def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]: params["cursor"] = resp["message"]["next-cursor"] return params - def extract_key(self, obj): + def extract_key(self, obj: Dict[str, Any]) -> bytes: return obj["DOI"].encode("utf-8") - def fetch_date(self, date): + def fetch_date(self, date: datetime.date) -> None: date_str = date.isoformat() params = self.params(date_str) @@ -182,13 +184,13 @@ class HarvestCrossrefWorker: params = self.update_params(params, resp) self.producer.flush() - def extract_items(self, resp): + def extract_items(self, resp: Dict[str, Any]) -> List[Dict]: return resp["message"]["items"] - def extract_total(self, resp): + def extract_total(self, resp: Dict[str, Any]) -> int: return resp["message"]["total-results"] - def run(self, continuous=False): + def run(self, continuous: bool = False) -> None: while True: current = self.state.next_span(continuous) @@ -222,13 +224,13 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): def __init__( self, - kafka_hosts, - produce_topic, - state_topic, - contact_email, - api_host_url="https://api.datacite.org/dois", - start_date=None, - end_date=None, + kafka_hosts: str, + produce_topic: str, + state_topic: str, + contact_email: str, + api_host_url: str = "https://api.datacite.org/dois", + start_date: Optional[datetime.date] = None, + end_date: Optional[datetime.date] = None, ): super().__init__( kafka_hosts=kafka_hosts, @@ -243,7 +245,7 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): # for datecite, it's "from-update-date" self.name = "Datacite" - def params(self, date_str): + def params(self, date_str: str) -> Dict[str, Any]: """ Dates have to be supplied in 2018-10-27T22:36:30.000Z format. """ @@ -255,16 +257,16 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): "page[cursor]": 1, } - def extract_items(self, resp): + def extract_items(self, resp: Dict[str, Any]) -> List[Dict]: return resp["data"] - def extract_total(self, resp): + def extract_total(self, resp: Dict[str, Any]) -> int: return resp["meta"]["total"] - def extract_key(self, obj): + def extract_key(self, obj: Dict[str, Any]) -> bytes: return obj["attributes"]["doi"].encode("utf-8") - def update_params(self, params, resp): + def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]: """ Using cursor mechanism (https://support.datacite.org/docs/pagination#section-cursor). |