diff options
Diffstat (limited to 'python/fatcat_tools/harvest/pubmed.py')
-rw-r--r-- | python/fatcat_tools/harvest/pubmed.py | 187 |
1 files changed, 117 insertions, 70 deletions
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index 7afb2dab..fb421037 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -1,12 +1,22 @@ """ Pubmed harvest via FTP. + +Assumptions: + +* fixed hostname and directory structure +* XML files are gzip compressed +* accompanying HTML files contain correct dates """ import collections +import gzip import io +import os import re +import shutil import sys import tempfile +import time import xml.etree.ElementTree as ET from ftplib import FTP from urllib.parse import urljoin, urlparse @@ -20,7 +30,7 @@ from .harvest_common import HarvestState class PubmedFTPWorker: """ - Access Pubmed FTP host for daily updates. + Access Pubmed FTP for daily updates. * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt @@ -37,7 +47,7 @@ class PubmedFTPWorker: * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5 - The HTML contains the date. + Currently (02/2020) the HTML contains the date. <html> <head><title></title></head> @@ -46,128 +56,158 @@ class PubmedFTPWorker: <table cellspacing="0" cellpadding="0" border="0" width="300"> <tr> - When this workers starts, it will figure out a mapping from date to XML - files by looking at all the HTML files. """ - def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_data=None): + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): + self.name = 'Pubmed' self.host = 'ftp.ncbi.nlm.nih.gov' self.produce_topic = produce_topic self.state_topic = state_topic self.kafka_config = { 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes } - self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks self.state = HarvestState(start_date, end_date) self.state.initialize_from_kafka(self.state_topic, self.kafka_config) - self.date_file_map = self.generate_date_file_map() - if len(self.date_file_map) == 0: - raise ValueError('mapping from dates to files should not be empty') - - def generate_date_file_map(self): - """ - Generate a dictionary mapping date (strings) to filepaths. The date is - parsed from pubmed20n1016_stats.html, mapping to absolute path on FTP, - e.g. "2020-01-02": "/pubmed/updatefiles/pubmed20n1016.xml.gz". - """ - mapping = collections.defaultdict(set) - pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') - ftp = FTP(self.host) - ftp.login() - filenames = ftp.nlst('/pubmed/updatefiles') - for name in filenames: - if not name.endswith('.html'): - continue - sio = io.StringIO() - ftp.retrlines('RETR {}'.format(name), sio.write) - contents = sio.getvalue() - match = pattern.search(contents) - if match is None: - print('pattern miss on: {}, may need to adjust pattern: {}'.format(contents, pattern), - file=sys.stderr) - continue - filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') - date = dateparser.parse(filedate) - fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) - mapping[date.format('%Y-%m-%d')].add(fullpath) - - self.date_file_map = mapping - print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) - + self.producer = self._kafka_producer() + self.date_file_map = None - def fetch_date(self, date): - """ - Fetch file for a given date and feed Kafka one article per message. - """ + def _kafka_producer(self): def fail_fast(err, msg): if err is not None: print("Kafka producer delivery error: {}".format(err), file=sys.stderr) print("Bailing out...", file=sys.stderr) + # TODO: should it be sys.exit(-1)? raise KafkaException(err) + self._kafka_fail_fast = fail_fast + producer_conf = self.kafka_config.copy() producer_conf.update({ 'delivery.report.only.error': True, 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm + 'request.required.acks': -1, # all brokers must confirm }, }) - producer = Producer(producer_conf) + return Producer(producer_conf) - date_str = date.format('%Y-%m-%d') + def fetch_date(self, date): + """ + Fetch file for a given date and feed Kafka one article per message. If + the fetched XML does not contain a PMID, this method will fail. We + build up the mapping from dates to filenames on first run. + """ + if self.date_file_map is None: + self.date_file_map = generate_date_file_map(host=self.host) + if len(self.date_file_map) == 0: + raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + + date_str = date.strftime('%Y-%m-%d') paths = self.date_file_map.get(date_str) if paths is None: print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr) - return + return False count = 0 for path in paths: - filename = ftpretr("ftp://{}".format(urljoin(self.host, path))) - for blob in xmlstream(filename, 'PubmedArticle', encoding='utf-8'): - soup = BeautifulSoup(blob) + # Fetch and decompress file. + url = "ftp://{}{}".format(self.host, path) + filename = ftpretr(url) + with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp: + gzf = gzip.open(filename) + shutil.copyfileobj(gzf, decomp) + + # Here, blob is the unparsed XML; we peek into it to use PMID as + # message key. We need streaming, since some updates would consume + # GBs otherwise. + # WARNING: Parsing foreign XML exposes us at some + # https://docs.python.org/3/library/xml.html#xml-vulnerabilities + # here. + for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'): + soup = BeautifulSoup(blob, 'xml') pmid = soup.find('PMID') if pmid is None: - raise ValueError('no PMID found, adjust identifier extraction') + raise ValueError("no PMID found, please adjust identifier extraction") count += 1 if count % 50 == 0: - print("... up to {} from {}".format(count, filename)) - producer.produce( - self.produce_topic, - blob, - key=pmid.text, - on_delivery=fail_fast) - producer.flush() + print("... up to {}".format(count), file=sys.stderr) + self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast) + + self.producer.flush() + os.remove(filename) + os.remove(decomp.name) + + return True def run(self, continuous=False): while True: current = self.state.next(continuous) if current: - print("Fetching DOIs updated on {} (UTC)".format(current)) + print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) - self.state.complete(current, - kafka_topic=self.state_topic, - kafka_config=self.kafka_config) + self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config) continue if continuous: print("Sleeping {} seconds...".format(self.loop_sleep)) time.sleep(self.loop_sleep) + # Need to keep the mapping fresh. + self.date_file_map = generate_date_file_map(host=self.host) else: break print("{} DOI ingest caught up".format(self.name)) -class ftpretr(url): +def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): + """ + Generate a DefaultDict[string, set] mapping dates to absolute filepaths on + the server (mostly we have one file, but sometimes more). + + Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...} """ + mapping = collections.defaultdict(set) + pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') + ftp = FTP(host) + ftp.login() + filenames = ftp.nlst('/pubmed/updatefiles') + + for name in filenames: + if not name.endswith('.html'): + continue + sio = io.StringIO() + ftp.retrlines('RETR {}'.format(name), sio.write) + contents = sio.getvalue() + match = pattern.search(contents) + if match is None: + print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr) + continue + filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') + date = dateparser.parse(filedate) + fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) + date_str = date.strftime('%Y-%m-%d') + mapping[date_str].add(fullpath) + print('added entry for {}: {}'.format(date_str, fullpath)) + + print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) + return mapping + + +def ftpretr(url): + """ + Note: This might move into a generic place in the future. + Fetch (RETR) a remote file given by its URL (e.g. "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a - local temporary file. + local temporary file. Returns the name of the local, closed temporary file. + + It is the reponsibility of the caller to cleanup the temporary file. """ parsed = urlparse(url) server, path = parsed.netloc, parsed.path - ftp = FTP(self.server) + ftp = FTP(server) ftp.login() with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: + print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr) ftp.retrbinary('RETR %s' % path, f.write) ftp.close() return f.name @@ -175,11 +215,15 @@ class ftpretr(url): def xmlstream(filename, tag, encoding='utf-8'): """ + Note: This might move into a generic place in the future. + Given a path to an XML file and a tag name (without namespace), stream - through the XML, and emit the element denoted by tag for processing as string. + through the XML and yield elements denoted by tag as string. for snippet in xmlstream("sample.xml", "sometag"): - print(len(snippet)) + print(len(snippet)) + + Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities """ def strip_ns(tag): if not '}' in tag: @@ -191,7 +235,10 @@ def xmlstream(filename, tag, encoding='utf-8'): 'start', 'end', ))) - _, root = next(context) + try: + _, root = next(context) + except StopIteration: + return for event, elem in context: if not strip_ns(elem.tag) == tag or event == 'start': |