Diffstat (limited to 'python/fatcat_tools/harvest')
-rw-r--r--  python/fatcat_tools/harvest/doi_registrars.py  | 60
-rw-r--r--  python/fatcat_tools/harvest/harvest_common.py  | 45
-rw-r--r--  python/fatcat_tools/harvest/oaipmh.py          | 17
-rw-r--r--  python/fatcat_tools/harvest/pubmed.py          | 38
4 files changed, 102 insertions, 58 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index dd48e256..23b300ba 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -1,6 +1,8 @@
+import datetime
 import json
 import sys
 import time
+from typing import Any, Dict, List, Optional
 from urllib.parse import parse_qs, urlparse
 
 from confluent_kafka import KafkaException, Producer
@@ -60,14 +62,14 @@ class HarvestCrossrefWorker:
 
     def __init__(
         self,
-        kafka_hosts,
-        produce_topic,
-        state_topic,
-        contact_email,
-        api_host_url="https://api.crossref.org/works",
-        start_date=None,
-        end_date=None,
-    ):
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        contact_email: str,
+        api_host_url: str = "https://api.crossref.org/works",
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+    ) -> None:
 
         self.api_host_url = api_host_url
         self.produce_topic = produce_topic
@@ -86,8 +88,8 @@ class HarvestCrossrefWorker:
         self.name = "Crossref"
         self.producer = self._kafka_producer()
 
-    def _kafka_producer(self):
-        def fail_fast(err, msg):
+    def _kafka_producer(self) -> Producer:
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -107,7 +109,7 @@ class HarvestCrossrefWorker:
         )
         return Producer(producer_conf)
 
-    def params(self, date_str):
+    def params(self, date_str: str) -> Dict[str, Any]:
         filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str)
         return {
             "filter": filter_param,
@@ -115,14 +117,14 @@ class HarvestCrossrefWorker:
             "cursor": "*",
         }
 
-    def update_params(self, params, resp):
+    def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
         params["cursor"] = resp["message"]["next-cursor"]
         return params
 
-    def extract_key(self, obj):
+    def extract_key(self, obj: Dict[str, Any]) -> bytes:
         return obj["DOI"].encode("utf-8")
 
-    def fetch_date(self, date):
+    def fetch_date(self, date: datetime.date) -> None:
 
         date_str = date.isoformat()
         params = self.params(date_str)
@@ -182,13 +184,13 @@ class HarvestCrossrefWorker:
             params = self.update_params(params, resp)
         self.producer.flush()
 
-    def extract_items(self, resp):
+    def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
         return resp["message"]["items"]
 
-    def extract_total(self, resp):
+    def extract_total(self, resp: Dict[str, Any]) -> int:
         return resp["message"]["total-results"]
 
-    def run(self, continuous=False):
+    def run(self, continuous: bool = False) -> None:
 
         while True:
             current = self.state.next_span(continuous)
@@ -222,13 +224,13 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
 
     def __init__(
         self,
-        kafka_hosts,
-        produce_topic,
-        state_topic,
-        contact_email,
-        api_host_url="https://api.datacite.org/dois",
-        start_date=None,
-        end_date=None,
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        contact_email: str,
+        api_host_url: str = "https://api.datacite.org/dois",
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
     ):
         super().__init__(
             kafka_hosts=kafka_hosts,
@@ -243,7 +245,7 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
         # for datecite, it's "from-update-date"
         self.name = "Datacite"
 
-    def params(self, date_str):
+    def params(self, date_str: str) -> Dict[str, Any]:
         """
         Dates have to be supplied in 2018-10-27T22:36:30.000Z format.
         """
@@ -255,16 +257,16 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
             "page[cursor]": 1,
         }
 
-    def extract_items(self, resp):
+    def extract_items(self, resp: Dict[str, Any]) -> List[Dict]:
         return resp["data"]
 
-    def extract_total(self, resp):
+    def extract_total(self, resp: Dict[str, Any]) -> int:
         return resp["meta"]["total"]
 
-    def extract_key(self, obj):
+    def extract_key(self, obj: Dict[str, Any]) -> bytes:
         return obj["attributes"]["doi"].encode("utf-8")
 
