From 519b90d7f539b667e919c220a53626e7a4ac48bf Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Fri, 14 Feb 2020 14:32:57 +0100 Subject: pubmed ftp harvest and KafkaBs4XmlPusher * add PubmedFTPWorker * utils are currently stored alongside pubmed (e.g. ftpretr, xmlstream) but may live elsewhere, as they are more generic * add KafkaBs4XmlPusher --- python/fatcat_harvest.py | 15 ++- python/fatcat_import.py | 32 ++--- python/fatcat_tools/harvest/oaipmh.py | 15 +++ python/fatcat_tools/harvest/pubmed.py | 199 ++++++++++++++++++++++++++++++ python/fatcat_tools/importers/__init__.py | 2 +- python/fatcat_tools/importers/common.py | 65 ++++++++++ 6 files changed, 307 insertions(+), 21 deletions(-) create mode 100644 python/fatcat_tools/harvest/pubmed.py diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py index 58bef9ca..4c4f34a1 100755 --- a/python/fatcat_harvest.py +++ b/python/fatcat_harvest.py @@ -6,7 +6,7 @@ import datetime import raven from fatcat_tools.harvest import HarvestCrossrefWorker, HarvestDataciteWorker,\ HarvestArxivWorker, HarvestPubmedWorker, HarvestDoajArticleWorker,\ - HarvestDoajJournalWorker + HarvestDoajJournalWorker, PubmedFTPWorker # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable sentry_client = raven.Client() @@ -42,10 +42,17 @@ def run_arxiv(args): worker.run(continuous=args.continuous) def run_pubmed(args): - worker = HarvestPubmedWorker( + # worker = HarvestPubmedWorker( + # kafka_hosts=args.kafka_hosts, + # produce_topic="fatcat-{}.oaipmh-pubmed".format(args.env), + # state_topic="fatcat-{}.oaipmh-pubmed-state".format(args.env), + # start_date=args.start_date, + # end_date=args.end_date) + # worker.run(continuous=args.continuous) + worker = PubmedFTPWorker( kafka_hosts=args.kafka_hosts, - produce_topic="fatcat-{}.oaipmh-pubmed".format(args.env), - state_topic="fatcat-{}.oaipmh-pubmed-state".format(args.env), + produce_topic="fatcat-{}.ftp-pubmed".format(args.env), + state_topic="fatcat-{}.ftp-pubmed-state".format(args.env), start_date=args.start_date, end_date=args.end_date) worker.run(continuous=args.continuous) diff --git a/python/fatcat_import.py b/python/fatcat_import.py index ad4de0e2..eaab9cfe 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -39,14 +39,13 @@ def run_arxiv(args): ari = ArxivRawImporter(args.api, edit_batch_size=args.batch_size) if args.kafka_mode: - raise NotImplementedError - #KafkaBs4XmlPusher( - # ari, - # args.kafka_hosts, - # args.kafka_env, - # "api-arxiv", - # "fatcat-{}-import-arxiv".format(args.kafka_env), - #).run() + KafkaBs4XmlPusher( + ari, + args.kafka_hosts, + args.kafka_env, + "oaipmh-arxiv", + "fatcat-{}-import-arxiv".format(args.kafka_env), + ).run() else: Bs4XmlFilePusher(ari, args.xml_file, "record").run() @@ -57,14 +56,13 @@ def run_pubmed(args): do_updates=args.do_updates, lookup_refs=(not args.no_lookup_refs)) if args.kafka_mode: - raise NotImplementedError - #KafkaBs4XmlPusher( - # pi, - # args.kafka_hosts, - # args.kafka_env, - # "api-pubmed", - # "fatcat-{}import-arxiv".format(args.kafka_env), - #).run() + KafkaBs4XmlPusher( + pi, + args.kafka_hosts, + args.kafka_env, + "oaipmh-pubmed", + "fatcat-{}-import-pubmed".format(args.kafka_env), + ).run() else: Bs4XmlLargeFilePusher( pi, @@ -297,6 +295,7 @@ def main(): auth_var="FATCAT_AUTH_WORKER_ARXIV", ) sub_arxiv.add_argument('xml_file', + nargs='?', help="arXivRaw XML file to import from", default=sys.stdin, type=argparse.FileType('r')) sub_arxiv.add_argument('--kafka-mode', @@ -310,6 +309,7 @@ def main(): auth_var="FATCAT_AUTH_WORKER_PUBMED", ) sub_pubmed.add_argument('xml_file', + nargs='?', help="Pubmed XML file to import from", default=sys.stdin, type=argparse.FileType('r')) sub_pubmed.add_argument('issn_map_file', diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 11b5fa0a..8e9efea8 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -142,6 +142,21 @@ class HarvestPubmedWorker(HarvestOaiPmhWorker): - https://www.ncbi.nlm.nih.gov/pmc/tools/oai/ - https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm - https://github.com/titipata/pubmed_parser + + TODO(martin): OAI does not seem to support the format we already have an + importer for. Maybe we can use "Daily Update Files" -- + + Daily Update Files + ------------------ + Each day, NLM produces update files that include new, revised and deleted + citations. The first Update file to be loaded after loading the complete + set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml. + ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles + + NOTES: + + * OAI: https://dtd.nlm.nih.gov/archiving/2.3/xsd/archivearticle.xsd + * FTP: https://dtd.nlm.nih.gov/ncbi/pubmed/out/pubmed_190101.dtd """ def __init__(self, **kwargs): diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py new file mode 100644 index 00000000..da872a10 --- /dev/null +++ b/python/fatcat_tools/harvest/pubmed.py @@ -0,0 +1,199 @@ +""" +Pubmed harvest via FTP. +""" + +import collections +import io +import re +import sys +import tempfile +import xml.etree.ElementTree as ET +from ftplib import FTP +from urllib.parse import urljoin, urlparse + +import dateparser +from bs4 import BeautifulSoup +from confluent_kafka import KafkaException, Producer + +from .harvest_common import HarvestState + + +class PubmedFTPWorker: + """ + Access Pubmed FTP host for daily updates. + + * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles + * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt + + Daily Update Files (02/2020) + ---------------------------- + Each day, NLM produces update files that include new, revised and deleted + citations. The first Update file to be loaded after loading the complete + set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml. + + Usually, three files per update, e.g.: + + * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016_stats.html + * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz + * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5 + + The HTML contains the date. + + + + +

