Diffstat (limited to 'python/fatcat_tools/harvest/pubmed.py')
-rw-r--r--  python/fatcat_tools/harvest/pubmed.py  159
1 file changed, 106 insertions(+), 53 deletions(-)
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index ee55f4eb..0f33f334 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -60,14 +60,15 @@ class PubmedFTPWorker:
         <tr>
     """
+
     def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
-        self.name = 'Pubmed'
-        self.host = 'ftp.ncbi.nlm.nih.gov'
+        self.name = "Pubmed"
+        self.host = "ftp.ncbi.nlm.nih.gov"
         self.produce_topic = produce_topic
         self.state_topic = state_topic
         self.kafka_config = {
-            'bootstrap.servers': kafka_hosts,
-            'message.max.bytes': 20000000,  # ~20 MBytes; broker is ~50 MBytes
+            "bootstrap.servers": kafka_hosts,
+            "message.max.bytes": 20000000,  # ~20 MBytes; broker is ~50 MBytes
         }
         self.loop_sleep = 60 * 60  # how long to wait, in seconds, between date checks
         self.state = HarvestState(start_date, end_date)
@@ -86,12 +87,14 @@ class PubmedFTPWorker:
         self._kafka_fail_fast = fail_fast

         producer_conf = self.kafka_config.copy()
-        producer_conf.update({
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'request.required.acks': -1,  # all brokers must confirm
-            },
-        })
+        producer_conf.update(
+            {
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
+            }
+        )
         return Producer(producer_conf)

     def fetch_date(self, date):
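Note: the producer_conf built above is passed to a confluent-kafka Producer, and fetch_date() below registers self._kafka_fail_fast as the on_delivery callback for every produce() call. The callback body lies outside this diff's context; a minimal sketch of such a fail-fast handler, assuming the standard confluent-kafka callback signature:

    import sys

    from confluent_kafka import KafkaException

    def fail_fast(err, msg):
        # confluent-kafka invokes on_delivery callbacks as cb(err, msg);
        # err is None once the broker has acknowledged the message.
        if err is not None:
            print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
            raise KafkaException(err)

With "delivery.report.only.error" set to True, the callback fires only for failed deliveries, so raising inside it aborts the harvest instead of silently dropping messages.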
@@ -105,24 +108,35 @@ class PubmedFTPWorker:
         if self.date_file_map is None:
             raise ValueError("cannot fetch date without date file mapping")

-        date_str = date.strftime('%Y-%m-%d')
+        date_str = date.strftime("%Y-%m-%d")
         paths = self.date_file_map.get(date_str)
         if paths is None:
-            print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr)
+            print(
+                "WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(
+                    date_str, self.date_file_map
+                ),
+                file=sys.stderr,
+            )
             return False

         count = 0
         for path in paths:
             # Fetch and decompress file.
             url = "ftp://{}{}".format(self.host, path)
-            filename = ftpretr(url, proxy_hostport="159.69.240.245:15201")  # TODO: proxy obsolete, when networking issue is resolved
-            with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp:
+            filename = ftpretr(
+                url, proxy_hostport="159.69.240.245:15201"
+            )  # TODO: proxy obsolete, when networking issue is resolved
+            with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as decomp:
                 try:
                     gzf = gzip.open(filename)
                     shutil.copyfileobj(gzf, decomp)
                 except zlib.error as exc:
-                    print('[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)'.format(
-                        url, exc), file=sys.stderr)
+                    print(
+                        "[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)".format(
+                            url, exc
+                        ),
+                        file=sys.stderr,
+                    )
                     continue

             # Here, blob is the unparsed XML; we peek into it to use PMID as
@@ -131,15 +145,17 @@ class PubmedFTPWorker:
             # WARNING: Parsing foreign XML exposes us at some
             # https://docs.python.org/3/library/xml.html#xml-vulnerabilities
             # here.
-            for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'):
-                soup = BeautifulSoup(blob, 'xml')
-                pmid = soup.find('PMID')
+            for blob in xmlstream(decomp.name, "PubmedArticle", encoding="utf-8"):
+                soup = BeautifulSoup(blob, "xml")
+                pmid = soup.find("PMID")
                 if pmid is None:
                     raise ValueError("no PMID found, please adjust identifier extraction")
                 count += 1
                 if count % 50 == 0:
                     print("... up to {}".format(count), file=sys.stderr)
-                self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast)
+                self.producer.produce(
+                    self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast
+                )

             self.producer.flush()
             os.remove(filename)
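The loop above peeks into each raw PubmedArticle blob only to extract the PMID, which becomes the Kafka message key. A standalone sketch of that extraction, using a made-up XML snippet (BeautifulSoup's "xml" parser requires lxml):

    from bs4 import BeautifulSoup

    # Illustrative record, not a real PubMed citation.
    blob = b"<PubmedArticle><MedlineCitation><PMID>12345</PMID></MedlineCitation></PubmedArticle>"
    soup = BeautifulSoup(blob, "xml")
    pmid = soup.find("PMID")
    assert pmid is not None
    print(pmid.text)  # "12345", used as the message key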
@@ -151,13 +167,17 @@ class PubmedFTPWorker:
         while True:
             self.date_file_map = generate_date_file_map(host=self.host)
             if len(self.date_file_map) == 0:
-                raise ValueError("map from dates to files should not be empty, maybe the HTML changed?")
+                raise ValueError(
+                    "map from dates to files should not be empty, maybe the HTML changed?"
+                )

             current = self.state.next_span(continuous)
             if current:
                 print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr)
                 self.fetch_date(current)
-                self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config)
+                self.state.complete(
+                    current, kafka_topic=self.state_topic, kafka_config=self.kafka_config
+                )
                 continue

             if continuous:
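For orientation, a hypothetical wiring of this worker; the broker address and topic names are placeholders, and run(continuous=...) is inferred from the loop body above rather than shown in this diff:

    worker = PubmedFTPWorker(
        kafka_hosts="localhost:9092",
        produce_topic="fatcat-prod.ftp-pubmed",
        state_topic="fatcat-prod.ftp-pubmed-state",
    )
    worker.run(continuous=True)  # sleeps self.loop_sleep seconds between date checks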
@@ -168,7 +188,7 @@
         print("{} FTP ingest caught up".format(self.name))


-def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
+def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
     """
     Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
     the server (mostly we have one file, but sometimes more).
@@ -176,14 +196,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
     Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...}
     """
     mapping = collections.defaultdict(set)
-    pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
+    pattern = re.compile(r"Filename: ([^ ]*.xml) -- Created: ([^<]*)")
     ftp = ftplib.FTP(host)
     ftp.login()
-    filenames = ftp.nlst('/pubmed/updatefiles')
+    filenames = ftp.nlst("/pubmed/updatefiles")
     retries, retry_delay = 10, 60
     for name in filenames:
-        if not name.endswith('.html'):
+        if not name.endswith(".html"):
             continue
         sio = io.StringIO()
         for i in range(retries):
@@ -201,10 +221,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
                     ftp = ftplib.FTP(host)
                     ftp.login()
                     sio.truncate(0)
-                ftp.retrlines('RETR {}'.format(name), sio.write)
+                ftp.retrlines("RETR {}".format(name), sio.write)
             except (EOFError, ftplib.error_temp, socket.gaierror, BrokenPipeError) as exc:
-                print("ftp retr on {} failed with {} ({}) ({} retries left)".format(
-                    name, exc, type(exc), retries - (i + 1)), file=sys.stderr)
+                print(
+                    "ftp retr on {} failed with {} ({}) ({} retries left)".format(
+                        name, exc, type(exc), retries - (i + 1)
+                    ),
+                    file=sys.stderr,
+                )
                 if i + 1 == retries:
                     raise
                 else:
@@ -214,16 +238,24 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
         contents = sio.getvalue()
         match = pattern.search(contents)
         if match is None:
-            print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr)
+            print(
+                "pattern miss in {} on: {}, may need to adjust pattern: {}".format(
+                    name, contents, pattern
+                ),
+                file=sys.stderr,
+            )
             continue
-        filename, filedate = match.groups()  # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
+        (
+            filename,
+            filedate,
+        ) = match.groups()  # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
         date = dateparser.parse(filedate)
-        fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
-        date_str = date.strftime('%Y-%m-%d')
+        fullpath = "/pubmed/updatefiles/{}.gz".format(filename)
+        date_str = date.strftime("%Y-%m-%d")
         mapping[date_str].add(fullpath)
-        print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr)
+        print("added entry for {}: {}".format(date_str, fullpath), file=sys.stderr)

-    print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
+    print("generated date-file mapping for {} dates".format(len(mapping)), file=sys.stderr)
     return mapping
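A short usage sketch for the returned mapping, matching the Example in the docstring:

    date_file_map = generate_date_file_map()  # defaults to ftp.ncbi.nlm.nih.gov
    for path in sorted(date_file_map.get("2020-01-02", set())):
        print(path)  # e.g. /pubmed/updatefiles/pubmed20n1016.xml.gz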
@@ -241,20 +273,29 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
     when we encountered EOFError while talking to the FTP server. Retry delay in seconds.
     """
     if proxy_hostport is not None:
-        return ftpretr_via_http_proxy(url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay)
+        return ftpretr_via_http_proxy(
+            url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay
+        )

     parsed = urlparse(url)
     server, path = parsed.netloc, parsed.path
     for i in range(max_retries):
         try:
             ftp = ftplib.FTP(server)
             ftp.login()
-            with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
-                print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr)
-                ftp.retrbinary('RETR %s' % path, f.write)
+            with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f:
+                print(
+                    "retrieving {} from {} to {} ...".format(path, server, f.name),
+                    file=sys.stderr,
+                )
+                ftp.retrbinary("RETR %s" % path, f.write)
             ftp.close()
         except EOFError as exc:
-            print("ftp retrbinary on {} failed with {} ({}) ({} retries left)".format(
-                path, exc, type(exc), max_retries - (i + 1)), file=sys.stderr)
+            print(
+                "ftp retrbinary on {} failed with {} ({}) ({} retries left)".format(
+                    path, exc, type(exc), max_retries - (i + 1)
+                ),
+                file=sys.stderr,
+            )
             if i + 1 == max_retries:
                 raise
             else:
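Usage sketch for ftpretr: it saves to a named temporary file and returns the path, and the caller owns cleanup, just as fetch_date() above ends with os.remove(filename):

    import os

    filename = ftpretr("ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz")
    try:
        pass  # process the downloaded file here
    finally:
        os.remove(filename)  # caller is responsible for deletion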
@@ -263,7 +304,9 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
             return f.name


-def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1):
+def ftpretr_via_http_proxy(
+    url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1
+):
     """
     Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c would
     be retrievable via proxy.com/a/b/c; (in 09/2021 we used
@@ -276,19 +319,23 @@ def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1):
         try:
             url = "http://{}{}".format(proxy_hostport, path)
             print("retrieving file via proxy (ftpup) from {}".format(url), file=sys.stderr)
-            with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
+            with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f:
                 cmd = ["wget", "-c", url, "-O", f.name]
                 result = subprocess.run(cmd)
                 return f.name
         except (subprocess.CalledProcessError, OSError, ValueError) as exc:
-            print("ftp fetch {} failed with {} ({}) ({} retries left)".format(
-                url, exc, type(exc), max_retries - (i + 1)), file=sys.stderr)
+            print(
+                "ftp fetch {} failed with {} ({}) ({} retries left)".format(
+                    url, exc, type(exc), max_retries - (i + 1)
+                ),
+                file=sys.stderr,
+            )
             if i + 1 == max_retries:
                 raise
             time.sleep(retry_delay)
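The proxy scheme described in the docstring maps the FTP path directly onto the proxy host, which is exactly what the url = "http://{}{}".format(proxy_hostport, path) line above builds. The translation in isolation (the proxy host here is a placeholder):

    from urllib.parse import urlparse

    ftp_url = "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz"
    path = urlparse(ftp_url).path  # /pubmed/updatefiles/pubmed20n1016.xml.gz
    proxy_url = "http://{}{}".format("proxy.example.com:15201", path)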
-def xmlstream(filename, tag, encoding='utf-8'):
+def xmlstream(filename, tag, encoding="utf-8"):
     """
     Note: This might move into a generic place in the future.
@@ -300,23 +347,29 @@
     Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
     """
+
     def strip_ns(tag):
-        if '}' not in tag:
+        if "}" not in tag:
             return tag
-        return tag.split('}')[1]
+        return tag.split("}")[1]

     # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm
-    context = iter(ET.iterparse(filename, events=(
-        'start',
-        'end',
-    )))
+    context = iter(
+        ET.iterparse(
+            filename,
+            events=(
+                "start",
+                "end",
+            ),
+        )
+    )
     try:
         _, root = next(context)
     except StopIteration:
         return

     for event, elem in context:
-        if not strip_ns(elem.tag) == tag or event == 'start':
+        if not strip_ns(elem.tag) == tag or event == "start":
             continue
         yield ET.tostring(elem, encoding=encoding)
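Finally, a usage sketch for xmlstream, which yields one serialized element at a time instead of parsing the whole document into memory; the filename is a placeholder:

    for blob in xmlstream("pubmed20n1016.xml", "PubmedArticle", encoding="utf-8"):
        # blob is the bytes of one serialized <PubmedArticle> element
        print(len(blob))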