about | summary | refs | log | tree | commit | diff | stats
path: root/python/fatcat_tools/harvest/pubmed.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/pubmed.py')
-rw-r--r-- python/fatcat_tools/harvest/pubmed.py 38
1 file changed, 26 insertions, 12 deletions
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index 0f33f334..a1b4da0e 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -9,6 +9,7 @@ Assumptions:
"""
import collections
+import datetime
import ftplib
import gzip
import io
@@ -22,6 +23,7 @@ import tempfile
import time
import xml.etree.ElementTree as ET
import zlib
+from typing import Any, Dict, Generator, Optional
from urllib.parse import urlparse
import dateparser
@@ -61,7 +63,14 @@ class PubmedFTPWorker:
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+ def __init__(
+ self,
+ kafka_hosts: str,
+ produce_topic: str,
+ state_topic: str,
+ start_date: Optional[datetime.date] = None,
+ end_date: Optional[datetime.date] = None,
+ ):
self.name = "Pubmed"
self.host = "ftp.ncbi.nlm.nih.gov"
self.produce_topic = produce_topic
@@ -74,10 +83,10 @@ class PubmedFTPWorker:
self.state = HarvestState(start_date, end_date)
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
self.producer = self._kafka_producer()
- self.date_file_map = None
+ self.date_file_map: Optional[Dict[str, Any]] = None
- def _kafka_producer(self):
- def fail_fast(err, msg):
+ def _kafka_producer(self) -> Producer:
+ def fail_fast(err: Any, _msg: None) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -97,7 +106,7 @@ class PubmedFTPWorker:
)
return Producer(producer_conf)
- def fetch_date(self, date):
+ def fetch_date(self, date: datetime.date) -> bool:
"""
Fetch file or files for a given date and feed Kafka one article per
message. If the fetched XML does not contain a PMID an exception is
@@ -163,7 +172,7 @@ class PubmedFTPWorker:
return True
- def run(self, continuous=False):
+ def run(self, continuous: bool = False) -> None:
while True:
self.date_file_map = generate_date_file_map(host=self.host)
if len(self.date_file_map) == 0:
@@ -188,7 +197,7 @@ class PubmedFTPWorker:
print("{} FTP ingest caught up".format(self.name))
-def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
+def generate_date_file_map(host: str = "ftp.ncbi.nlm.nih.gov") -> Dict[str, Any]:
"""
Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
the server (mostly we have one file, but sometimes more).
@@ -259,7 +268,9 @@ def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
return mapping
-def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
+def ftpretr(
+ url: str, max_retries: int = 10, retry_delay: int = 1, proxy_hostport: Optional[str] = None
+) -> str:
"""
Note: This might move into a generic place in the future.
@@ -305,8 +316,11 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
def ftpretr_via_http_proxy(
- url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1
-):
+ url: str,
+ proxy_hostport: str = "ftp.ncbi.nlm.nih.gov",
+ max_retries: int = 10,
+ retry_delay: int = 1,
+) -> str:
"""
Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c would
be retrievable via proxy.com/a/b/c; (in 09/2021 we used
@@ -335,7 +349,7 @@ def ftpretr_via_http_proxy(
time.sleep(retry_delay)
-def xmlstream(filename, tag, encoding="utf-8"):
+def xmlstream(filename: str, tag: str, encoding: str = "utf-8") -> Generator[Any, Any, Any]:
"""
Note: This might move into a generic place in the future.
@@ -348,7 +362,7 @@ def xmlstream(filename, tag, encoding="utf-8"):
Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
"""
- def strip_ns(tag):
+ def strip_ns(tag: str) -> str:
if "}" not in tag:
return tag
return tag.split("}")[1]