Diffstat (limited to 'python')
| -rwxr-xr-x | python/fatcat_harvest.py                  |  15 |
| -rwxr-xr-x | python/fatcat_import.py                   |  32 |
| -rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py     |  15 |
| -rw-r--r-- | python/fatcat_tools/harvest/pubmed.py     | 199 |
| -rw-r--r-- | python/fatcat_tools/importers/__init__.py |   2 |
| -rw-r--r-- | python/fatcat_tools/importers/common.py   |  65 |
6 files changed, 307 insertions, 21 deletions
diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py
index 58bef9ca..4c4f34a1 100755
--- a/python/fatcat_harvest.py
+++ b/python/fatcat_harvest.py
@@ -6,7 +6,7 @@ import datetime
 import raven
 from fatcat_tools.harvest import HarvestCrossrefWorker, HarvestDataciteWorker,\
     HarvestArxivWorker, HarvestPubmedWorker, HarvestDoajArticleWorker,\
-    HarvestDoajJournalWorker
+    HarvestDoajJournalWorker, PubmedFTPWorker
 
 # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
 sentry_client = raven.Client()
@@ -42,10 +42,17 @@ def run_arxiv(args):
     worker.run(continuous=args.continuous)
 
 def run_pubmed(args):
-    worker = HarvestPubmedWorker(
+    # worker = HarvestPubmedWorker(
+    #     kafka_hosts=args.kafka_hosts,
+    #     produce_topic="fatcat-{}.oaipmh-pubmed".format(args.env),
+    #     state_topic="fatcat-{}.oaipmh-pubmed-state".format(args.env),
+    #     start_date=args.start_date,
+    #     end_date=args.end_date)
+    # worker.run(continuous=args.continuous)
+    worker = PubmedFTPWorker(
         kafka_hosts=args.kafka_hosts,
-        produce_topic="fatcat-{}.oaipmh-pubmed".format(args.env),
-        state_topic="fatcat-{}.oaipmh-pubmed-state".format(args.env),
+        produce_topic="fatcat-{}.ftp-pubmed".format(args.env),
+        state_topic="fatcat-{}.ftp-pubmed-state".format(args.env),
         start_date=args.start_date,
         end_date=args.end_date)
     worker.run(continuous=args.continuous)
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index ad4de0e2..eaab9cfe 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -39,14 +39,13 @@ def run_arxiv(args):
     ari = ArxivRawImporter(args.api,
         edit_batch_size=args.batch_size)
     if args.kafka_mode:
-        raise NotImplementedError
-        #KafkaBs4XmlPusher(
-        #    ari,
-        #    args.kafka_hosts,
-        #    args.kafka_env,
-        #    "api-arxiv",
-        #    "fatcat-{}-import-arxiv".format(args.kafka_env),
-        #).run()
+        KafkaBs4XmlPusher(
+            ari,
+            args.kafka_hosts,
+            args.kafka_env,
+            "oaipmh-arxiv",
+            "fatcat-{}-import-arxiv".format(args.kafka_env),
+        ).run()
     else:
         Bs4XmlFilePusher(ari, args.xml_file, "record").run()
@@ -57,14 +56,13 @@ def run_pubmed(args):
         do_updates=args.do_updates,
         lookup_refs=(not args.no_lookup_refs))
     if args.kafka_mode:
-        raise NotImplementedError
-        #KafkaBs4XmlPusher(
-        #    pi,
-        #    args.kafka_hosts,
-        #    args.kafka_env,
-        #    "api-pubmed",
-        #    "fatcat-{}import-arxiv".format(args.kafka_env),
-        #).run()
+        KafkaBs4XmlPusher(
+            pi,
+            args.kafka_hosts,
+            args.kafka_env,
+            "oaipmh-pubmed",
+            "fatcat-{}-import-pubmed".format(args.kafka_env),
+        ).run()
     else:
         Bs4XmlLargeFilePusher(
             pi,
@@ -297,6 +295,7 @@ def main():
         auth_var="FATCAT_AUTH_WORKER_ARXIV",
     )
     sub_arxiv.add_argument('xml_file',
+        nargs='?',
         help="arXivRaw XML file to import from",
         default=sys.stdin, type=argparse.FileType('r'))
     sub_arxiv.add_argument('--kafka-mode',
@@ -310,6 +309,7 @@
         auth_var="FATCAT_AUTH_WORKER_PUBMED",
     )
     sub_pubmed.add_argument('xml_file',
+        nargs='?',
         help="Pubmed XML file to import from",
         default=sys.stdin, type=argparse.FileType('r'))
     sub_pubmed.add_argument('issn_map_file',
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 11b5fa0a..8e9efea8 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -142,6 +142,21 @@ class HarvestPubmedWorker(HarvestOaiPmhWorker):
     - https://www.ncbi.nlm.nih.gov/pmc/tools/oai/
     - https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm
     - https://github.com/titipata/pubmed_parser
+
+    TODO(martin): OAI does not seem to support the format we already have an
+    importer for. Maybe we can use "Daily Update Files" --
+
+    Daily Update Files
+    ------------------
+    Each day, NLM produces update files that include new, revised and deleted
+    citations. The first Update file to be loaded after loading the complete
+    set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml.
+    ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles
+
+    NOTES:
+
+    * OAI: https://dtd.nlm.nih.gov/archiving/2.3/xsd/archivearticle.xsd
+    * FTP: https://dtd.nlm.nih.gov/ncbi/pubmed/out/pubmed_190101.dtd
     """
 
     def __init__(self, **kwargs):
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
new file mode 100644
index 00000000..da872a10
--- /dev/null
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -0,0 +1,199 @@
+"""
+Pubmed harvest via FTP.
+"""
+
+import collections
+import io
+import re
+import sys
+import tempfile
+import time
+import xml.etree.ElementTree as ET
+from ftplib import FTP
+from urllib.parse import urljoin, urlparse
+
+import dateparser
+from bs4 import BeautifulSoup
+from confluent_kafka import KafkaException, Producer
+
+from .harvest_common import HarvestState
+
+
+class PubmedFTPWorker:
+    """
+    Access Pubmed FTP host for daily updates.
+
+    * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles
+    * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt
+
+    Daily Update Files (02/2020)
+    ----------------------------
+    Each day, NLM produces update files that include new, revised and deleted
+    citations. The first Update file to be loaded after loading the complete
+    set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml.
+
+    Usually, three files per update, e.g.:
+
+    * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016_stats.html
+    * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz
+    * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5
+
+    The HTML contains the date.
+
+        <html>
+        <head><title></title></head>
+        <body>
+        <h4>Filename: pubmed20n1019.xml -- Created: Wed Dec 18 14:31:09 EST 2019</h4>
+        <table cellspacing="0" cellpadding="0" border="0" width="300">
+        <tr>
+
+    When this worker starts, it will figure out a mapping from date to XML
+    files by looking at all the HTML files.
+    """
+    def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+        self.name = 'Pubmed'
+        self.host = 'ftp.ncbi.nlm.nih.gov'
+        self.produce_topic = produce_topic
+        self.state_topic = state_topic
+        self.kafka_config = {
+            'bootstrap.servers': kafka_hosts,
+            'message.max.bytes': 20000000,  # ~20 MBytes; broker is ~50 MBytes
+        }
+        self.loop_sleep = 60*60  # how long to wait, in seconds, between date checks
+        self.state = HarvestState(start_date, end_date)
+        self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
+        self.date_file_map = self.generate_date_file_map()
+        if len(self.date_file_map) == 0:
+            raise ValueError('mapping from dates to files should not be empty')
+
+    def generate_date_file_map(self):
+        """
+        Generate a dictionary mapping dates (as strings) to sets of absolute
+        filepaths on the FTP server. The date is parsed from the "_stats.html"
+        files, e.g. "2020-01-02": {"/pubmed/updatefiles/pubmed20n1016.xml.gz"}.
+        """
+        mapping = collections.defaultdict(set)
+        pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
+        ftp = FTP(self.host)
+        ftp.login()
+        filenames = ftp.nlst('/pubmed/updatefiles')
+        for name in filenames:
+            if not name.endswith('.html'):
+                continue
+            sio = io.StringIO()
+            ftp.retrlines('RETR {}'.format(name), sio.write)
+            contents = sio.getvalue()
+            match = pattern.search(contents)
+            if match is None:
+                print('pattern miss on: {}, may need to adjust pattern: {}'.format(contents, pattern),
+                      file=sys.stderr)
+                continue
+            filename, filedate = match.groups()  # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
+            date = dateparser.parse(filedate)
+            fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
+            mapping[date.strftime('%Y-%m-%d')].add(fullpath)
+
+        print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
+        return mapping
+
+    def fetch_date(self, date):
+        """
+        Fetch file(s) for a given date and feed Kafka one article per message.
+        """
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+                print("Bailing out...", file=sys.stderr)
+                raise KafkaException(err)
+
+        producer_conf = self.kafka_config.copy()
+        producer_conf.update({
+            'delivery.report.only.error': True,
+            'default.topic.config': {
+                'request.required.acks': -1,  # all brokers must confirm
+            },
+        })
+        producer = Producer(producer_conf)
+
+        date_str = date.strftime('%Y-%m-%d')
+        paths = self.date_file_map.get(date_str)
+        if paths is None:
+            print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr)
+            return
+
+        count = 0
+        for path in paths:
+            filename = ftpretr("ftp://{}{}".format(self.host, path))
+            for blob in xmlstream(filename, 'PubmedArticle', encoding='utf-8'):
+                soup = BeautifulSoup(blob, 'xml')
+                pmid = soup.find('PMID')
+                if pmid is None:
+                    raise ValueError('no PMID found, adjust identifier extraction')
+                count += 1
+                if count % 50 == 0:
+                    print("... up to {} from {}".format(count, filename), file=sys.stderr)
+                producer.produce(
+                    self.produce_topic,
+                    blob,
+                    key=pmid.text,
+                    on_delivery=fail_fast)
+            producer.flush()
+
+    def run(self, continuous=False):
+        while True:
+            current = self.state.next(continuous)
+            if current:
+                print("Fetching Pubmed updates for {} (UTC)".format(current), file=sys.stderr)
+                self.fetch_date(current)
+                self.state.complete(current,
+                    kafka_topic=self.state_topic,
+                    kafka_config=self.kafka_config)
+                continue
+
+            if continuous:
+                print("Sleeping {} seconds...".format(self.loop_sleep))
+                time.sleep(self.loop_sleep)
+            else:
+                break
+        print("{} FTP ingest caught up".format(self.name))
+
+
+def ftpretr(uri):
+    """
+    Fetch (RETR) a remote file given by its ftp:// URI to a local temporary
+    file and return the local filename.
+    """
+    parsed = urlparse(uri)
+    server, path = parsed.netloc, parsed.path
+    ftp = FTP(server)
+    ftp.login()
+    with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
+        ftp.retrbinary('RETR %s' % path, f.write)
+    ftp.close()
+    return f.name
+
+
+def xmlstream(filename, tag, encoding='utf-8'):
+    """
+    Given a path to an XML file and a tag name (without namespace), stream
+    through the XML and yield the elements denoted by tag, serialized as strings.
+
+        for snippet in xmlstream("sample.xml", "sometag"):
+            print(len(snippet))
+    """
+    def strip_ns(tag):
+        if '}' not in tag:
+            return tag
+        return tag.split('}')[1]
+
+    # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm
+    context = iter(ET.iterparse(filename, events=(
+        'start',
+        'end',
+    )))
+    _, root = next(context)
+
+    for event, elem in context:
+        if not strip_ns(elem.tag) == tag or event == 'start':
+            continue
+
+        yield ET.tostring(elem, encoding=encoding)
+        root.clear()
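The date-to-file mapping built by generate_date_file_map() above hinges on pulling the "Filename: ... -- Created: ..." line out of each *_stats.html file and turning the timestamp into an ISO date key. A minimal, self-contained sketch of just that parsing step (no FTP access; the HTML line is copied from the docstring example above, and dateparser is the same library the worker imports):

    import collections
    import re

    import dateparser

    # <h4> content as it appears in a *_stats.html file; in the worker this
    # text is fetched from the FTP server via ftplib.
    html = '<h4>Filename: pubmed20n1019.xml -- Created: Wed Dec 18 14:31:09 EST 2019</h4>'

    pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
    mapping = collections.defaultdict(set)

    match = pattern.search(html)
    if match is not None:
        filename, filedate = match.groups()  # ('pubmed20n1019.xml', 'Wed Dec 18 14:31:09 EST 2019')
        date = dateparser.parse(filedate)
        # key is the ISO date, value the absolute FTP path of the gzipped XML
        mapping[date.strftime('%Y-%m-%d')].add('/pubmed/updatefiles/{}.gz'.format(filename))

    print(dict(mapping))
    # {'2019-12-18': {'/pubmed/updatefiles/pubmed20n1019.xml.gz'}}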
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index d936605f..03c7cbcc 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:
 
 """
 
-from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
+from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
 from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug
 from .datacite import DataciteImporter
 from .jalc import JalcImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 1ffbd6e7..1cb5529e 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -721,6 +721,71 @@ class Bs4XmlFileListPusher(RecordPusher):
         print(counts)
         return counts
 
+class KafkaBs4XmlPusher(RecordPusher):
+    """
+    Fetch XML for an article from Kafka, parse via Bs4.
+    """
+    def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+        self.importer = importer
+        self.consumer = make_kafka_consumer(
+            kafka_hosts,
+            kafka_env,
+            topic_suffix,
+            group,
+            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+        )
+        self.poll_interval = kwargs.get('poll_interval', 5.0)
+        self.consume_batch_size = kwargs.get('consume_batch_size', 100)
+
+    def run(self):
+        count = 0
+        last_push = datetime.datetime.now()
+        while True:
+            # Note: this is batch-oriented because the underlying importer is
+            # often batch-oriented, but it doesn't confirm that the entire
+            # batch has been pushed to fatcat before committing the offset.
+            # Eg, consider the case where there is one update and thousands of
+            # creates; the update would linger in the importer and, if the
+            # importer crashed, would never be created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still that
+            # window when editgroups might be hanging (unsubmitted).
+            batch = self.consumer.consume(
+                num_messages=self.consume_batch_size,
+                timeout=self.poll_interval)
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval))
+            if not batch:
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
+                continue
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            for msg in batch:
+                soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+                self.importer.push_record(soup)
+                soup.decompose()
+                count += 1
+                if count % 500 == 0:
+                    print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
+            for msg in batch:
+                # locally store offsets of processed messages; will be
+                # auto-committed by librdkafka from this "stored" value
+                self.consumer.store_offsets(message=msg)
+
+        # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+        # commit the current batch if it has been lingering
+        counts = self.importer.finish()
+        print(counts)
+        self.consumer.close()
+        return counts
 
 class KafkaJsonPusher(RecordPusher):
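KafkaBs4XmlPusher only relies on a small surface of the importer it wraps: push_record(), finish(), and a counts attribute. A rough stand-in importer makes that contract visible; this is a hypothetical sketch for illustration only (real importers such as PubmedImporter come from fatcat_tools.importers, and the broker, env, topic suffix, and group values below are illustrative, not taken from the patch):

    from collections import Counter

    class NoopXmlImporter:
        """Hypothetical stand-in showing the interface KafkaBs4XmlPusher expects."""

        def __init__(self):
            self.counts = Counter()

        def push_record(self, soup):
            # a real importer would turn the BeautifulSoup document into
            # fatcat entity edits; here we only count records
            self.counts['total'] += 1

        def finish(self):
            # a real importer would submit any outstanding editgroup here
            return self.counts

    # Wiring it up (values are illustrative):
    # pusher = KafkaBs4XmlPusher(NoopXmlImporter(), "localhost:9092", "dev",
    #                            "oaipmh-pubmed", "fatcat-dev-import-pubmed")
    # pusher.run()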
