summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Czygan <martin@archive.org>2020-03-10 15:33:17 +0000
committerMartin Czygan <martin@archive.org>2020-03-10 15:33:17 +0000
commit336630e1d445fb9d233447f9af4bac94473a12bf (patch)
treeb2d4baa4ea6d3afac29b9b2760101c10d18ea30a
parentf4cce5a765a9f80f9c5e9c907689c06dc9ebf102 (diff)
parentd18942d1ab4d394bdb275bcf9eb82d1cba814775 (diff)
downloadfatcat-336630e1d445fb9d233447f9af4bac94473a12bf.tar.gz
fatcat-336630e1d445fb9d233447f9af4bac94473a12bf.zip
Merge branch 'martin-kafka-bs4-import' into 'master'
pubmed and arxiv harvest preparations See merge request webgroup/fatcat!28
-rwxr-xr-xpython/fatcat_harvest.py17
-rwxr-xr-xpython/fatcat_import.py34
-rw-r--r--python/fatcat_tools/harvest/__init__.py5
-rw-r--r--python/fatcat_tools/harvest/oaipmh.py19
-rw-r--r--python/fatcat_tools/harvest/pubmed.py249
-rw-r--r--python/fatcat_tools/importers/__init__.py2
-rw-r--r--python/fatcat_tools/importers/common.py65
-rw-r--r--python/tests/files/pubmedsample_2019.xml.gzbin0 -> 218528 bytes
-rw-r--r--python/tests/files/pubmedsample_no_pmid_2019.xml.gzbin0 -> 1128 bytes
-rw-r--r--python/tests/harvest_pubmed.py80
10 files changed, 428 insertions, 43 deletions
diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py
index 58bef9ca..7ac0f16c 100755
--- a/python/fatcat_harvest.py
+++ b/python/fatcat_harvest.py
@@ -5,8 +5,8 @@ import argparse
import datetime
import raven
from fatcat_tools.harvest import HarvestCrossrefWorker, HarvestDataciteWorker,\
- HarvestArxivWorker, HarvestPubmedWorker, HarvestDoajArticleWorker,\
- HarvestDoajJournalWorker
+ HarvestArxivWorker, HarvestDoajArticleWorker, HarvestDoajJournalWorker,\
+ PubmedFTPWorker
# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
sentry_client = raven.Client()
@@ -42,10 +42,17 @@ def run_arxiv(args):
worker.run(continuous=args.continuous)
def run_pubmed(args):
- worker = HarvestPubmedWorker(
+ # worker = HarvestPubmedWorker(
+ # kafka_hosts=args.kafka_hosts,
+ # produce_topic="fatcat-{}.oaipmh-pubmed".format(args.env),
+ # state_topic="fatcat-{}.oaipmh-pubmed-state".format(args.env),
+ # start_date=args.start_date,
+ # end_date=args.end_date)
+ # worker.run(continuous=args.continuous)
+ worker = PubmedFTPWorker(
kafka_hosts=args.kafka_hosts,
- produce_topic="fatcat-{}.oaipmh-pubmed".format(args.env),
- state_topic="fatcat-{}.oaipmh-pubmed-state".format(args.env),
+ produce_topic="fatcat-{}.ftp-pubmed".format(args.env),
+ state_topic="fatcat-{}.ftp-pubmed-state".format(args.env),
start_date=args.start_date,
end_date=args.end_date)
worker.run(continuous=args.continuous)
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index 843685aa..e1e06653 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -39,15 +39,16 @@ def run_arxiv(args):
ari = ArxivRawImporter(args.api,
edit_batch_size=args.batch_size)
if args.kafka_mode:
- raise NotImplementedError
- #KafkaBs4XmlPusher(
- # ari,
- # args.kafka_hosts,
- # args.kafka_env,
- # "api-arxiv",
- # "fatcat-{}-import-arxiv".format(args.kafka_env),
- #).run()
+ KafkaBs4XmlPusher(
+ ari,
+ args.kafka_hosts,
+ args.kafka_env,
+ "oaipmh-arxiv",
+ "fatcat-{}-import-arxiv".format(args.kafka_env),
+ ).run()
else:
+ if args.xml_file == sys.stdin:
+ print('note: reading from stdin', file=sys.stderr)
Bs4XmlFilePusher(ari, args.xml_file, "record").run()
def run_pubmed(args):
@@ -57,14 +58,13 @@ def run_pubmed(args):
do_updates=args.do_updates,
lookup_refs=(not args.no_lookup_refs))
if args.kafka_mode:
- raise NotImplementedError
- #KafkaBs4XmlPusher(
- # pi,
- # args.kafka_hosts,
- # args.kafka_env,
- # "api-pubmed",
- # "fatcat-{}import-arxiv".format(args.kafka_env),
- #).run()
+ KafkaBs4XmlPusher(
+ pi,
+ args.kafka_hosts,
+ args.kafka_env,
+ "ftp-pubmed",
+ "fatcat-{}-import-pubmed".format(args.kafka_env),
+ ).run()
else:
Bs4XmlLargeFilePusher(
pi,
@@ -302,6 +302,7 @@ def main():
auth_var="FATCAT_AUTH_WORKER_ARXIV",
)
sub_arxiv.add_argument('xml_file',
+ nargs='?',
help="arXivRaw XML file to import from",
default=sys.stdin, type=argparse.FileType('r'))
sub_arxiv.add_argument('--kafka-mode',
@@ -315,6 +316,7 @@ def main():
auth_var="FATCAT_AUTH_WORKER_PUBMED",
)
sub_pubmed.add_argument('xml_file',
+ nargs='?',
help="Pubmed XML file to import from",
default=sys.stdin, type=argparse.FileType('r'))
sub_pubmed.add_argument('issn_map_file',
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py
index 7d814696..b3757a7d 100644
--- a/python/fatcat_tools/harvest/__init__.py
+++ b/python/fatcat_tools/harvest/__init__.py
@@ -1,5 +1,6 @@
from .harvest_common import HarvestState
from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker
-from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\
- HarvestDoajArticleWorker, HarvestDoajJournalWorker
+from .oaipmh import HarvestArxivWorker, HarvestDoajArticleWorker, \
+ HarvestDoajJournalWorker
+from .pubmed import PubmedFTPWorker
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 11b5fa0a..c95f3445 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -132,25 +132,6 @@ class HarvestArxivWorker(HarvestOaiPmhWorker):
self.name = "arxiv"
-class HarvestPubmedWorker(HarvestOaiPmhWorker):
- """
- Will likely be doing MEDLINE daily batch imports for primary metadata, but
- might also want to run a PMC importer to update fulltext and assign OA
- licenses (when appropriate).
-
- Pubmed refs:
- - https://www.ncbi.nlm.nih.gov/pmc/tools/oai/
- - https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm
- - https://github.com/titipata/pubmed_parser
- """
-
- def __init__(self, **kwargs):
- super().__init__(**kwargs)
- self.endpoint_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi"
- self.metadata_prefix = "pmc_fm"
- self.name = "pubmed"
-
-
class HarvestDoajJournalWorker(HarvestOaiPmhWorker):
"""
WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
new file mode 100644
index 00000000..3f31696e
--- /dev/null
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -0,0 +1,249 @@
+"""
+Pubmed harvest via FTP.
+
+Assumptions:
+
+* fixed hostname and directory structure
+* XML files are gzip compressed
+* accompanying HTML files contain correct dates
+"""
+
+import collections
+import gzip
+import io
+import os
+import re
+import shutil
+import sys
+import tempfile
+import time
+import xml.etree.ElementTree as ET
+from ftplib import FTP
+from urllib.parse import urljoin, urlparse
+
+import dateparser
+from bs4 import BeautifulSoup
+from confluent_kafka import KafkaException, Producer
+
+from .harvest_common import HarvestState
+
+
+class PubmedFTPWorker:
+ """
+ Access Pubmed FTP for daily updates.
+
+ * Server directory: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles
+ * Docs: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/README.txt
+
+ Daily Update Files (02/2020)
+ ----------------------------
+ Each day, NLM produces update files that include new, revised and deleted
+ citations. The first Update file to be loaded after loading the complete
+ set of 2019 MEDLINE/PubMed Baseline files is pubmed20n1016.xml.
+
+ Usually, three files per update, e.g.:
+
+ * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016_stats.html
+ * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz
+ * ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz.md5
+
+ Currently (02/2020) the HTML contains the date.
+
+ <html>
+ <head><title></title></head>
+ <body>
+ <h4>Filename: pubmed20n1019.xml -- Created: Wed Dec 18 14:31:09 EST 2019</h4>
+ <table cellspacing="0" cellpadding="0" border="0" width="300">
+ <tr>
+
+ """
+ def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
+ self.name = 'Pubmed'
+ self.host = 'ftp.ncbi.nlm.nih.gov'
+ self.produce_topic = produce_topic
+ self.state_topic = state_topic
+ self.kafka_config = {
+ 'bootstrap.servers': kafka_hosts,
+ 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ }
+ self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
+ self.state = HarvestState(start_date, end_date)
+ self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
+ self.producer = self._kafka_producer()
+ self.date_file_map = None
+
+ def _kafka_producer(self):
+ def fail_fast(err, msg):
+ if err is not None:
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+
+ self._kafka_fail_fast = fail_fast
+
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ return Producer(producer_conf)
+
+ def fetch_date(self, date):
+ """
+ Fetch file for a given date and feed Kafka one article per message. If
+ the fetched XML does not contain a PMID, this method will fail.
+
+ If no date file mapping is found, this will fail.
+ """
+ if self.date_file_map is None:
+ raise ValueError("cannot fetch date without date file mapping")
+
+ date_str = date.strftime('%Y-%m-%d')
+ paths = self.date_file_map.get(date_str)
+ if paths is None:
+ print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr)
+ return False
+
+ count = 0
+ for path in paths:
+ # Fetch and decompress file.
+ url = "ftp://{}{}".format(self.host, path)
+ filename = ftpretr(url)
+ with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp:
+ gzf = gzip.open(filename)
+ shutil.copyfileobj(gzf, decomp)
+
+ # Here, blob is the unparsed XML; we peek into it to use PMID as
+ # message key. We need streaming, since some updates would consume
+ # GBs otherwise.
+ # WARNING: Parsing foreign XML exposes us at some
+ # https://docs.python.org/3/library/xml.html#xml-vulnerabilities
+ # here.
+ for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'):
+ soup = BeautifulSoup(blob, 'xml')
+ pmid = soup.find('PMID')
+ if pmid is None:
+ raise ValueError("no PMID found, please adjust identifier extraction")
+ count += 1
+ if count % 50 == 0:
+ print("... up to {}".format(count), file=sys.stderr)
+ self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast)
+
+ self.producer.flush()
+ os.remove(filename)
+ os.remove(decomp.name)
+
+ return True
+
+ def run(self, continuous=False):
+ while True:
+ self.date_file_map = generate_date_file_map(host=self.host)
+ if len(self.date_file_map) == 0:
+ raise ValueError("map from dates to files should not be empty, maybe the HTML changed?")
+
+ current = self.state.next(continuous)
+ if current:
+ print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr)
+ self.fetch_date(current)
+ self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config)
+ continue
+
+ if continuous:
+ print("Sleeping {} seconds...".format(self.loop_sleep))
+ time.sleep(self.loop_sleep)
+ else:
+ break
+ print("{} FTP ingest caught up".format(self.name))
+
+
+def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
+ """
+ Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
+ the server (mostly we have one file, but sometimes more).
+
+ Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...}
+ """
+ mapping = collections.defaultdict(set)
+ pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
+ ftp = FTP(host)
+ ftp.login()
+ filenames = ftp.nlst('/pubmed/updatefiles')
+
+ for name in filenames:
+ if not name.endswith('.html'):
+ continue
+ sio = io.StringIO()
+ ftp.retrlines('RETR {}'.format(name), sio.write)
+ contents = sio.getvalue()
+ match = pattern.search(contents)
+ if match is None:
+ print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr)
+ continue
+ filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
+ date = dateparser.parse(filedate)
+ fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
+ date_str = date.strftime('%Y-%m-%d')
+ mapping[date_str].add(fullpath)
+ print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr)
+
+ print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
+ return mapping
+
+
+def ftpretr(url):
+ """
+ Note: This might move into a generic place in the future.
+
+ Fetch (RETR) a remote file given by its URL (e.g.
+ "ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/pubmed20n1016.xml.gz") to a
+ local temporary file. Returns the name of the local, closed temporary file.
+
+ It is the reponsibility of the caller to cleanup the temporary file.
+ """
+ parsed = urlparse(url)
+ server, path = parsed.netloc, parsed.path
+ ftp = FTP(server)
+ ftp.login()
+ with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
+ print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr)
+ ftp.retrbinary('RETR %s' % path, f.write)
+ ftp.close()
+ return f.name
+
+
+def xmlstream(filename, tag, encoding='utf-8'):
+ """
+ Note: This might move into a generic place in the future.
+
+ Given a path to an XML file and a tag name (without namespace), stream
+ through the XML and yield elements denoted by tag as string.
+
+ for snippet in xmlstream("sample.xml", "sometag"):
+ print(len(snippet))
+
+ Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
+ """
+ def strip_ns(tag):
+ if not '}' in tag:
+ return tag
+ return tag.split('}')[1]
+
+ # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm
+ context = iter(ET.iterparse(filename, events=(
+ 'start',
+ 'end',
+ )))
+ try:
+ _, root = next(context)
+ except StopIteration:
+ return
+
+ for event, elem in context:
+ if not strip_ns(elem.tag) == tag or event == 'start':
+ continue
+
+ yield ET.tostring(elem, encoding=encoding)
+ root.clear()
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 10557ef8..c26446fd 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:
"""
-from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
+from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug
from .datacite import DataciteImporter
from .jalc import JalcImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index a84ce90f..c000ad62 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -730,6 +730,71 @@ class Bs4XmlFileListPusher(RecordPusher):
print(counts)
return counts
+class KafkaBs4XmlPusher(RecordPusher):
+ """
+ Fetch XML for an article from Kafka, parse via Bs4.
+ """
+ def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+ self.importer = importer
+ self.consumer = make_kafka_consumer(
+ kafka_hosts,
+ kafka_env,
+ topic_suffix,
+ group,
+ kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+ )
+ self.poll_interval = kwargs.get('poll_interval', 5.0)
+ self.consume_batch_size = kwargs.get('consume_batch_size', 25)
+
+ def run(self):
+ count = 0
+ last_push = datetime.datetime.now()
+ while True:
+ # Note: this is batch-oriented, because underlying importer is
+ # often batch-oriented, but this doesn't confirm that entire batch
+ # has been pushed to fatcat before commiting offset. Eg, consider
+ # case where there there is one update and thousands of creates;
+ # update would be lingering in importer, and if importer crashed
+ # never created.
+ # This is partially mitigated for the worker case by flushing any
+ # outstanding editgroups every 5 minutes, but there is still that
+ # window when editgroups might be hanging (unsubmitted).
+ batch = self.consumer.consume(
+ num_messages=self.consume_batch_size,
+ timeout=self.poll_interval)
+ print("... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval))
+ if not batch:
+ if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+ # it has been some time, so flush any current editgroup
+ self.importer.finish()
+ last_push = datetime.datetime.now()
+ #print("Flushed any partial import batch: {}".format(self.importer.counts))
+ continue
+ # first check errors on entire batch...
+ for msg in batch:
+ if msg.error():
+ raise KafkaException(msg.error())
+ # ... then process
+ for msg in batch:
+ soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+ self.importer.push_record(soup)
+ soup.decompose()
+ count += 1
+ if count % 500 == 0:
+ print("Import counts: {}".format(self.importer.counts))
+ last_push = datetime.datetime.now()
+ for msg in batch:
+ # locally store offsets of processed messages; will be
+ # auto-commited by librdkafka from this "stored" value
+ self.consumer.store_offsets(message=msg)
+
+ # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+ # commit the current batch if it has been lingering
+ counts = self.importer.finish()
+ print(counts)
+ self.consumer.close()
+ return counts
class KafkaJsonPusher(RecordPusher):
diff --git a/python/tests/files/pubmedsample_2019.xml.gz b/python/tests/files/pubmedsample_2019.xml.gz
new file mode 100644
index 00000000..bafad833
--- /dev/null
+++ b/python/tests/files/pubmedsample_2019.xml.gz
Binary files differ
diff --git a/python/tests/files/pubmedsample_no_pmid_2019.xml.gz b/python/tests/files/pubmedsample_no_pmid_2019.xml.gz
new file mode 100644
index 00000000..8785a06d
--- /dev/null
+++ b/python/tests/files/pubmedsample_no_pmid_2019.xml.gz
Binary files differ
diff --git a/python/tests/harvest_pubmed.py b/python/tests/harvest_pubmed.py
new file mode 100644
index 00000000..f8db46b6
--- /dev/null
+++ b/python/tests/harvest_pubmed.py
@@ -0,0 +1,80 @@
+"""
+Test pubmed FTP harvest.
+"""
+
+import datetime
+import json
+import os
+
+import pytest
+
+from fatcat_tools.harvest import *
+from fatcat_tools.harvest.pubmed import generate_date_file_map
+
+
+def test_pubmed_harvest_date(mocker):
+
+ # mock out the harvest state object so it doesn't try to actually connect
+ # to Kafka
+ mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka')
+
+ # Mocking a file fetched from FTP, should contain some 'PubmedArticle' elements.
+ # $ zcat tests/files/pubmedsample_2019.xml.gz | grep -c '<PubmedArticle>'
+ # 176
+ file_to_retrieve = os.path.join(os.path.dirname(__file__), 'files/pubmedsample_2019.xml.gz')
+ ftpretr = mocker.patch('fatcat_tools.harvest.pubmed.ftpretr')
+ ftpretr.return_value = file_to_retrieve
+
+ test_date = '2020-02-20'
+
+ # We'll need one entry in the date_file_map.
+ generate_date_file_map = mocker.patch('fatcat_tools.harvest.pubmed.generate_date_file_map')
+ generate_date_file_map.return_value = {test_date: set(['dummy'])}
+
+ # For cleanup.
+ os.remove = mocker.Mock()
+
+ harvester = PubmedFTPWorker(
+ kafka_hosts="dummy",
+ produce_topic="dummy-produce-topic",
+ state_topic="dummy-state-topic",
+ )
+
+ harvester.producer = mocker.Mock()
+ harvester.date_file_map = generate_date_file_map()
+ # Since we mock out the FTP fetch, the concrete date does not matter here.
+ harvester.fetch_date(datetime.datetime.strptime(test_date, '%Y-%m-%d'))
+
+ # check that we published the expected number of DOI objects were published
+ # to the (mock) kafka topic
+ assert harvester.producer.produce.call_count == 176
+ assert harvester.producer.flush.call_count == 1
+ assert os.remove.call_count == 2
+
+def test_pubmed_harvest_date_no_pmid(mocker):
+ # mock out the harvest state object so it doesn't try to actually connect
+ # to Kafka
+ mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka')
+
+ file_to_retrieve = os.path.join(os.path.dirname(__file__), 'files/pubmedsample_no_pmid_2019.xml.gz')
+ ftpretr = mocker.patch('fatcat_tools.harvest.pubmed.ftpretr')
+ ftpretr.return_value = file_to_retrieve
+
+ test_date = '2020-02-20'
+
+ # We'll need one entry in the date_file_map.
+ generate_date_file_map = mocker.patch('fatcat_tools.harvest.pubmed.generate_date_file_map')
+ generate_date_file_map.return_value = {test_date: set(['dummy'])}
+
+ harvester = PubmedFTPWorker(
+ kafka_hosts="dummy",
+ produce_topic="dummy-produce-topic",
+ state_topic="dummy-state-topic",
+ )
+
+ harvester.producer = mocker.Mock()
+
+ # The file has not PMID, not importable.
+ with pytest.raises(ValueError):
+ harvester.fetch_date(datetime.datetime.strptime(test_date, '%Y-%m-%d'))
+