Filename: pubmed20n1019.xml -- Created: Wed Dec 18 14:31:09 EST 2019

+ + + + When this workers starts, it will figure out a mapping from date to XML + files by looking at all the HTML files. + """ + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_data=None): + self.host = 'ftp.ncbi.nlm.nih.gov' + self.produce_topic = produce_topic + self.state_topic = state_topic + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + } + self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.state = HarvestState(start_date, end_date) + self.state.initialize_from_kafka(self.state_topic, self.kafka_config) + self.date_file_map = self.generate_date_file_map() + if len(self.date_file_map) == 0: + raise ValueError('mapping from dates to files should not be empty') + + def generate_date_file_map(self): + """ + Generate a dictionary mapping date (strings) to filepaths. The date is + parsed from pubmed20n1016_stats.html, mapping to absolute path on FTP, + e.g. "2020-01-02": "/pubmed/updatefiles/pubmed20n1016.xml.gz". + """ + mapping = collections.defaultdict(set) + pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') + ftp = FTP(self.host) + ftp.login() + filenames = ftp.nlst('/pubmed/updatefiles') + for name in filenames: + if not name.endswith('.html'): + continue + sio = io.StringIO() + ftp.retrlines('RETR {}'.format(name), sio.write) + contents = sio.getvalue() + match = pattern.search(contents) + if match is None: + print('pattern miss on: {}, may need to adjust pattern: {}'.format(contents, pattern), + file=sys.stderr) + continue + filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') + date = dateparser.parse(filedate) + fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) + mapping[date.format('%Y-%m-%d')].add(fullpath) + + self.date_file_map = mapping + print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) + + + def fetch_date(self, date): + """ + Fetch file for a given date and feed Kafka one article per message. + """ + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) + raise KafkaException(err) + + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) + + date_str = date.format('%Y-%m-%d') + paths = self.date_file_map.get(date_str) + if paths is None: + print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr) + return + + count = 0 + for path in paths: + filename = ftpretr(urljoin(self.host, path)) + for blob in xmlstream(filename, 'PubmedArticle', encoding='utf-8'): + soup = BeautifulSoup(blob) + pmid = soup.find('PMID') + if pmid is None: + raise ValueError('no PMID found, adjust identifier extraction') + count += 1 + if count % 50 == 0: + print("... up to {} from {}".format(count, filename)) + producer.produce( + self.produce_topic, + blob, + key=pmid.text, + on_delivery=fail_fast) + producer.flush() + + def run(self, continuous=False): + while True: + current = self.state.next(continuous) + if current: + print("Fetching DOIs updated on {} (UTC)".format(current)) + self.fetch_date(current) + self.state.complete(current, + kafka_topic=self.state_topic, + kafka_config=self.kafka_config) + continue + + if continuous: + print("Sleeping {} seconds...".format(self.loop_sleep)) + time.sleep(self.loop_sleep) + else: + break + print("{} DOI ingest caught up".format(self.name)) + + +class ftpretr(uri): + """ + Fetch (RETR) a remote file to a local temporary file. + """ + parsed = urlparse(uri) + server, path = parsed.netloc, parsed.path + ftp = FTP(self.server) + ftp.login() + with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: + ftp.retrbinary('RETR %s' % path, f.write) + ftp.close() + return f.name + + +def xmlstream(filename, tag, encoding='utf-8'): + """ + Given a path to an XML file and a tag name (without namespace), stream + through the XML, and emit the element denoted by tag for processing as string. + + for snippet in xmlstream("sample.xml", "sometag"): + print(len(snippet)) + """ + def strip_ns(tag): + if not '}' in tag: + return tag + return tag.split('}')[1] + + # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm + context = iter(ET.iterparse(filename, events=( + 'start', + 'end', + ))) + _, root = next(context) + + for event, elem in context: + if not strip_ns(elem.tag) == tag or event == 'start': + continue + + yield ET.tostring(elem, encoding=encoding) + root.clear() diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index d936605f..03c7cbcc 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -12,7 +12,7 @@ To run an import you combine two classes; one each of: """ -from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC +from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug from .datacite import DataciteImporter from .jalc import JalcImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 1ffbd6e7..1cb5529e 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -721,6 +721,71 @@ class Bs4XmlFileListPusher(RecordPusher): print(counts) return counts +class KafkaBs4XmlPusher(RecordPusher): + """ + Fetch XML for an article from Kafka, parse via Bs4. + """ + def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): + self.importer = importer + self.consumer = make_kafka_consumer( + kafka_hosts, + kafka_env, + topic_suffix, + group, + kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') + ) + self.poll_interval = kwargs.get('poll_interval', 5.0) + self.consume_batch_size = kwargs.get('consume_batch_size', 100) + + def run(self): + count = 0 + last_push = datetime.datetime.now() + while True: + # Note: this is batch-oriented, because underlying importer is + # often batch-oriented, but this doesn't confirm that entire batch + # has been pushed to fatcat before commiting offset. Eg, consider + # case where there there is one update and thousands of creates; + # update would be lingering in importer, and if importer crashed + # never created. + # This is partially mitigated for the worker case by flushing any + # outstanding editgroups every 5 minutes, but there is still that + # window when editgroups might be hanging (unsubmitted). + batch = self.consumer.consume( + num_messages=self.consume_batch_size, + timeout=self.poll_interval) + print("... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval)) + if not batch: + if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): + # it has been some time, so flush any current editgroup + self.importer.finish() + last_push = datetime.datetime.now() + #print("Flushed any partial import batch: {}".format(self.importer.counts)) + continue + # first check errors on entire batch... + for msg in batch: + if msg.error(): + raise KafkaException(msg.error()) + # ... then process + for msg in batch: + soup = BeautifulSoup(msg.value().decode('utf-8'), "xml") + self.importer.push_record(soup) + soup.decompose() + count += 1 + if count % 500 == 0: + print("Import counts: {}".format(self.importer.counts)) + last_push = datetime.datetime.now() + for msg in batch: + # locally store offsets of processed messages; will be + # auto-commited by librdkafka from this "stored" value + self.consumer.store_offsets(message=msg) + + # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or + # commit the current batch if it has been lingering + counts = self.importer.finish() + print(counts) + self.consumer.close() + return counts class KafkaJsonPusher(RecordPusher): -- cgit v1.2.3 From 456f318b5ef904786aabf2411d2d244cd38f25b1 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Wed, 19 Feb 2020 01:12:57 +0100 Subject: pubmed ftp: fix url --- python/fatcat_tools/harvest/pubmed.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index da872a10..7afb2dab 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -122,7 +122,7 @@ class PubmedFTPWorker: count = 0 for path in paths: - filename = ftpretr(urljoin(self.host, path)) + filename = ftpretr("ftp://{}".format(urljoin(self.host, path))) for blob in xmlstream(filename, 'PubmedArticle', encoding='utf-8'): soup = BeautifulSoup(blob) pmid = soup.find('PMID') @@ -157,11 +157,13 @@ class PubmedFTPWorker: print("{} DOI ingest caught up".format(self.name)) -class ftpretr(uri): +class ftpretr(url): """ - Fetch (RETR) a remote file to a local temporary file. + Fetch (RETR) a remote file given by its URL (e.g. + "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a + local temporary file. """ - parsed = urlparse(uri) + parsed = urlparse(url) server, path = parsed.netloc, parsed.path ftp = FTP(self.server) ftp.login() -- cgit v1.2.3 From 376053a479a8d683fc5e099d0b0b3cb76c026d16 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Wed, 19 Feb 2020 02:28:11 +0100 Subject: more pubmed adjustments * regenerate map in continuous mode * add tests --- python/fatcat_import.py | 2 +- python/fatcat_tools/harvest/__init__.py | 1 + python/fatcat_tools/harvest/pubmed.py | 187 +++++++++++++-------- python/tests/files/pubmedsample_2019.xml.gz | Bin 0 -> 218528 bytes .../tests/files/pubmedsample_no_pmid_2019.xml.gz | Bin 0 -> 1128 bytes python/tests/harvest_pubmed.py | 78 +++++++++ 6 files changed, 197 insertions(+), 71 deletions(-) create mode 100644 python/tests/files/pubmedsample_2019.xml.gz create mode 100644 python/tests/files/pubmedsample_no_pmid_2019.xml.gz create mode 100644 python/tests/harvest_pubmed.py diff --git a/python/fatcat_import.py b/python/fatcat_import.py index eaab9cfe..b0fde01b 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -60,7 +60,7 @@ def run_pubmed(args): pi, args.kafka_hosts, args.kafka_env, - "oaipmh-pubmed", + "ftp-pubmed", "fatcat-{}-import-pubmed".format(args.kafka_env), ).run() else: diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py index 7d814696..5f7a1001 100644 --- a/python/fatcat_tools/harvest/__init__.py +++ b/python/fatcat_tools/harvest/__init__.py @@ -3,3 +3,4 @@ from .harvest_common import HarvestState from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\ HarvestDoajArticleWorker, HarvestDoajJournalWorker +from .pubmed import PubmedFTPWorker diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index 7afb2dab..fb421037 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -1,12 +1,22 @@ """ Pubmed harvest via FTP. + +Assumptions: + +* fixed hostname and directory structure +* XML files are gzip compressed +* accompanying HTML files contain correct dates """ import collections +import gzip import io +import os import re +import shutil import sys import tempfile +import time import xml.etree.ElementTree as ET from ftplib import FTP from urllib.parse import urljoin, urlparse @@ -20,7 +30,7 @@ from .harvest_common import HarvestState class PubmedFTPWorker: """ - Access Pubmed FTP host for daily updates. + Access Pubmed FTP for daily updates. * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt @@ -37,7 +47,7 @@ class PubmedFTPWorker: * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5 - The HTML contains the date. + Currently (02/2020) the HTML contains the date. @@ -46,128 +56,158 @@ class PubmedFTPWorker:
- When this workers starts, it will figure out a mapping from date to XML - files by looking at all the HTML files. """ - def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_data=None): + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): + self.name = 'Pubmed' self.host = 'ftp.ncbi.nlm.nih.gov' self.produce_topic = produce_topic self.state_topic = state_topic self.kafka_config = { 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes } - self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks self.state = HarvestState(start_date, end_date) self.state.initialize_from_kafka(self.state_topic, self.kafka_config) - self.date_file_map = self.generate_date_file_map() - if len(self.date_file_map) == 0: - raise ValueError('mapping from dates to files should not be empty') - - def generate_date_file_map(self): - """ - Generate a dictionary mapping date (strings) to filepaths. The date is - parsed from pubmed20n1016_stats.html, mapping to absolute path on FTP, - e.g. "2020-01-02": "/pubmed/updatefiles/pubmed20n1016.xml.gz". - """ - mapping = collections.defaultdict(set) - pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') - ftp = FTP(self.host) - ftp.login() - filenames = ftp.nlst('/pubmed/updatefiles') - for name in filenames: - if not name.endswith('.html'): - continue - sio = io.StringIO() - ftp.retrlines('RETR {}'.format(name), sio.write) - contents = sio.getvalue() - match = pattern.search(contents) - if match is None: - print('pattern miss on: {}, may need to adjust pattern: {}'.format(contents, pattern), - file=sys.stderr) - continue - filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') - date = dateparser.parse(filedate) - fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) - mapping[date.format('%Y-%m-%d')].add(fullpath) - - self.date_file_map = mapping - print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) - + self.producer = self._kafka_producer() + self.date_file_map = None - def fetch_date(self, date): - """ - Fetch file for a given date and feed Kafka one article per message. - """ + def _kafka_producer(self): def fail_fast(err, msg): if err is not None: print("Kafka producer delivery error: {}".format(err), file=sys.stderr) print("Bailing out...", file=sys.stderr) + # TODO: should it be sys.exit(-1)? raise KafkaException(err) + self._kafka_fail_fast = fail_fast + producer_conf = self.kafka_config.copy() producer_conf.update({ 'delivery.report.only.error': True, 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm + 'request.required.acks': -1, # all brokers must confirm }, }) - producer = Producer(producer_conf) + return Producer(producer_conf) - date_str = date.format('%Y-%m-%d') + def fetch_date(self, date): + """ + Fetch file for a given date and feed Kafka one article per message. If + the fetched XML does not contain a PMID, this method will fail. We + build up the mapping from dates to filenames on first run. + """ + if self.date_file_map is None: + self.date_file_map = generate_date_file_map(host=self.host) + if len(self.date_file_map) == 0: + raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + + date_str = date.strftime('%Y-%m-%d') paths = self.date_file_map.get(date_str) if paths is None: print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr) - return + return False count = 0 for path in paths: - filename = ftpretr("ftp://{}".format(urljoin(self.host, path))) - for blob in xmlstream(filename, 'PubmedArticle', encoding='utf-8'): - soup = BeautifulSoup(blob) + # Fetch and decompress file. + url = "ftp://{}{}".format(self.host, path) + filename = ftpretr(url) + with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp: + gzf = gzip.open(filename) + shutil.copyfileobj(gzf, decomp) + + # Here, blob is the unparsed XML; we peek into it to use PMID as + # message key. We need streaming, since some updates would consume + # GBs otherwise. + # WARNING: Parsing foreign XML exposes us at some + # https://docs.python.org/3/library/xml.html#xml-vulnerabilities + # here. + for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'): + soup = BeautifulSoup(blob, 'xml') pmid = soup.find('PMID') if pmid is None: - raise ValueError('no PMID found, adjust identifier extraction') + raise ValueError("no PMID found, please adjust identifier extraction") count += 1 if count % 50 == 0: - print("... up to {} from {}".format(count, filename)) - producer.produce( - self.produce_topic, - blob, - key=pmid.text, - on_delivery=fail_fast) - producer.flush() + print("... up to {}".format(count), file=sys.stderr) + self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast) + + self.producer.flush() + os.remove(filename) + os.remove(decomp.name) + + return True def run(self, continuous=False): while True: current = self.state.next(continuous) if current: - print("Fetching DOIs updated on {} (UTC)".format(current)) + print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) - self.state.complete(current, - kafka_topic=self.state_topic, - kafka_config=self.kafka_config) + self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config) continue if continuous: print("Sleeping {} seconds...".format(self.loop_sleep)) time.sleep(self.loop_sleep) + # Need to keep the mapping fresh. + self.date_file_map = generate_date_file_map(host=self.host) else: break print("{} DOI ingest caught up".format(self.name)) -class ftpretr(url): +def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): + """ + Generate a DefaultDict[string, set] mapping dates to absolute filepaths on + the server (mostly we have one file, but sometimes more). + + Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...} """ + mapping = collections.defaultdict(set) + pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') + ftp = FTP(host) + ftp.login() + filenames = ftp.nlst('/pubmed/updatefiles') + + for name in filenames: + if not name.endswith('.html'): + continue + sio = io.StringIO() + ftp.retrlines('RETR {}'.format(name), sio.write) + contents = sio.getvalue() + match = pattern.search(contents) + if match is None: + print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr) + continue + filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') + date = dateparser.parse(filedate) + fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) + date_str = date.strftime('%Y-%m-%d') + mapping[date_str].add(fullpath) + print('added entry for {}: {}'.format(date_str, fullpath)) + + print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) + return mapping + + +def ftpretr(url): + """ + Note: This might move into a generic place in the future. + Fetch (RETR) a remote file given by its URL (e.g. "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a - local temporary file. + local temporary file. Returns the name of the local, closed temporary file. + + It is the reponsibility of the caller to cleanup the temporary file. """ parsed = urlparse(url) server, path = parsed.netloc, parsed.path - ftp = FTP(self.server) + ftp = FTP(server) ftp.login() with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: + print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr) ftp.retrbinary('RETR %s' % path, f.write) ftp.close() return f.name @@ -175,11 +215,15 @@ class ftpretr(url): def xmlstream(filename, tag, encoding='utf-8'): """ + Note: This might move into a generic place in the future. + Given a path to an XML file and a tag name (without namespace), stream - through the XML, and emit the element denoted by tag for processing as string. + through the XML and yield elements denoted by tag as string. for snippet in xmlstream("sample.xml", "sometag"): - print(len(snippet)) + print(len(snippet)) + + Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities """ def strip_ns(tag): if not '}' in tag: @@ -191,7 +235,10 @@ def xmlstream(filename, tag, encoding='utf-8'): 'start', 'end', ))) - _, root = next(context) + try: + _, root = next(context) + except StopIteration: + return for event, elem in context: if not strip_ns(elem.tag) == tag or event == 'start': diff --git a/python/tests/files/pubmedsample_2019.xml.gz b/python/tests/files/pubmedsample_2019.xml.gz new file mode 100644 index 00000000..bafad833 Binary files /dev/null and b/python/tests/files/pubmedsample_2019.xml.gz differ diff --git a/python/tests/files/pubmedsample_no_pmid_2019.xml.gz b/python/tests/files/pubmedsample_no_pmid_2019.xml.gz new file mode 100644 index 00000000..8785a06d Binary files /dev/null and b/python/tests/files/pubmedsample_no_pmid_2019.xml.gz differ diff --git a/python/tests/harvest_pubmed.py b/python/tests/harvest_pubmed.py new file mode 100644 index 00000000..71832722 --- /dev/null +++ b/python/tests/harvest_pubmed.py @@ -0,0 +1,78 @@ +""" +Test pubmed FTP harvest. +""" + +import datetime +import json +import os + +import pytest + +from fatcat_tools.harvest import * + + +def test_pubmed_harvest_date(mocker): + + # mock out the harvest state object so it doesn't try to actually connect + # to Kafka + mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka') + + # Mocking a file fetched from FTP, should contain some 'PubmedArticle' elements. + # $ zcat tests/files/pubmedsample_2019.xml.gz | grep -c '' + # 176 + file_to_retrieve = os.path.join(os.path.dirname(__file__), 'files/pubmedsample_2019.xml.gz') + ftpretr = mocker.patch('fatcat_tools.harvest.pubmed.ftpretr') + ftpretr.return_value = file_to_retrieve + + test_date = '2020-02-20' + + # We'll need one entry in the date_file_map. + generate_date_file_map = mocker.patch('fatcat_tools.harvest.pubmed.generate_date_file_map') + generate_date_file_map.return_value = {test_date: set(['dummy'])} + + # For cleanup. + os.remove = mocker.Mock() + + harvester = PubmedFTPWorker( + kafka_hosts="dummy", + produce_topic="dummy-produce-topic", + state_topic="dummy-state-topic", + ) + + harvester.producer = mocker.Mock() + # Since we mock out the FTP fetch, the concrete date does not matter here. + harvester.fetch_date(datetime.datetime.strptime(test_date, '%Y-%m-%d')) + + # check that we published the expected number of DOI objects were published + # to the (mock) kafka topic + assert harvester.producer.produce.call_count == 176 + assert harvester.producer.flush.call_count == 1 + assert os.remove.call_count == 2 + +def test_pubmed_harvest_date_no_pmid(mocker): + # mock out the harvest state object so it doesn't try to actually connect + # to Kafka + mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka') + + file_to_retrieve = os.path.join(os.path.dirname(__file__), 'files/pubmedsample_no_pmid_2019.xml.gz') + ftpretr = mocker.patch('fatcat_tools.harvest.pubmed.ftpretr') + ftpretr.return_value = file_to_retrieve + + test_date = '2020-02-20' + + # We'll need one entry in the date_file_map. + generate_date_file_map = mocker.patch('fatcat_tools.harvest.pubmed.generate_date_file_map') + generate_date_file_map.return_value = {test_date: set(['dummy'])} + + harvester = PubmedFTPWorker( + kafka_hosts="dummy", + produce_topic="dummy-produce-topic", + state_topic="dummy-state-topic", + ) + + harvester.producer = mocker.Mock() + + # The file has not PMID, not importable. + with pytest.raises(ValueError): + harvester.fetch_date(datetime.datetime.strptime(test_date, '%Y-%m-%d')) + -- cgit v1.2.3 From 0ec71d1b47dffe0d976554a76efa636cbe025e08 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 9 Mar 2020 19:06:15 +0100 Subject: fatcat_import: address potential hanging, if stdin is empty --- python/fatcat_import.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/fatcat_import.py b/python/fatcat_import.py index b0fde01b..bb715fb6 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -47,6 +47,8 @@ def run_arxiv(args): "fatcat-{}-import-arxiv".format(args.kafka_env), ).run() else: + if args.xml_file == sys.stdin: + print('note: reading from stdin', file=sys.stderr) Bs4XmlFilePusher(ari, args.xml_file, "record").run() def run_pubmed(args): -- cgit v1.2.3 From c6b29c17eeea4c067dcc391fe6d9bdaec3f657b3 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 9 Mar 2020 20:07:30 +0100 Subject: oaipmh: HarvestPubmedWorker obsoleted by PubmedFTPWorker --- python/fatcat_tools/harvest/oaipmh.py | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 8e9efea8..c95f3445 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -132,40 +132,6 @@ class HarvestArxivWorker(HarvestOaiPmhWorker): self.name = "arxiv" -class HarvestPubmedWorker(HarvestOaiPmhWorker): - """ - Will likely be doing MEDLINE daily batch imports for primary metadata, but - might also want to run a PMC importer to update fulltext and assign OA - licenses (when appropriate). - - Pubmed refs: - - https://www.ncbi.nlm.nih.gov/pmc/tools/oai/ - - https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm - - https://github.com/titipata/pubmed_parser - - TODO(martin): OAI does not seem to support the format we already have an - importer for. Maybe we can use "Daily Update Files" -- - - Daily Update Files - ------------------ - Each day, NLM produces update files that include new, revised and deleted - citations. The first Update file to be loaded after loading the complete - set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml. - ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles - - NOTES: - - * OAI: https://dtd.nlm.nih.gov/archiving/2.3/xsd/archivearticle.xsd - * FTP: https://dtd.nlm.nih.gov/ncbi/pubmed/out/pubmed_190101.dtd - """ - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.endpoint_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi" - self.metadata_prefix = "pmc_fm" - self.name = "pubmed" - - class HarvestDoajJournalWorker(HarvestOaiPmhWorker): """ WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params -- cgit v1.2.3 From 4a1aa8b28dde887524b294d33566583baf24db74 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 9 Mar 2020 20:09:05 +0100 Subject: pubmed: we sync from FTP --- python/fatcat_tools/harvest/pubmed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index fb421037..527e6dd3 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -155,7 +155,7 @@ class PubmedFTPWorker: self.date_file_map = generate_date_file_map(host=self.host) else: break - print("{} DOI ingest caught up".format(self.name)) + print("{} FTP ingest caught up".format(self.name)) def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): -- cgit v1.2.3 From 6f5ead85169c71a7a010a9e2d54d4c41489c607b Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 9 Mar 2020 20:12:37 +0100 Subject: pubmed: citations is a bit more precise > Each day, NLM produces update files that include new, revised and deleted citations. -- ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt --- python/fatcat_tools/harvest/pubmed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index 527e6dd3..43a671cd 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -143,7 +143,7 @@ class PubmedFTPWorker: while True: current = self.state.next(continuous) if current: - print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) + print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config) continue -- cgit v1.2.3 From 34a18cd1821d09ac0beee8959407ec51cf397337 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 10 Mar 2020 12:50:21 +0100 Subject: harvest: fix imports from HarvestPubmedWorker cleanup --- python/fatcat_harvest.py | 4 ++-- python/fatcat_tools/harvest/__init__.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py index 4c4f34a1..7ac0f16c 100755 --- a/python/fatcat_harvest.py +++ b/python/fatcat_harvest.py @@ -5,8 +5,8 @@ import argparse import datetime import raven from fatcat_tools.harvest import HarvestCrossrefWorker, HarvestDataciteWorker,\ - HarvestArxivWorker, HarvestPubmedWorker, HarvestDoajArticleWorker,\ - HarvestDoajJournalWorker, PubmedFTPWorker + HarvestArxivWorker, HarvestDoajArticleWorker, HarvestDoajJournalWorker,\ + PubmedFTPWorker # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable sentry_client = raven.Client() diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py index 5f7a1001..b3757a7d 100644 --- a/python/fatcat_tools/harvest/__init__.py +++ b/python/fatcat_tools/harvest/__init__.py @@ -1,6 +1,6 @@ from .harvest_common import HarvestState from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker -from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\ - HarvestDoajArticleWorker, HarvestDoajJournalWorker +from .oaipmh import HarvestArxivWorker, HarvestDoajArticleWorker, \ + HarvestDoajJournalWorker from .pubmed import PubmedFTPWorker -- cgit v1.2.3 From db8892f2d960379525a4182b884c1d51c0c70186 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 10 Mar 2020 12:51:11 +0100 Subject: pubmed: move mapping generation out of fetch_date * fetch_date will fail on missing mapping * adjust tests (test will require access to pubmed ftp) --- python/fatcat_tools/harvest/pubmed.py | 15 ++++++++------- python/tests/harvest_pubmed.py | 2 ++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index 43a671cd..34522eb3 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -94,13 +94,12 @@ class PubmedFTPWorker: def fetch_date(self, date): """ Fetch file for a given date and feed Kafka one article per message. If - the fetched XML does not contain a PMID, this method will fail. We - build up the mapping from dates to filenames on first run. + the fetched XML does not contain a PMID, this method will fail. + + If no date file mapping is found, this will fail. """ if self.date_file_map is None: - self.date_file_map = generate_date_file_map(host=self.host) - if len(self.date_file_map) == 0: - raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + raise ValueError("cannot fetch date without date file mapping") date_str = date.strftime('%Y-%m-%d') paths = self.date_file_map.get(date_str) @@ -141,6 +140,10 @@ class PubmedFTPWorker: def run(self, continuous=False): while True: + self.date_file_map = generate_date_file_map(host=self.host) + if len(self.date_file_map) == 0: + raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + current = self.state.next(continuous) if current: print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr) @@ -151,8 +154,6 @@ class PubmedFTPWorker: if continuous: print("Sleeping {} seconds...".format(self.loop_sleep)) time.sleep(self.loop_sleep) - # Need to keep the mapping fresh. - self.date_file_map = generate_date_file_map(host=self.host) else: break print("{} FTP ingest caught up".format(self.name)) diff --git a/python/tests/harvest_pubmed.py b/python/tests/harvest_pubmed.py index 71832722..f8db46b6 100644 --- a/python/tests/harvest_pubmed.py +++ b/python/tests/harvest_pubmed.py @@ -9,6 +9,7 @@ import os import pytest from fatcat_tools.harvest import * +from fatcat_tools.harvest.pubmed import generate_date_file_map def test_pubmed_harvest_date(mocker): @@ -40,6 +41,7 @@ def test_pubmed_harvest_date(mocker): ) harvester.producer = mocker.Mock() + harvester.date_file_map = generate_date_file_map() # Since we mock out the FTP fetch, the concrete date does not matter here. harvester.fetch_date(datetime.datetime.strptime(test_date, '%Y-%m-%d')) -- cgit v1.2.3 From 4fcdc6d60260ca79693fd7d1ce2ae03065d5ef0c Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 10 Mar 2020 12:52:42 +0100 Subject: pubmed: log to stderr --- python/fatcat_tools/harvest/pubmed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index 34522eb3..3f31696e 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -187,7 +187,7 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) date_str = date.strftime('%Y-%m-%d') mapping[date_str].add(fullpath) - print('added entry for {}: {}'.format(date_str, fullpath)) + print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr) print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) return mapping -- cgit v1.2.3 From d18942d1ab4d394bdb275bcf9eb82d1cba814775 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 10 Mar 2020 12:55:47 +0100 Subject: common: use smaller batch size since XML parsing may be slow Address kafka tradeoff between long and short time-outs. Shorter time-outs would facilitate > consumer group re-balances and other consumer group state changes [...] in a reasonable human time-frame. --- python/fatcat_tools/importers/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 1cb5529e..5f5c46b8 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -735,7 +735,7 @@ class KafkaBs4XmlPusher(RecordPusher): kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') ) self.poll_interval = kwargs.get('poll_interval', 5.0) - self.consume_batch_size = kwargs.get('consume_batch_size', 100) + self.consume_batch_size = kwargs.get('consume_batch_size', 25) def run(self): count = 0 -- cgit v1.2.3