Diffstat (limited to 'python')
-rwxr-xr-x  python/fatcat_import.py                              |   2
-rw-r--r--  python/fatcat_tools/harvest/__init__.py              |   1
-rw-r--r--  python/fatcat_tools/harvest/pubmed.py                | 187
-rw-r--r--  python/tests/files/pubmedsample_2019.xml.gz          | bin 0 -> 218528 bytes
-rw-r--r--  python/tests/files/pubmedsample_no_pmid_2019.xml.gz  | bin 0 -> 1128 bytes
-rw-r--r--  python/tests/harvest_pubmed.py                       |  78
6 files changed, 197 insertions, 71 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index eaab9cfe..b0fde01b 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -60,7 +60,7 @@ def run_pubmed(args):
             pi,
             args.kafka_hosts,
             args.kafka_env,
-            "oaipmh-pubmed",
+            "ftp-pubmed",
             "fatcat-{}-import-pubmed".format(args.kafka_env),
         ).run()
     else:
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py
index 7d814696..5f7a1001 100644
--- a/python/fatcat_tools/harvest/__init__.py
+++ b/python/fatcat_tools/harvest/__init__.py
@@ -3,3 +3,4 @@ from .harvest_common import HarvestState
 from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker
 from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\
     HarvestDoajArticleWorker, HarvestDoajJournalWorker
+from .pubmed import PubmedFTPWorker
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index 7afb2dab..fb421037 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -1,12 +1,22 @@
 """
 Pubmed harvest via FTP.
+
+Assumptions:
+
+* fixed hostname and directory structure
+* XML files are gzip compressed
+* accompanying HTML files contain correct dates
 """
 
 import collections
+import gzip
 import io
+import os
 import re
+import shutil
 import sys
 import tempfile
+import time
 import xml.etree.ElementTree as ET
 from ftplib import FTP
 from urllib.parse import urljoin, urlparse
@@ -20,7 +30,7 @@ from .harvest_common import HarvestState
 
 class PubmedFTPWorker:
     """
-    Access Pubmed FTP host for daily updates.
+    Access Pubmed FTP for daily updates.
 
     * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles
     * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt
@@ -37,7 +47,7 @@ class PubmedFTPWorker:
     * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz
     * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5
 
-    The HTML contains the date.
+    Currently (02/2020) the HTML contains the date.
 
         <html>
         <head><title></title></head>
@@ -46,128 +56,158 @@ class PubmedFTPWorker:
         <table cellspacing="0" cellpadding="0" border="0" width="300">
         <tr>
 
-    When this workers starts, it will figure out a mapping from date to XML
-    files by looking at all the HTML files.
     """
 
-    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_data=None):
+    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+        self.name = 'Pubmed'
         self.host = 'ftp.ncbi.nlm.nih.gov'
         self.produce_topic = produce_topic
         self.state_topic = state_topic
         self.kafka_config = {
             'bootstrap.servers': kafka_hosts,
-            'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+            'message.max.bytes': 20000000,  # ~20 MBytes; broker is ~50 MBytes
         }
-        self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+        self.loop_sleep = 60 * 60  # how long to wait, in seconds, between date checks
         self.state = HarvestState(start_date, end_date)
         self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
-        self.date_file_map = self.generate_date_file_map()
-        if len(self.date_file_map) == 0:
-            raise ValueError('mapping from dates to files should not be empty')
-
-    def generate_date_file_map(self):
-        """
-        Generate a dictionary mapping date (strings) to filepaths. The date is
-        parsed from pubmed20n1016_stats.html, mapping to absolute path on FTP,
-        e.g. "2020-01-02": "/pubmed/updatefiles/pubmed20n1016.xml.gz".
-        """
-        mapping = collections.defaultdict(set)
-        pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
-        ftp = FTP(self.host)
-        ftp.login()
-        filenames = ftp.nlst('/pubmed/updatefiles')
-        for name in filenames:
-            if not name.endswith('.html'):
-                continue
-            sio = io.StringIO()
-            ftp.retrlines('RETR {}'.format(name), sio.write)
-            contents = sio.getvalue()
-            match = pattern.search(contents)
-            if match is None:
-                print('pattern miss on: {}, may need to adjust pattern: {}'.format(contents, pattern),
-                      file=sys.stderr)
-                continue
-            filename, filedate = match.groups()  # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
-            date = dateparser.parse(filedate)
-            fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
-            mapping[date.format('%Y-%m-%d')].add(fullpath)
-
-        self.date_file_map = mapping
-        print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
-
+        self.producer = self._kafka_producer()
+        self.date_file_map = None
 
-    def fetch_date(self, date):
-        """
-        Fetch file for a given date and feed Kafka one article per message.
-        """
+    def _kafka_producer(self):
         def fail_fast(err, msg):
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
                 print("Bailing out...", file=sys.stderr)
+                # TODO: should it be sys.exit(-1)?
                 raise KafkaException(err)
 
+        self._kafka_fail_fast = fail_fast
+
         producer_conf = self.kafka_config.copy()
         producer_conf.update({
             'delivery.report.only.error': True,
             'default.topic.config': {
-                'request.required.acks': -1, # all brokers must confirm
+                'request.required.acks': -1,  # all brokers must confirm
             },
         })
-        producer = Producer(producer_conf)
+        return Producer(producer_conf)
 
-        date_str = date.format('%Y-%m-%d')
+    def fetch_date(self, date):
+        """
+        Fetch file for a given date and feed Kafka one article per message. If
+        the fetched XML does not contain a PMID, this method will fail. We
+        build up the mapping from dates to filenames on first run.
+        """
+        if self.date_file_map is None:
+            self.date_file_map = generate_date_file_map(host=self.host)
+            if len(self.date_file_map) == 0:
+                raise ValueError("map from dates to files should not be empty, maybe the HTML changed?")
+
+        date_str = date.strftime('%Y-%m-%d')
         paths = self.date_file_map.get(date_str)
         if paths is None:
             print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr)
-            return
+            return False
 
         count = 0
         for path in paths:
-            filename = ftpretr("ftp://{}".format(urljoin(self.host, path)))
-            for blob in xmlstream(filename, 'PubmedArticle', encoding='utf-8'):
-                soup = BeautifulSoup(blob)
+            # Fetch and decompress file.
+            url = "ftp://{}{}".format(self.host, path)
+            filename = ftpretr(url)
+            with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp:
+                gzf = gzip.open(filename)
+                shutil.copyfileobj(gzf, decomp)
+
+            # Here, blob is the unparsed XML; we peek into it to use PMID as
+            # message key. We need streaming, since some updates would consume
+            # GBs otherwise.
+            # WARNING: Parsing foreign XML exposes us at some
+            # https://docs.python.org/3/library/xml.html#xml-vulnerabilities
+            # here.
+            for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'):
+                soup = BeautifulSoup(blob, 'xml')
                 pmid = soup.find('PMID')
                 if pmid is None:
-                    raise ValueError('no PMID found, adjust identifier extraction')
+                    raise ValueError("no PMID found, please adjust identifier extraction")
                 count += 1
                 if count % 50 == 0:
-                    print("... up to {} from {}".format(count, filename))
-                producer.produce(
-                    self.produce_topic,
-                    blob,
-                    key=pmid.text,
-                    on_delivery=fail_fast)
-            producer.flush()
+                    print("... up to {}".format(count), file=sys.stderr)
+                self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast)
+
+            self.producer.flush()
+            os.remove(filename)
+            os.remove(decomp.name)
+
+        return True
 
     def run(self, continuous=False):
         while True:
             current = self.state.next(continuous)
             if current:
-                print("Fetching DOIs updated on {} (UTC)".format(current))
+                print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
                 self.fetch_date(current)
-                self.state.complete(current,
-                    kafka_topic=self.state_topic,
-                    kafka_config=self.kafka_config)
+                self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config)
                 continue
 
             if continuous:
                 print("Sleeping {} seconds...".format(self.loop_sleep))
                 time.sleep(self.loop_sleep)
+                # Need to keep the mapping fresh.
+                self.date_file_map = generate_date_file_map(host=self.host)
            else:
                 break
 
         print("{} DOI ingest caught up".format(self.name))
 
 
-class ftpretr(url):
+def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
+    """
+    Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
+    the server (mostly we have one file, but sometimes more).
+
+    Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...}
     """
+    mapping = collections.defaultdict(set)
+    pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
+    ftp = FTP(host)
+    ftp.login()
+    filenames = ftp.nlst('/pubmed/updatefiles')
+
+    for name in filenames:
+        if not name.endswith('.html'):
+            continue
+        sio = io.StringIO()
+        ftp.retrlines('RETR {}'.format(name), sio.write)
+        contents = sio.getvalue()
+        match = pattern.search(contents)
+        if match is None:
+            print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr)
+            continue
+        filename, filedate = match.groups()  # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
+        date = dateparser.parse(filedate)
+        fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
+        date_str = date.strftime('%Y-%m-%d')
+        mapping[date_str].add(fullpath)
+        print('added entry for {}: {}'.format(date_str, fullpath))
+
+    print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
+    return mapping
+
+
+def ftpretr(url):
+    """
+    Note: This might move into a generic place in the future.
+
     Fetch (RETR) a remote file given by its URL (e.g.
     "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a
-    local temporary file.
+    local temporary file. Returns the name of the local, closed temporary file.
+
+    It is the reponsibility of the caller to cleanup the temporary file.
     """
     parsed = urlparse(url)
     server, path = parsed.netloc, parsed.path
-    ftp = FTP(self.server)
+    ftp = FTP(server)
     ftp.login()
     with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
+        print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr)
         ftp.retrbinary('RETR %s' % path, f.write)
     ftp.close()
     return f.name
@@ -175,11 +215,15 @@ class ftpretr(url):
 
 def xmlstream(filename, tag, encoding='utf-8'):
     """
+    Note: This might move into a generic place in the future.
+
     Given a path to an XML file and a tag name (without namespace), stream
-    through the XML, and emit the element denoted by tag for processing as string.
+    through the XML and yield elements denoted by tag as string.
 
     for snippet in xmlstream("sample.xml", "sometag"):
-        print(len(snippet))
+        print(len(snippet))
+
+    Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
     """
     def strip_ns(tag):
         if not '}' in tag:
@@ -191,7 +235,10 @@ def xmlstream(filename, tag, encoding='utf-8'):
         'start',
         'end',
     )))
-    _, root = next(context)
+    try:
+        _, root = next(context)
+    except StopIteration:
+        return
 
     for event, elem in context:
         if not strip_ns(elem.tag) == tag or event == 'start':
diff --git a/python/tests/files/pubmedsample_2019.xml.gz b/python/tests/files/pubmedsample_2019.xml.gz
new file mode 100644
index 00000000..bafad833
--- /dev/null
+++ b/python/tests/files/pubmedsample_2019.xml.gz
Binary files differ
diff --git a/python/tests/files/pubmedsample_no_pmid_2019.xml.gz b/python/tests/files/pubmedsample_no_pmid_2019.xml.gz
new file mode 100644
index 00000000..8785a06d
--- /dev/null
+++ b/python/tests/files/pubmedsample_no_pmid_2019.xml.gz
Binary files differ
diff --git a/python/tests/harvest_pubmed.py b/python/tests/harvest_pubmed.py
new file mode 100644
index 00000000..71832722
--- /dev/null
+++ b/python/tests/harvest_pubmed.py
@@ -0,0 +1,78 @@
+"""
+Test pubmed FTP harvest.
+"""
+
+import datetime
+import json
+import os
+
+import pytest
+
+from fatcat_tools.harvest import *
+
+
+def test_pubmed_harvest_date(mocker):
+
+    # mock out the harvest state object so it doesn't try to actually connect
+    # to Kafka
+    mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka')
+
+    # Mocking a file fetched from FTP, should contain some 'PubmedArticle' elements.
+    # $ zcat tests/files/pubmedsample_2019.xml.gz | grep -c '<PubmedArticle>'
+    # 176
+    file_to_retrieve = os.path.join(os.path.dirname(__file__), 'files/pubmedsample_2019.xml.gz')
+    ftpretr = mocker.patch('fatcat_tools.harvest.pubmed.ftpretr')
+    ftpretr.return_value = file_to_retrieve
+
+    test_date = '2020-02-20'
+
+    # We'll need one entry in the date_file_map.
+    generate_date_file_map = mocker.patch('fatcat_tools.harvest.pubmed.generate_date_file_map')
+    generate_date_file_map.return_value = {test_date: set(['dummy'])}
+
+    # For cleanup.
+    os.remove = mocker.Mock()
+
+    harvester = PubmedFTPWorker(
+        kafka_hosts="dummy",
+        produce_topic="dummy-produce-topic",
+        state_topic="dummy-state-topic",
+    )
+
+    harvester.producer = mocker.Mock()
+    # Since we mock out the FTP fetch, the concrete date does not matter here.
+    harvester.fetch_date(datetime.datetime.strptime(test_date, '%Y-%m-%d'))
+
+    # check that we published the expected number of DOI objects were published
+    # to the (mock) kafka topic
+    assert harvester.producer.produce.call_count == 176
+    assert harvester.producer.flush.call_count == 1
+    assert os.remove.call_count == 2
+
+
+def test_pubmed_harvest_date_no_pmid(mocker):
+    # mock out the harvest state object so it doesn't try to actually connect
+    # to Kafka
+    mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka')
+
+    file_to_retrieve = os.path.join(os.path.dirname(__file__), 'files/pubmedsample_no_pmid_2019.xml.gz')
+    ftpretr = mocker.patch('fatcat_tools.harvest.pubmed.ftpretr')
+    ftpretr.return_value = file_to_retrieve
+
+    test_date = '2020-02-20'
+
+    # We'll need one entry in the date_file_map.
+    generate_date_file_map = mocker.patch('fatcat_tools.harvest.pubmed.generate_date_file_map')
+    generate_date_file_map.return_value = {test_date: set(['dummy'])}
+
+    harvester = PubmedFTPWorker(
+        kafka_hosts="dummy",
+        produce_topic="dummy-produce-topic",
+        state_topic="dummy-state-topic",
+    )
+
+    harvester.producer = mocker.Mock()
+
+    # The file has not PMID, not importable.
+    with pytest.raises(ValueError):
+        harvester.fetch_date(datetime.datetime.strptime(test_date, '%Y-%m-%d'))
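Editor's note: the xmlstream() helper touched above is built on xml.etree.ElementTree.iterparse, so multi-gigabyte update files never need to be loaded whole. The following standalone sketch illustrates that start/end-event pattern only; it is not the committed code, and the function name stream_elements and the sample filename are illustrative.

    # Minimal sketch of iterparse-based element streaming (assumed names, not the commit's code).
    import xml.etree.ElementTree as ET

    def stream_elements(filename, tag, encoding='utf-8'):
        """Yield every element named `tag` (namespace ignored) as an encoded string."""
        def strip_ns(t):
            # '{http://example.org/ns}PubmedArticle' -> 'PubmedArticle'
            return t if '}' not in t else t.split('}', 1)[1]

        context = iter(ET.iterparse(filename, events=('start', 'end')))
        try:
            _, root = next(context)  # first event is the start of the root element
        except StopIteration:
            return  # empty document
        for event, elem in context:
            if event == 'end' and strip_ns(elem.tag) == tag:
                yield ET.tostring(elem, encoding=encoding)
                elem.clear()
                root.clear()  # drop references to processed children, keeping memory bounded

    # Hypothetical usage with an uncompressed sample file:
    # for blob in stream_elements('pubmedsample_2019.xml', 'PubmedArticle'):
    #     print(len(blob))

Clearing the root after each yielded element is what keeps the working set small even when a daily update file holds hundreds of thousands of articles.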

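For orientation, a rough usage sketch of the new worker, wired up the way fatcat_import.py wires other harvesters; the broker address and topic names below are placeholders, not values from this commit, and constructing the worker requires a reachable Kafka broker because the harvest state is initialized from the state topic.

    # Hypothetical wiring of the new PubmedFTPWorker (placeholder hosts and topics).
    from fatcat_tools.harvest import PubmedFTPWorker

    worker = PubmedFTPWorker(
        kafka_hosts="localhost:9092",                 # placeholder broker
        produce_topic="fatcat-prod.ftp-pubmed",       # placeholder produce topic
        state_topic="fatcat-prod.ftp-pubmed-state",   # placeholder state topic
    )
    # continuous=True polls for new daily update files and sleeps between checks,
    # refreshing the date-to-file mapping after each sleep.
    worker.run(continuous=True)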