diff options
Diffstat (limited to 'python/fatcat_tools/harvest/pubmed.py')
-rw-r--r-- | python/fatcat_tools/harvest/pubmed.py | 159 |
1 files changed, 106 insertions, 53 deletions
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index ee55f4eb..0f33f334 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -60,14 +60,15 @@ class PubmedFTPWorker: <tr> """ + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): - self.name = 'Pubmed' - self.host = 'ftp.ncbi.nlm.nih.gov' + self.name = "Pubmed" + self.host = "ftp.ncbi.nlm.nih.gov" self.produce_topic = produce_topic self.state_topic = state_topic self.kafka_config = { - 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + "bootstrap.servers": kafka_hosts, + "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes } self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks self.state = HarvestState(start_date, end_date) @@ -86,12 +87,14 @@ class PubmedFTPWorker: self._kafka_fail_fast = fail_fast producer_conf = self.kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) + producer_conf.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "request.required.acks": -1, # all brokers must confirm + }, + } + ) return Producer(producer_conf) def fetch_date(self, date): @@ -105,24 +108,35 @@ class PubmedFTPWorker: if self.date_file_map is None: raise ValueError("cannot fetch date without date file mapping") - date_str = date.strftime('%Y-%m-%d') + date_str = date.strftime("%Y-%m-%d") paths = self.date_file_map.get(date_str) if paths is None: - print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr) + print( + "WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format( + date_str, self.date_file_map + ), + file=sys.stderr, + ) return False count = 0 for path in paths: # Fetch and decompress file. url = "ftp://{}{}".format(self.host, path) - filename = ftpretr(url, proxy_hostport="159.69.240.245:15201") # TODO: proxy obsolete, when networking issue is resolved - with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp: + filename = ftpretr( + url, proxy_hostport="159.69.240.245:15201" + ) # TODO: proxy obsolete, when networking issue is resolved + with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as decomp: try: gzf = gzip.open(filename) shutil.copyfileobj(gzf, decomp) except zlib.error as exc: - print('[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)'.format( - url, exc), file=sys.stderr) + print( + "[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)".format( + url, exc + ), + file=sys.stderr, + ) continue # Here, blob is the unparsed XML; we peek into it to use PMID as @@ -131,15 +145,17 @@ class PubmedFTPWorker: # WARNING: Parsing foreign XML exposes us at some # https://docs.python.org/3/library/xml.html#xml-vulnerabilities # here. - for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'): - soup = BeautifulSoup(blob, 'xml') - pmid = soup.find('PMID') + for blob in xmlstream(decomp.name, "PubmedArticle", encoding="utf-8"): + soup = BeautifulSoup(blob, "xml") + pmid = soup.find("PMID") if pmid is None: raise ValueError("no PMID found, please adjust identifier extraction") count += 1 if count % 50 == 0: print("... up to {}".format(count), file=sys.stderr) - self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast) + self.producer.produce( + self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast + ) self.producer.flush() os.remove(filename) @@ -151,13 +167,17 @@ class PubmedFTPWorker: while True: self.date_file_map = generate_date_file_map(host=self.host) if len(self.date_file_map) == 0: - raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + raise ValueError( + "map from dates to files should not be empty, maybe the HTML changed?" + ) current = self.state.next_span(continuous) if current: print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) - self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config) + self.state.complete( + current, kafka_topic=self.state_topic, kafka_config=self.kafka_config + ) continue if continuous: @@ -168,7 +188,7 @@ class PubmedFTPWorker: print("{} FTP ingest caught up".format(self.name)) -def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): +def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"): """ Generate a DefaultDict[string, set] mapping dates to absolute filepaths on the server (mostly we have one file, but sometimes more). @@ -176,14 +196,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...} """ mapping = collections.defaultdict(set) - pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') + pattern = re.compile(r"Filename: ([^ ]*.xml) -- Created: ([^<]*)") ftp = ftplib.FTP(host) ftp.login() - filenames = ftp.nlst('/pubmed/updatefiles') + filenames = ftp.nlst("/pubmed/updatefiles") retries, retry_delay = 10, 60 for name in filenames: - if not name.endswith('.html'): + if not name.endswith(".html"): continue sio = io.StringIO() for i in range(retries): @@ -201,10 +221,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): ftp = ftplib.FTP(host) ftp.login() sio.truncate(0) - ftp.retrlines('RETR {}'.format(name), sio.write) + ftp.retrlines("RETR {}".format(name), sio.write) except (EOFError, ftplib.error_temp, socket.gaierror, BrokenPipeError) as exc: - print("ftp retr on {} failed with {} ({}) ({} retries left)".format( - name, exc, type(exc), retries - (i + 1)), file=sys.stderr) + print( + "ftp retr on {} failed with {} ({}) ({} retries left)".format( + name, exc, type(exc), retries - (i + 1) + ), + file=sys.stderr, + ) if i + 1 == retries: raise else: @@ -214,16 +238,24 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): contents = sio.getvalue() match = pattern.search(contents) if match is None: - print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr) + print( + "pattern miss in {} on: {}, may need to adjust pattern: {}".format( + name, contents, pattern + ), + file=sys.stderr, + ) continue - filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') + ( + filename, + filedate, + ) = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') date = dateparser.parse(filedate) - fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) - date_str = date.strftime('%Y-%m-%d') + fullpath = "/pubmed/updatefiles/{}.gz".format(filename) + date_str = date.strftime("%Y-%m-%d") mapping[date_str].add(fullpath) - print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr) + print("added entry for {}: {}".format(date_str, fullpath), file=sys.stderr) - print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) + print("generated date-file mapping for {} dates".format(len(mapping)), file=sys.stderr) return mapping @@ -241,20 +273,29 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None): when we encountered EOFError while talking to the FTP server. Retry delay in seconds. """ if proxy_hostport is not None: - return ftpretr_via_http_proxy(url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay) + return ftpretr_via_http_proxy( + url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay + ) parsed = urlparse(url) server, path = parsed.netloc, parsed.path for i in range(max_retries): try: ftp = ftplib.FTP(server) ftp.login() - with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: - print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr) - ftp.retrbinary('RETR %s' % path, f.write) + with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f: + print( + "retrieving {} from {} to {} ...".format(path, server, f.name), + file=sys.stderr, + ) + ftp.retrbinary("RETR %s" % path, f.write) ftp.close() except EOFError as exc: - print("ftp retrbinary on {} failed with {} ({}) ({} retries left)".format( - path, exc, type(exc), max_retries - (i + 1)), file=sys.stderr) + print( + "ftp retrbinary on {} failed with {} ({}) ({} retries left)".format( + path, exc, type(exc), max_retries - (i + 1) + ), + file=sys.stderr, + ) if i + 1 == max_retries: raise else: @@ -263,7 +304,9 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None): return f.name -def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1): +def ftpretr_via_http_proxy( + url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1 +): """ Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c would be retrievable via proxy.com/a/b/c; (in 09/2021 we used @@ -276,19 +319,23 @@ def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retri try: url = "http://{}{}".format(proxy_hostport, path) print("retrieving file via proxy (ftpup) from {}".format(url), file=sys.stderr) - with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: + with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f: cmd = ["wget", "-c", url, "-O", f.name] result = subprocess.run(cmd) return f.name except (subprocess.CalledProcessError, OSError, ValueError) as exc: - print("ftp fetch {} failed with {} ({}) ({} retries left)".format( - url, exc, type(exc), max_retries - (i + 1)), file=sys.stderr) + print( + "ftp fetch {} failed with {} ({}) ({} retries left)".format( + url, exc, type(exc), max_retries - (i + 1) + ), + file=sys.stderr, + ) if i + 1 == max_retries: raise time.sleep(retry_delay) -def xmlstream(filename, tag, encoding='utf-8'): +def xmlstream(filename, tag, encoding="utf-8"): """ Note: This might move into a generic place in the future. @@ -300,23 +347,29 @@ def xmlstream(filename, tag, encoding='utf-8'): Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities """ + def strip_ns(tag): - if '}' not in tag: + if "}" not in tag: return tag - return tag.split('}')[1] + return tag.split("}")[1] # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm - context = iter(ET.iterparse(filename, events=( - 'start', - 'end', - ))) + context = iter( + ET.iterparse( + filename, + events=( + "start", + "end", + ), + ) + ) try: _, root = next(context) except StopIteration: return for event, elem in context: - if not strip_ns(elem.tag) == tag or event == 'start': + if not strip_ns(elem.tag) == tag or event == "start": continue yield ET.tostring(elem, encoding=encoding) |