diff options
author | Martin Czygan <martin@archive.org> | 2020-03-10 15:33:17 +0000 |
---|---|---|
committer | Martin Czygan <martin@archive.org> | 2020-03-10 15:33:17 +0000 |
commit | 336630e1d445fb9d233447f9af4bac94473a12bf (patch) | |
tree | b2d4baa4ea6d3afac29b9b2760101c10d18ea30a /python/fatcat_tools | |
parent | f4cce5a765a9f80f9c5e9c907689c06dc9ebf102 (diff) | |
parent | d18942d1ab4d394bdb275bcf9eb82d1cba814775 (diff) | |
download | fatcat-336630e1d445fb9d233447f9af4bac94473a12bf.tar.gz fatcat-336630e1d445fb9d233447f9af4bac94473a12bf.zip |
Merge branch 'martin-kafka-bs4-import' into 'master'
pubmed and arxiv harvest preparations
See merge request webgroup/fatcat!28
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r-- | python/fatcat_tools/harvest/__init__.py | 5 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 19 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/pubmed.py | 249 | ||||
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/importers/common.py | 65 |
5 files changed, 318 insertions, 22 deletions
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py index 7d814696..b3757a7d 100644 --- a/python/fatcat_tools/harvest/__init__.py +++ b/python/fatcat_tools/harvest/__init__.py @@ -1,5 +1,6 @@ from .harvest_common import HarvestState from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker -from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\ - HarvestDoajArticleWorker, HarvestDoajJournalWorker +from .oaipmh import HarvestArxivWorker, HarvestDoajArticleWorker, \ + HarvestDoajJournalWorker +from .pubmed import PubmedFTPWorker diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 11b5fa0a..c95f3445 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -132,25 +132,6 @@ class HarvestArxivWorker(HarvestOaiPmhWorker): self.name = "arxiv" -class HarvestPubmedWorker(HarvestOaiPmhWorker): - """ - Will likely be doing MEDLINE daily batch imports for primary metadata, but - might also want to run a PMC importer to update fulltext and assign OA - licenses (when appropriate). - - Pubmed refs: - - https://www.ncbi.nlm.nih.gov/pmc/tools/oai/ - - https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm - - https://github.com/titipata/pubmed_parser - """ - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.endpoint_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi" - self.metadata_prefix = "pmc_fm" - self.name = "pubmed" - - class HarvestDoajJournalWorker(HarvestOaiPmhWorker): """ WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py new file mode 100644 index 00000000..3f31696e --- /dev/null +++ b/python/fatcat_tools/harvest/pubmed.py @@ -0,0 +1,249 @@ +""" +Pubmed harvest via FTP. + +Assumptions: + +* fixed hostname and directory structure +* XML files are gzip compressed +* accompanying HTML files contain correct dates +""" + +import collections +import gzip +import io +import os +import re +import shutil +import sys +import tempfile +import time +import xml.etree.ElementTree as ET +from ftplib import FTP +from urllib.parse import urljoin, urlparse + +import dateparser +from bs4 import BeautifulSoup +from confluent_kafka import KafkaException, Producer + +from .harvest_common import HarvestState + + +class PubmedFTPWorker: + """ + Access Pubmed FTP for daily updates. + + * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles + * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt + + Daily Update Files (02/2020) + ---------------------------- + Each day, NLM produces update files that include new, revised and deleted + citations. The first Update file to be loaded after loading the complete + set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml. + + Usually, three files per update, e.g.: + + * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016_stats.html + * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz + * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5 + + Currently (02/2020) the HTML contains the date. + + <html> + <head><title></title></head> + <body> + <h4>Filename: pubmed20n1019.xml -- Created: Wed Dec 18 14:31:09 EST 2019</h4> + <table cellspacing="0" cellpadding="0" border="0" width="300"> + <tr> + + """ + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): + self.name = 'Pubmed' + self.host = 'ftp.ncbi.nlm.nih.gov' + self.produce_topic = produce_topic + self.state_topic = state_topic + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + } + self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks + self.state = HarvestState(start_date, end_date) + self.state.initialize_from_kafka(self.state_topic, self.kafka_config) + self.producer = self._kafka_producer() + self.date_file_map = None + + def _kafka_producer(self): + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + + self._kafka_fail_fast = fail_fast + + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + return Producer(producer_conf) + + def fetch_date(self, date): + """ + Fetch file for a given date and feed Kafka one article per message. If + the fetched XML does not contain a PMID, this method will fail. + + If no date file mapping is found, this will fail. + """ + if self.date_file_map is None: + raise ValueError("cannot fetch date without date file mapping") + + date_str = date.strftime('%Y-%m-%d') + paths = self.date_file_map.get(date_str) + if paths is None: + print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr) + return False + + count = 0 + for path in paths: + # Fetch and decompress file. + url = "ftp://{}{}".format(self.host, path) + filename = ftpretr(url) + with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp: + gzf = gzip.open(filename) + shutil.copyfileobj(gzf, decomp) + + # Here, blob is the unparsed XML; we peek into it to use PMID as + # message key. We need streaming, since some updates would consume + # GBs otherwise. + # WARNING: Parsing foreign XML exposes us at some + # https://docs.python.org/3/library/xml.html#xml-vulnerabilities + # here. + for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'): + soup = BeautifulSoup(blob, 'xml') + pmid = soup.find('PMID') + if pmid is None: + raise ValueError("no PMID found, please adjust identifier extraction") + count += 1 + if count % 50 == 0: + print("... up to {}".format(count), file=sys.stderr) + self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast) + + self.producer.flush() + os.remove(filename) + os.remove(decomp.name) + + return True + + def run(self, continuous=False): + while True: + self.date_file_map = generate_date_file_map(host=self.host) + if len(self.date_file_map) == 0: + raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + + current = self.state.next(continuous) + if current: + print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr) + self.fetch_date(current) + self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config) + continue + + if continuous: + print("Sleeping {} seconds...".format(self.loop_sleep)) + time.sleep(self.loop_sleep) + else: + break + print("{} FTP ingest caught up".format(self.name)) + + +def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): + """ + Generate a DefaultDict[string, set] mapping dates to absolute filepaths on + the server (mostly we have one file, but sometimes more). + + Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...} + """ + mapping = collections.defaultdict(set) + pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') + ftp = FTP(host) + ftp.login() + filenames = ftp.nlst('/pubmed/updatefiles') + + for name in filenames: + if not name.endswith('.html'): + continue + sio = io.StringIO() + ftp.retrlines('RETR {}'.format(name), sio.write) + contents = sio.getvalue() + match = pattern.search(contents) + if match is None: + print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr) + continue + filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') + date = dateparser.parse(filedate) + fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) + date_str = date.strftime('%Y-%m-%d') + mapping[date_str].add(fullpath) + print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr) + + print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) + return mapping + + +def ftpretr(url): + """ + Note: This might move into a generic place in the future. + + Fetch (RETR) a remote file given by its URL (e.g. + "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a + local temporary file. Returns the name of the local, closed temporary file. + + It is the reponsibility of the caller to cleanup the temporary file. + """ + parsed = urlparse(url) + server, path = parsed.netloc, parsed.path + ftp = FTP(server) + ftp.login() + with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: + print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr) + ftp.retrbinary('RETR %s' % path, f.write) + ftp.close() + return f.name + + +def xmlstream(filename, tag, encoding='utf-8'): + """ + Note: This might move into a generic place in the future. + + Given a path to an XML file and a tag name (without namespace), stream + through the XML and yield elements denoted by tag as string. + + for snippet in xmlstream("sample.xml", "sometag"): + print(len(snippet)) + + Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities + """ + def strip_ns(tag): + if not '}' in tag: + return tag + return tag.split('}')[1] + + # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm + context = iter(ET.iterparse(filename, events=( + 'start', + 'end', + ))) + try: + _, root = next(context) + except StopIteration: + return + + for event, elem in context: + if not strip_ns(elem.tag) == tag or event == 'start': + continue + + yield ET.tostring(elem, encoding=encoding) + root.clear() diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 10557ef8..c26446fd 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -12,7 +12,7 @@ To run an import you combine two classes; one each of: """ -from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC +from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug from .datacite import DataciteImporter from .jalc import JalcImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index a84ce90f..c000ad62 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -730,6 +730,71 @@ class Bs4XmlFileListPusher(RecordPusher): print(counts) return counts +class KafkaBs4XmlPusher(RecordPusher): + """ + Fetch XML for an article from Kafka, parse via Bs4. + """ + def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): + self.importer = importer + self.consumer = make_kafka_consumer( + kafka_hosts, + kafka_env, + topic_suffix, + group, + kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') + ) + self.poll_interval = kwargs.get('poll_interval', 5.0) + self.consume_batch_size = kwargs.get('consume_batch_size', 25) + + def run(self): + count = 0 + last_push = datetime.datetime.now() + while True: + # Note: this is batch-oriented, because underlying importer is + # often batch-oriented, but this doesn't confirm that entire batch + # has been pushed to fatcat before commiting offset. Eg, consider + # case where there there is one update and thousands of creates; + # update would be lingering in importer, and if importer crashed + # never created. + # This is partially mitigated for the worker case by flushing any + # outstanding editgroups every 5 minutes, but there is still that + # window when editgroups might be hanging (unsubmitted). + batch = self.consumer.consume( + num_messages=self.consume_batch_size, + timeout=self.poll_interval) + print("... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval)) + if not batch: + if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): + # it has been some time, so flush any current editgroup + self.importer.finish() + last_push = datetime.datetime.now() + #print("Flushed any partial import batch: {}".format(self.importer.counts)) + continue + # first check errors on entire batch... + for msg in batch: + if msg.error(): + raise KafkaException(msg.error()) + # ... then process + for msg in batch: + soup = BeautifulSoup(msg.value().decode('utf-8'), "xml") + self.importer.push_record(soup) + soup.decompose() + count += 1 + if count % 500 == 0: + print("Import counts: {}".format(self.importer.counts)) + last_push = datetime.datetime.now() + for msg in batch: + # locally store offsets of processed messages; will be + # auto-commited by librdkafka from this "stored" value + self.consumer.store_offsets(message=msg) + + # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or + # commit the current batch if it has been lingering + counts = self.importer.finish() + print(counts) + self.consumer.close() + return counts class KafkaJsonPusher(RecordPusher): |