-    def update_params(self, params, resp):
+    def update_params(self, params: Dict[str, Any], resp: Dict[str, Any]) -> Dict[str, Any]:
         """
         Using cursor mechanism (https://support.datacite.org/docs/pagination#section-cursor).
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index fda0dc62..4004600b 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -1,6 +1,7 @@
 import datetime
 import json
 import sys
+from typing import Any, Dict, Optional, Sequence, Set
 
 import requests
 from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
@@ -15,8 +16,11 @@ DATE_FMT = "%Y-%m-%d"
 
 
 def requests_retry_session(
-    retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
-):
+    retries: int = 10,
+    backoff_factor: int = 3,
+    status_forcelist: Sequence[int] = (500, 502, 504),
+    session: requests.Session = None,
+) -> requests.Session:
     """
     From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
     """
@@ -51,19 +55,29 @@ class HarvestState:
     NOTE: should this class manage the state topic as well? Hrm.
     """
 
-    def __init__(self, start_date=None, end_date=None, catchup_days=14):
-        self.to_process = set()
-        self.completed = set()
+    def __init__(
+        self,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+        catchup_days: int = 14,
+    ):
+        self.to_process: Set[datetime.date] = set()
+        self.completed: Set[datetime.date] = set()
 
         if catchup_days or start_date or end_date:
             self.enqueue_period(start_date, end_date, catchup_days)
 
-    def __str__(self):
+    def __str__(self) -> str:
         return "<HarvestState to_process={}, completed={}>".format(
             len(self.to_process), len(self.completed)
         )
 
-    def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
+    def enqueue_period(
+        self,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+        catchup_days: int = 14,
+    ) -> None:
         """
         This function adds a time period to the "TODO" list, unless the dates
         have already been processed.
@@ -85,7 +99,7 @@ class HarvestState:
             self.to_process.add(current)
             current += datetime.timedelta(days=1)
 
-    def next_span(self, continuous=False):
+    def next_span(self, continuous: bool = False) -> Optional[datetime.date]:
         """
         Gets next timespan (date) to be processed, or returns None if completed.
 
@@ -102,7 +116,7 @@ class HarvestState:
             return None
         return sorted(list(self.to_process))[0]
 
-    def update(self, state_json):
+    def update(self, state_json: str) -> None:
         """
         Merges a state JSON object into the current state.
 
@@ -114,7 +128,12 @@ class HarvestState:
             date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
             self.complete(date)
 
-    def complete(self, date, kafka_topic=None, kafka_config=None):
+    def complete(
+        self,
+        date: datetime.date,
+        kafka_topic: Optional[str] = None,
+        kafka_config: Optional[Dict] = None,
+    ) -> bytes:
         """
         Records that a date has been processed successfully.
@@ -137,7 +156,7 @@ class HarvestState:
         if kafka_topic:
             assert kafka_config
 
-            def fail_fast(err, msg):
+            def fail_fast(err: Any, _msg: Any) -> None:
                 if err:
                     raise KafkaException(err)
 
@@ -156,7 +175,7 @@ class HarvestState:
             producer.flush()
         return state_json
 
-    def initialize_from_kafka(self, kafka_topic, kafka_config):
+    def initialize_from_kafka(self, kafka_topic: str, kafka_config: Dict[str, Any]) -> None:
         """
         kafka_topic should have type str
 
@@ -167,7 +186,7 @@ class HarvestState:
 
         print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)
 
-        def fail_fast(err, msg):
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err:
                 raise KafkaException(err)
 
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 40d1c853..19eb6897 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -1,5 +1,7 @@
+import datetime
 import sys
 import time
+from typing import Any, Optional
 
 import sickle
 from confluent_kafka import KafkaException, Producer
@@ -24,7 +26,14 @@ class HarvestOaiPmhWorker:
     would want something similar operationally. Oh well!
     """
 
-    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+    def __init__(
+        self,
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+    ):
 
         self.produce_topic = produce_topic
         self.state_topic = state_topic
@@ -42,8 +51,8 @@ class HarvestOaiPmhWorker:
         self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
         print(self.state, file=sys.stderr)
 
