Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/harvest/__init__.py    |   5
-rw-r--r--  python/fatcat_tools/harvest/oaipmh.py      |  19
-rw-r--r--  python/fatcat_tools/harvest/pubmed.py      | 249
-rw-r--r--  python/fatcat_tools/importers/__init__.py  |   2
-rw-r--r--  python/fatcat_tools/importers/common.py    |  65
-rw-r--r--  python/fatcat_tools/importers/crossref.py  |   8
-rw-r--r--  python/fatcat_tools/importers/datacite.py  |  10
-rw-r--r--  python/fatcat_tools/importers/jalc.py      |  12
-rw-r--r--  python/fatcat_tools/importers/pubmed.py    |  27
-rw-r--r--  python/fatcat_tools/transforms/csl.py      |  18
-rw-r--r--  python/fatcat_tools/workers/changelog.py   |   3
11 files changed, 384 insertions, 34 deletions
| diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py index 7d814696..b3757a7d 100644 --- a/python/fatcat_tools/harvest/__init__.py +++ b/python/fatcat_tools/harvest/__init__.py @@ -1,5 +1,6 @@  from .harvest_common import HarvestState  from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker -from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\ -    HarvestDoajArticleWorker, HarvestDoajJournalWorker +from .oaipmh import HarvestArxivWorker, HarvestDoajArticleWorker, \ +    HarvestDoajJournalWorker +from .pubmed import PubmedFTPWorker diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 11b5fa0a..c95f3445 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -132,25 +132,6 @@ class HarvestArxivWorker(HarvestOaiPmhWorker):          self.name = "arxiv" -class HarvestPubmedWorker(HarvestOaiPmhWorker): -    """ -    Will likely be doing MEDLINE daily batch imports for primary metadata, but -    might also want to run a PMC importer to update fulltext and assign OA -    licenses (when appropriate). - -    Pubmed refs: -    - https://www.ncbi.nlm.nih.gov/pmc/tools/oai/ -    - https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm -    - https://github.com/titipata/pubmed_parser -    """ - -    def __init__(self, **kwargs): -        super().__init__(**kwargs) -        self.endpoint_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi" -        self.metadata_prefix = "pmc_fm" -        self.name = "pubmed" - -  class HarvestDoajJournalWorker(HarvestOaiPmhWorker):      """      WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py new file mode 100644 index 00000000..3f31696e --- /dev/null +++ b/python/fatcat_tools/harvest/pubmed.py @@ -0,0 +1,249 @@ +""" +Pubmed harvest via FTP. + +Assumptions: + +* fixed hostname and directory structure +* XML files are gzip compressed +* accompanying HTML files contain correct dates +""" + +import collections +import gzip +import io +import os +import re +import shutil +import sys +import tempfile +import time +import xml.etree.ElementTree as ET +from ftplib import FTP +from urllib.parse import urljoin, urlparse + +import dateparser +from bs4 import BeautifulSoup +from confluent_kafka import KafkaException, Producer + +from .harvest_common import HarvestState + + +class PubmedFTPWorker: +    """ +    Access Pubmed FTP for daily updates. + +    * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles +    * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt + +    Daily Update Files (02/2020) +    ---------------------------- +    Each day, NLM produces update files that include new, revised and deleted +    citations. The first Update file to be loaded after loading the complete +    set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml. + +    Usually, three files per update, e.g.: + +    * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016_stats.html +    * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz +    * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5 + +    Currently (02/2020) the HTML contains the date. 
+ +        <html> +        <head><title></title></head> +        <body> +        <h4>Filename: pubmed20n1019.xml -- Created: Wed Dec 18 14:31:09 EST 2019</h4> +        <table cellspacing="0" cellpadding="0" border="0" width="300"> +        <tr> + +    """ +    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): +        self.name = 'Pubmed' +        self.host = 'ftp.ncbi.nlm.nih.gov' +        self.produce_topic = produce_topic +        self.state_topic = state_topic +        self.kafka_config = { +            'bootstrap.servers': kafka_hosts, +            'message.max.bytes': 20000000,  # ~20 MBytes; broker is ~50 MBytes +        } +        self.loop_sleep = 60 * 60  # how long to wait, in seconds, between date checks +        self.state = HarvestState(start_date, end_date) +        self.state.initialize_from_kafka(self.state_topic, self.kafka_config) +        self.producer = self._kafka_producer() +        self.date_file_map = None + +    def _kafka_producer(self): +        def fail_fast(err, msg): +            if err is not None: +                print("Kafka producer delivery error: {}".format(err), file=sys.stderr) +                print("Bailing out...", file=sys.stderr) +                # TODO: should it be sys.exit(-1)? +                raise KafkaException(err) + +        self._kafka_fail_fast = fail_fast + +        producer_conf = self.kafka_config.copy() +        producer_conf.update({ +            'delivery.report.only.error': True, +            'default.topic.config': { +                'request.required.acks': -1,  # all brokers must confirm +            }, +        }) +        return Producer(producer_conf) + +    def fetch_date(self, date): +        """ +        Fetch file for a given date and feed Kafka one article per message. If +        the fetched XML does not contain a PMID, this method will fail. + +        If no date file mapping is found, this will fail. +        """ +        if self.date_file_map is None: +            raise ValueError("cannot fetch date without date file mapping") + +        date_str = date.strftime('%Y-%m-%d') +        paths = self.date_file_map.get(date_str) +        if paths is None: +            print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr) +            return False + +        count = 0 +        for path in paths: +            # Fetch and decompress file. +            url = "ftp://{}{}".format(self.host, path) +            filename = ftpretr(url) +            with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp: +                gzf = gzip.open(filename) +                shutil.copyfileobj(gzf, decomp) + +            # Here, blob is the unparsed XML; we peek into it to use PMID as +            # message key. We need streaming, since some updates would consume +            # GBs otherwise. +            # WARNING: Parsing foreign XML exposes us at some +            # https://docs.python.org/3/library/xml.html#xml-vulnerabilities +            # here. +            for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'): +                soup = BeautifulSoup(blob, 'xml') +                pmid = soup.find('PMID') +                if pmid is None: +                    raise ValueError("no PMID found, please adjust identifier extraction") +                count += 1 +                if count % 50 == 0: +                    print("... 
up to {}".format(count), file=sys.stderr) +                self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast) + +            self.producer.flush() +            os.remove(filename) +            os.remove(decomp.name) + +        return True + +    def run(self, continuous=False): +        while True: +            self.date_file_map = generate_date_file_map(host=self.host) +            if len(self.date_file_map) == 0: +                raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + +            current = self.state.next(continuous) +            if current: +                print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr) +                self.fetch_date(current) +                self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config) +                continue + +            if continuous: +                print("Sleeping {} seconds...".format(self.loop_sleep)) +                time.sleep(self.loop_sleep) +            else: +                break +        print("{} FTP ingest caught up".format(self.name)) + + +def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): +    """ +    Generate a DefaultDict[string, set] mapping dates to absolute filepaths on +    the server (mostly we have one file, but sometimes more). + +    Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...} +    """ +    mapping = collections.defaultdict(set) +    pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') +    ftp = FTP(host) +    ftp.login() +    filenames = ftp.nlst('/pubmed/updatefiles') + +    for name in filenames: +        if not name.endswith('.html'): +            continue +        sio = io.StringIO() +        ftp.retrlines('RETR {}'.format(name), sio.write) +        contents = sio.getvalue() +        match = pattern.search(contents) +        if match is None: +            print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr) +            continue +        filename, filedate = match.groups()  # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') +        date = dateparser.parse(filedate) +        fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) +        date_str = date.strftime('%Y-%m-%d') +        mapping[date_str].add(fullpath) +        print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr) + +    print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) +    return mapping + + +def ftpretr(url): +    """ +    Note: This might move into a generic place in the future. + +    Fetch (RETR) a remote file given by its URL (e.g. +    "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a +    local temporary file. Returns the name of the local, closed temporary file. + +    It is the reponsibility of the caller to cleanup the temporary file. +    """ +    parsed = urlparse(url) +    server, path = parsed.netloc, parsed.path +    ftp = FTP(server) +    ftp.login() +    with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: +        print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr) +        ftp.retrbinary('RETR %s' % path, f.write) +    ftp.close() +    return f.name + + +def xmlstream(filename, tag, encoding='utf-8'): +    """ +    Note: This might move into a generic place in the future. 
+ +    Given a path to an XML file and a tag name (without namespace), stream +    through the XML and yield elements denoted by tag as string. + +    for snippet in xmlstream("sample.xml", "sometag"): +        print(len(snippet)) + +    Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities +    """ +    def strip_ns(tag): +        if not '}' in tag: +            return tag +        return tag.split('}')[1] + +    # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm +    context = iter(ET.iterparse(filename, events=( +        'start', +        'end', +    ))) +    try: +        _, root = next(context) +    except StopIteration: +        return + +    for event, elem in context: +        if not strip_ns(elem.tag) == tag or event == 'start': +            continue + +        yield ET.tostring(elem, encoding=encoding) +        root.clear() diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 10557ef8..c26446fd 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:  """ -from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC +from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC  from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug  from .datacite import DataciteImporter  from .jalc import JalcImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 694ef359..da611ecb 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -730,6 +730,71 @@ class Bs4XmlFileListPusher(RecordPusher):          print(counts)          return counts +class KafkaBs4XmlPusher(RecordPusher): +    """ +    Fetch XML for an article from Kafka, parse via Bs4. +    """ +    def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): +        self.importer = importer +        self.consumer = make_kafka_consumer( +            kafka_hosts, +            kafka_env, +            topic_suffix, +            group, +            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') +        ) +        self.poll_interval = kwargs.get('poll_interval', 5.0) +        self.consume_batch_size = kwargs.get('consume_batch_size', 25) + +    def run(self): +        count = 0 +        last_push = datetime.datetime.now() +        while True: +            # Note: this is batch-oriented, because underlying importer is +            # often batch-oriented, but this doesn't confirm that entire batch +            # has been pushed to fatcat before commiting offset. Eg, consider +            # case where there there is one update and thousands of creates; +            # update would be lingering in importer, and if importer crashed +            # never created. +            # This is partially mitigated for the worker case by flushing any +            # outstanding editgroups every 5 minutes, but there is still that +            # window when editgroups might be hanging (unsubmitted). 
+            batch = self.consumer.consume( +                num_messages=self.consume_batch_size, +                timeout=self.poll_interval) +            print("... got {} kafka messages ({}sec poll interval)".format( +                len(batch), self.poll_interval)) +            if not batch: +                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): +                    # it has been some time, so flush any current editgroup +                    self.importer.finish() +                    last_push = datetime.datetime.now() +                    #print("Flushed any partial import batch: {}".format(self.importer.counts)) +                continue +            # first check errors on entire batch... +            for msg in batch: +                if msg.error(): +                    raise KafkaException(msg.error()) +            # ... then process +            for msg in batch: +                soup = BeautifulSoup(msg.value().decode('utf-8'), "xml") +                self.importer.push_record(soup) +                soup.decompose() +                count += 1 +                if count % 500 == 0: +                    print("Import counts: {}".format(self.importer.counts)) +            last_push = datetime.datetime.now() +            for msg in batch: +                # locally store offsets of processed messages; will be +                # auto-commited by librdkafka from this "stored" value +                self.consumer.store_offsets(message=msg) + +        # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or +        # commit the current batch if it has been lingering +        counts = self.importer.finish() +        print(counts) +        self.consumer.close() +        return counts  class KafkaJsonPusher(RecordPusher): diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py index 18703a1a..9617299c 100644 --- a/python/fatcat_tools/importers/crossref.py +++ b/python/fatcat_tools/importers/crossref.py @@ -163,6 +163,14 @@ class CrossrefImporter(EntityImporter):              self.counts['skip-blank-title'] += 1              return False +        # these are pre-registered DOIs before the actual record is ready +        # title is a list of titles +        if obj.get('title')[0].strip().lower() in [ +                "OUP accepted manuscript".lower(), +            ]: +            self.counts['skip-stub-title'] += 1 +            return False +          # do most of these checks in-line below          return True diff --git a/python/fatcat_tools/importers/datacite.py b/python/fatcat_tools/importers/datacite.py index 9250fc5e..81f00876 100644 --- a/python/fatcat_tools/importers/datacite.py +++ b/python/fatcat_tools/importers/datacite.py @@ -222,6 +222,7 @@ class DataciteImporter(EntityImporter):          self.read_issn_map_file(issn_map_file)          self.debug = debug          self.insert_log_file = insert_log_file +        self.this_year = datetime.datetime.now().year          print('datacite with debug={}'.format(self.debug), file=sys.stderr) @@ -311,6 +312,12 @@ class DataciteImporter(EntityImporter):          release_date, release_month, release_year = parse_datacite_dates(              attributes.get('dates', [])) +        # block bogus far-future years/dates +        if release_year is not None and (release_year > (self.this_year + 5) or release_year < 1000): +            release_date = None +            release_month = None +            release_year = None +          # Some records do not use 
the "dates" field (e.g. micropub), but:          # "attributes.published" or "attributes.publicationYear"          if not any((release_date, release_month, release_year)): @@ -714,7 +721,8 @@ class DataciteImporter(EntityImporter):                      name_scheme = nid.get('nameIdentifierScheme', '') or ''                      if not name_scheme.lower() == "orcid":                          continue -                    orcid = nid.get('nameIdentifier', '').replace('https://orcid.org/', '') +                    orcid = nid.get('nameIdentifier') or '' +                    orcid = orcid.replace('https://orcid.org/', '')                      if not orcid:                          continue                      creator_id = self.lookup_orcid(orcid) diff --git a/python/fatcat_tools/importers/jalc.py b/python/fatcat_tools/importers/jalc.py index a0e0086b..351a20a3 100644 --- a/python/fatcat_tools/importers/jalc.py +++ b/python/fatcat_tools/importers/jalc.py @@ -209,10 +209,14 @@ class JalcImporter(EntityImporter):                  release_year = int(date)          pages = None -        if record.startingPage: -            pages = record.startingPage.string -            if record.endingPage: -                pages = "{}-{}".format(pages, record.endingPage.string) +        if record.startingPage and record.startingPage.string.strip(): +            pages = record.startingPage.string.strip() +            if record.endingPage and record.endingPage.string.strip(): +                pages = "{}-{}".format(pages, record.endingPage.string.strip()) +        # double check to prevent "-" as pages +        if pages and pages.strip() == '-': +            pages = None +          volume = None          if record.volume:              volume = record.volume.string diff --git a/python/fatcat_tools/importers/pubmed.py b/python/fatcat_tools/importers/pubmed.py index c32ce34a..3ecf5ef4 100644 --- a/python/fatcat_tools/importers/pubmed.py +++ b/python/fatcat_tools/importers/pubmed.py @@ -616,7 +616,10 @@ class PubmedImporter(EntityImporter):          ### References          refs = []          if pubmed.ReferenceList: -            for ref in pubmed.ReferenceList.find_all('Reference'): +            # note that Reference always exists within a ReferenceList, but +            # that there may be multiple ReferenceList (eg, sometimes one per +            # Reference) +            for ref in pubmed.find_all('Reference'):                  ref_extra = dict()                  ref_doi = ref.find("ArticleId", IdType="doi")                  if ref_doi: @@ -729,8 +732,29 @@ class PubmedImporter(EntityImporter):              existing.ext_ids.doi = existing.ext_ids.doi or re.ext_ids.doi              existing.ext_ids.pmid = existing.ext_ids.pmid or re.ext_ids.pmid              existing.ext_ids.pmcid = existing.ext_ids.pmcid or re.ext_ids.pmcid + +            existing.container_id = existing.container_id or re.container_id              existing.refs = existing.refs or re.refs +            existing.abstracts = existing.abstracts or re.abstracts              existing.extra['pubmed'] = re.extra['pubmed'] + +            # fix stub titles +            if existing.title in [ +                    "OUP accepted manuscript", +                ]: +                existing.title = re.title + +            existing.original_title = existing.original_title or re.original_title +            existing.release_type = existing.release_type or re.release_type +            existing.release_stage = existing.release_stage or re.release_stage +            
existing.release_date = existing.release_date or re.release_date +            existing.release_year = existing.release_year or re.release_year +            existing.withdrawn_status = existing.withdrawn_status or re.withdrawn_status +            existing.volume = existing.volume or re.volume +            existing.issue = existing.issue or re.issue +            existing.pages = existing.pages or re.pages +            existing.language = existing.language or re.language +              # update subtitle in-place first              if not existing.subtitle and existing.extra.get('subtitle'):                  subtitle = existing.extra.pop('subtitle') @@ -740,6 +764,7 @@ class PubmedImporter(EntityImporter):                      existing.subtitle = subtitle              if not existing.subtitle:                  existing.subtitle = re.subtitle +              try:                  self.api.update_release(self.get_editgroup_id(), existing.ident, existing)                  self.counts['update'] += 1 diff --git a/python/fatcat_tools/transforms/csl.py b/python/fatcat_tools/transforms/csl.py index 7ab94cac..832ad6aa 100644 --- a/python/fatcat_tools/transforms/csl.py +++ b/python/fatcat_tools/transforms/csl.py @@ -37,8 +37,9 @@ def release_to_csl(entity):              # Default to "local" (publication-specific) metadata; fall back to              # creator-level              family = contrib.surname or contrib.creator.surname or (contrib.raw_name and contrib.raw_name.split()[-1]) -            if not contrib.raw_name: -                raise ValueError("CSL requires some surname (family name)") +            if not family: +                # CSL requires some surname (family name) +                continue              c = dict(                  family=family,                  given=contrib.given_name or contrib.creator.given_name, @@ -49,22 +50,27 @@ def release_to_csl(entity):                  #static-ordering                  literal=contrib.raw_name or contrib.creator.display_name,                  #parse-names, -                role=contrib.role, +                # role must be defined; default to author +                role=contrib.role or 'author',              )          else:              family = contrib.surname or (contrib.raw_name and contrib.raw_name.split()[-1]) -            if not contrib.raw_name: -                raise ValueError("CSL requires some surname (family name)") +            if not family: +                # CSL requires some surname (family name) +                continue              c = dict(                  family=family,                  given=contrib.given_name,                  literal=contrib.raw_name, -                role=contrib.role, +                # role must be defined; default to author +                role=contrib.role or 'author',              )          for k in list(c.keys()):              if not c[k]:                  c.pop(k)          contribs.append(c) +    if not contribs: +        raise ValueError("citeproc requires at least one author with a surname")      abstract = None      if entity.abstracts:          abstract = entity.abstracts[0].content diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index b84d5e70..5783bbfc 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -107,6 +107,9 @@ class EntityUpdatesWorker(FatcatWorker):              "10.1101/",              # researchgate              "10.13140/", +            # the lancet (often hybrid OA) +     
       "10.1016/s0140-6736", +            "10.1016/s2213-2600",          ]      def want_live_ingest(self, release, ingest_request): | 
