path: root/python/fatcat_tools/harvest
author     Martin Czygan <martin.czygan@gmail.com>   2020-02-19 02:28:11 +0100
committer  Martin Czygan <martin.czygan@gmail.com>   2020-02-22 17:44:38 +0100
commit     376053a479a8d683fc5e099d0b0b3cb76c026d16 (patch)
tree       0ef3343b258ad9d485aa8a558e1f505832c5d415 /python/fatcat_tools/harvest
parent     456f318b5ef904786aabf2411d2d244cd38f25b1 (diff)
download   fatcat-376053a479a8d683fc5e099d0b0b3cb76c026d16.tar.gz
           fatcat-376053a479a8d683fc5e099d0b0b3cb76c026d16.zip
more pubmed adjustments
* regenerate map in continuous mode
* add tests
Diffstat (limited to 'python/fatcat_tools/harvest')
-rw-r--r--  python/fatcat_tools/harvest/__init__.py     1
-rw-r--r--  python/fatcat_tools/harvest/pubmed.py      187
2 files changed, 118 insertions, 70 deletions
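For orientation, a minimal sketch of how the reworked worker might be driven after this change; the broker address and topic names are placeholders, only the constructor signature and run(continuous=...) come from the diff below:

    from fatcat_tools.harvest import PubmedFTPWorker

    # Placeholder broker and topic names, for illustration only.
    worker = PubmedFTPWorker(
        kafka_hosts='localhost:9092',
        produce_topic='fatcat-prod.ftp-pubmed',
        state_topic='fatcat-prod.ftp-pubmed-state',
    )
    # In continuous mode the date-to-file map is regenerated after each sleep cycle.
    worker.run(continuous=True)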
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py
index 7d814696..5f7a1001 100644
--- a/python/fatcat_tools/harvest/__init__.py
+++ b/python/fatcat_tools/harvest/__init__.py
@@ -3,3 +3,4 @@ from .harvest_common import HarvestState
from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker
from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\
HarvestDoajArticleWorker, HarvestDoajJournalWorker
+from .pubmed import PubmedFTPWorker
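The commit message mentions new tests; they live outside this directory, but a sketch of how the generate_date_file_map() helper added in pubmed.py below could be exercised without network access might look like the following (the stubbed file listing and HTML line are illustrative, not taken from the actual test suite):

    from unittest import mock

    from fatcat_tools.harvest.pubmed import generate_date_file_map

    def test_generate_date_file_map():
        # Stub out the FTP client so no network access happens.
        with mock.patch('fatcat_tools.harvest.pubmed.FTP') as MockFTP:
            ftp = MockFTP.return_value
            ftp.nlst.return_value = ['/pubmed/updatefiles/pubmed20n1016_stats.html']
            ftp.retrlines.side_effect = lambda cmd, callback: callback(
                'Filename: pubmed20n1016.xml -- Created: Tue Dec 17 15:23:32 EST 2019')
            mapping = generate_date_file_map()
        assert mapping['2019-12-17'] == {'/pubmed/updatefiles/pubmed20n1016.xml.gz'}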
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index 7afb2dab..fb421037 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -1,12 +1,22 @@
"""
Pubmed harvest via FTP.
+
+Assumptions:
+
+* fixed hostname and directory structure
+* XML files are gzip compressed
+* accompanying HTML files contain correct dates
"""
import collections
+import gzip
import io
+import os
import re
+import shutil
import sys
import tempfile
+import time
import xml.etree.ElementTree as ET
from ftplib import FTP
from urllib.parse import urljoin, urlparse
@@ -20,7 +30,7 @@ from .harvest_common import HarvestState
class PubmedFTPWorker:
"""
- Access Pubmed FTP host for daily updates.
+ Access Pubmed FTP for daily updates.
* Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles
* Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt
@@ -37,7 +47,7 @@ class PubmedFTPWorker:
* ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz
* ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5
- The HTML contains the date.
+ Currently (02/2020) the HTML contains the date.
<html>
<head><title></title></head>
@@ -46,128 +56,158 @@ class PubmedFTPWorker:
<table cellspacing="0" cellpadding="0" border="0" width="300">
<tr>
- When this workers starts, it will figure out a mapping from date to XML
- files by looking at all the HTML files.
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_data=None):
+ def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+ self.name = 'Pubmed'
self.host = 'ftp.ncbi.nlm.nih.gov'
self.produce_topic = produce_topic
self.state_topic = state_topic
self.kafka_config = {
'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
}
- self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+ self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
self.state = HarvestState(start_date, end_date)
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
- self.date_file_map = self.generate_date_file_map()
- if len(self.date_file_map) == 0:
- raise ValueError('mapping from dates to files should not be empty')
-
- def generate_date_file_map(self):
- """
- Generate a dictionary mapping date (strings) to filepaths. The date is
- parsed from pubmed20n1016_stats.html, mapping to absolute path on FTP,
- e.g. "2020-01-02": "/pubmed/updatefiles/pubmed20n1016.xml.gz".
- """
- mapping = collections.defaultdict(set)
- pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
- ftp = FTP(self.host)
- ftp.login()
- filenames = ftp.nlst('/pubmed/updatefiles')
- for name in filenames:
- if not name.endswith('.html'):
- continue
- sio = io.StringIO()
- ftp.retrlines('RETR {}'.format(name), sio.write)
- contents = sio.getvalue()
- match = pattern.search(contents)
- if match is None:
- print('pattern miss on: {}, may need to adjust pattern: {}'.format(contents, pattern),
- file=sys.stderr)
- continue
- filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
- date = dateparser.parse(filedate)
- fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
- mapping[date.format('%Y-%m-%d')].add(fullpath)
-
- self.date_file_map = mapping
- print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
-
+ self.producer = self._kafka_producer()
+ self.date_file_map = None
- def fetch_date(self, date):
- """
- Fetch file for a given date and feed Kafka one article per message.
- """
+ def _kafka_producer(self):
def fail_fast(err, msg):
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
raise KafkaException(err)
+ self._kafka_fail_fast = fail_fast
+
producer_conf = self.kafka_config.copy()
producer_conf.update({
'delivery.report.only.error': True,
'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
+ 'request.required.acks': -1, # all brokers must confirm
},
})
- producer = Producer(producer_conf)
+ return Producer(producer_conf)
- date_str = date.format('%Y-%m-%d')
+ def fetch_date(self, date):
+ """
+ Fetch file for a given date and feed Kafka one article per message. If
+ the fetched XML does not contain a PMID, this method will fail. We
+ build up the mapping from dates to filenames on first run.
+ """
+ if self.date_file_map is None:
+ self.date_file_map = generate_date_file_map(host=self.host)
+ if len(self.date_file_map) == 0:
+ raise ValueError("map from dates to files should not be empty, maybe the HTML changed?")
+
+ date_str = date.strftime('%Y-%m-%d')
paths = self.date_file_map.get(date_str)
if paths is None:
print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr)
- return
+ return False
count = 0
for path in paths:
- filename = ftpretr("ftp://{}".format(urljoin(self.host, path)))
- for blob in xmlstream(filename, 'PubmedArticle', encoding='utf-8'):
- soup = BeautifulSoup(blob)
+ # Fetch and decompress file.
+ url = "ftp://{}{}".format(self.host, path)
+ filename = ftpretr(url)
+ with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp:
+ gzf = gzip.open(filename)
+ shutil.copyfileobj(gzf, decomp)
+
+ # Here, blob is the unparsed XML; we peek into it to use PMID as
+ # message key. We need streaming, since some updates would consume
+ # GBs otherwise.
+            # WARNING: Parsing foreign XML exposes us to the
+            # vulnerabilities listed at
+            # https://docs.python.org/3/library/xml.html#xml-vulnerabilities
+ for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'):
+ soup = BeautifulSoup(blob, 'xml')
pmid = soup.find('PMID')
if pmid is None:
- raise ValueError('no PMID found, adjust identifier extraction')
+ raise ValueError("no PMID found, please adjust identifier extraction")
count += 1
if count % 50 == 0:
- print("... up to {} from {}".format(count, filename))
- producer.produce(
- self.produce_topic,
- blob,
- key=pmid.text,
- on_delivery=fail_fast)
- producer.flush()
+ print("... up to {}".format(count), file=sys.stderr)
+ self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast)
+
+ self.producer.flush()
+ os.remove(filename)
+ os.remove(decomp.name)
+
+ return True
def run(self, continuous=False):
while True:
current = self.state.next(continuous)
if current:
- print("Fetching DOIs updated on {} (UTC)".format(current))
+ print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
- self.state.complete(current,
- kafka_topic=self.state_topic,
- kafka_config=self.kafka_config)
+ self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config)
continue
if continuous:
print("Sleeping {} seconds...".format(self.loop_sleep))
time.sleep(self.loop_sleep)
+ # Need to keep the mapping fresh.
+ self.date_file_map = generate_date_file_map(host=self.host)
else:
break
print("{} DOI ingest caught up".format(self.name))
-class ftpretr(url):
+def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
+ """
+ Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
+    the server (usually a single file per date, but sometimes more).
+
+ Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...}
"""
+ mapping = collections.defaultdict(set)
+ pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
+ ftp = FTP(host)
+ ftp.login()
+ filenames = ftp.nlst('/pubmed/updatefiles')
+
+ for name in filenames:
+ if not name.endswith('.html'):
+ continue
+ sio = io.StringIO()
+ ftp.retrlines('RETR {}'.format(name), sio.write)
+ contents = sio.getvalue()
+ match = pattern.search(contents)
+ if match is None:
+ print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr)
+ continue
+ filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
+ date = dateparser.parse(filedate)
+ fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
+ date_str = date.strftime('%Y-%m-%d')
+ mapping[date_str].add(fullpath)
+ print('added entry for {}: {}'.format(date_str, fullpath))
+
+ print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
+ return mapping
+
+
+def ftpretr(url):
+ """
+ Note: This might move into a generic place in the future.
+
Fetch (RETR) a remote file given by its URL (e.g.
"ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a
- local temporary file.
+ local temporary file. Returns the name of the local, closed temporary file.
+
+    It is the responsibility of the caller to clean up the temporary file.
"""
parsed = urlparse(url)
server, path = parsed.netloc, parsed.path
- ftp = FTP(self.server)
+ ftp = FTP(server)
ftp.login()
with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
+ print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr)
ftp.retrbinary('RETR %s' % path, f.write)
ftp.close()
return f.name
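A brief usage sketch for ftpretr above; the URL is the update-file location already shown in the class docstring, and the cleanup step mirrors what fetch_date() does:

    import gzip
    import os

    from fatcat_tools.harvest.pubmed import ftpretr

    # Download one update file to a closed temporary file on disk.
    tmp = ftpretr('ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz')
    with gzip.open(tmp) as fh:
        head = fh.read(200)  # peek at the start of the decompressed XML
    os.remove(tmp)  # cleanup is the caller's responsibility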
@@ -175,11 +215,15 @@ class ftpretr(url):
def xmlstream(filename, tag, encoding='utf-8'):
"""
+ Note: This might move into a generic place in the future.
+
Given a path to an XML file and a tag name (without namespace), stream
- through the XML, and emit the element denoted by tag for processing as string.
+ through the XML and yield elements denoted by tag as string.
for snippet in xmlstream("sample.xml", "sometag"):
- print(len(snippet))
+ print(len(snippet))
+
+ Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
"""
def strip_ns(tag):
if not '}' in tag:
@@ -191,7 +235,10 @@ def xmlstream(filename, tag, encoding='utf-8'):
'start',
'end',
)))
- _, root = next(context)
+ try:
+ _, root = next(context)
+ except StopIteration:
+ return
for event, elem in context:
if not strip_ns(elem.tag) == tag or event == 'start':
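Finally, a minimal sketch of how xmlstream() and BeautifulSoup combine to pull the PMID message key out of each record, mirroring fetch_date() above; the local filename is a placeholder for an already-decompressed update file:

    from bs4 import BeautifulSoup

    from fatcat_tools.harvest.pubmed import xmlstream

    for blob in xmlstream('pubmed20n1016.xml', 'PubmedArticle', encoding='utf-8'):
        soup = BeautifulSoup(blob, 'xml')
        pmid = soup.find('PMID')
        print(pmid.text)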