-    def fetch_date(self, date):
-        def fail_fast(err, msg):
+    def fetch_date(self, date: datetime.date) -> None:
+        def fail_fast(err: Any, _msg: Any) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -93,7 +102,7 @@ class HarvestOaiPmhWorker:
             )
         producer.flush()
 
-    def run(self, continuous=False):
+    def run(self, continuous: bool = False) -> None:
 
         while True:
             current = self.state.next_span(continuous)
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index 0f33f334..a1b4da0e 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -9,6 +9,7 @@ Assumptions:
 """
 
 import collections
+import datetime
 import ftplib
 import gzip
 import io
@@ -22,6 +23,7 @@ import tempfile
 import time
 import xml.etree.ElementTree as ET
 import zlib
+from typing import Any, Dict, Generator, Optional
 from urllib.parse import urlparse
 
 import dateparser
@@ -61,7 +63,14 @@ class PubmedFTPWorker:
 
     """
 
-    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+    def __init__(
+        self,
+        kafka_hosts: str,
+        produce_topic: str,
+        state_topic: str,
+        start_date: Optional[datetime.date] = None,
+        end_date: Optional[datetime.date] = None,
+    ):
         self.name = "Pubmed"
         self.host = "ftp.ncbi.nlm.nih.gov"
         self.produce_topic = produce_topic
@@ -74,10 +83,10 @@ class PubmedFTPWorker:
         self.state = HarvestState(start_date, end_date)
         self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
         self.producer = self._kafka_producer()
-        self.date_file_map = None
+        self.date_file_map: Optional[Dict[str, Any]] = None
 
-    def _kafka_producer(self):
-        def fail_fast(err, msg):
+    def _kafka_producer(self) -> Producer:
+        def fail_fast(err: Any, _msg: None) -> None:
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
@@ -97,7 +106,7 @@ class PubmedFTPWorker:
         )
         return Producer(producer_conf)
 
-    def fetch_date(self, date):
+    def fetch_date(self, date: datetime.date) -> bool:
         """
         Fetch file or files for a given date and feed Kafka one article per
         message. If the fetched XML does not contain a PMID an exception is
@@ -163,7 +172,7 @@ class PubmedFTPWorker:
 
         return True
 
-    def run(self, continuous=False):
+    def run(self, continuous: bool = False) -> None:
         while True:
             self.date_file_map = generate_date_file_map(host=self.host)
             if len(self.date_file_map) == 0:
@@ -188,7 +197,7 @@ class PubmedFTPWorker:
             print("{} FTP ingest caught up".format(self.name))
 
 
-def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
+def generate_date_file_map(host: str = "ftp.ncbi.nlm.nih.gov") -> Dict[str, Any]:
     """
     Generate a DefaultDict[string, set] mapping dates to absolute filepaths
    on the server (mostly we have one file, but sometimes more).
@@ -259,7 +268,9 @@ def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
     return mapping
 
 
-def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
+def ftpretr(
+    url: str, max_retries: int = 10, retry_delay: int = 1, proxy_hostport: Optional[str] = None
+) -> str:
     """
     Note: This might move into a generic place in the future.
 
@@ -305,8 +316,11 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
 
 
 def ftpretr_via_http_proxy(
-    url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1
-):
+    url: str,
+    proxy_hostport: str = "ftp.ncbi.nlm.nih.gov",
+    max_retries: int = 10,
+    retry_delay: int = 1,
+) -> str:
     """
     Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c
     would be retrievable via proxy.com/a/b/c; (in 09/2021 we used
@@ -335,7 +349,7 @@ def ftpretr_via_http_proxy(
             time.sleep(retry_delay)
 
 
-def xmlstream(filename, tag, encoding="utf-8"):
+def xmlstream(filename: str, tag: str, encoding: str = "utf-8") -> Generator[Any, Any, Any]:
     """
     Note: This might move into a generic place in the future.
 
@@ -348,7 +362,7 @@ def xmlstream(filename, tag, encoding="utf-8"):
     Known vulnerabilities:
     https://docs.python.org/3/library/xml.html#xml-vulnerabilities
     """
 
-    def strip_ns(tag):
+    def strip_ns(tag: str) -> str:
         if "}" not in tag:
             return tag
         return tag.split("}")[1]