diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
commit | 31d1a6a713d177990609767d508209ced19ca396 (patch) | |
tree | a628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/harvest | |
parent | 9dc891b8098542bb089c8c47098b60a8beb76a53 (diff) | |
download | fatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz fatcat-31d1a6a713d177990609767d508209ced19ca396.zip |
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/harvest')
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 145 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 83 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 57 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/pubmed.py | 159 |
4 files changed, 271 insertions, 173 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index d441d495..dd48e256 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -1,4 +1,3 @@ - import json import sys import time @@ -59,29 +58,35 @@ class HarvestCrossrefWorker: to be careful how state is serialized back into kafka. """ - def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, - api_host_url="https://api.crossref.org/works", start_date=None, - end_date=None): + def __init__( + self, + kafka_hosts, + produce_topic, + state_topic, + contact_email, + api_host_url="https://api.crossref.org/works", + start_date=None, + end_date=None, + ): self.api_host_url = api_host_url self.produce_topic = produce_topic self.state_topic = state_topic self.contact_email = contact_email self.kafka_config = { - 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + "bootstrap.servers": kafka_hosts, + "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes } self.state = HarvestState(start_date, end_date) self.state.initialize_from_kafka(self.state_topic, self.kafka_config) - self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 self.name = "Crossref" self.producer = self._kafka_producer() def _kafka_producer(self): - def fail_fast(err, msg): if err is not None: print("Kafka producer delivery error: {}".format(err), file=sys.stderr) @@ -92,46 +97,53 @@ class HarvestCrossrefWorker: self._kafka_fail_fast = fail_fast producer_conf = self.kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) + producer_conf.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "request.required.acks": -1, # all brokers must confirm + }, + } + ) return Producer(producer_conf) def params(self, date_str): - filter_param = 'from-update-date:{},until-update-date:{}'.format( - date_str, date_str) + filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str) return { - 'filter': filter_param, - 'rows': self.api_batch_size, - 'cursor': '*', + "filter": filter_param, + "rows": self.api_batch_size, + "cursor": "*", } def update_params(self, params, resp): - params['cursor'] = resp['message']['next-cursor'] + params["cursor"] = resp["message"]["next-cursor"] return params def extract_key(self, obj): - return obj['DOI'].encode('utf-8') + return obj["DOI"].encode("utf-8") def fetch_date(self, date): date_str = date.isoformat() params = self.params(date_str) http_session = requests_retry_session() - http_session.headers.update({ - 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format( - self.contact_email), - }) + http_session.headers.update( + { + "User-Agent": "fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests".format( + self.contact_email + ), + } + ) count = 0 while True: http_resp = http_session.get(self.api_host_url, params=params) if http_resp.status_code == 503: # crude backoff; now redundant with session exponential # backoff, but allows for longer backoff/downtime on remote end - print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr) + print( + "got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), + file=sys.stderr, + ) # keep kafka producer connection alive self.producer.poll(0) time.sleep(30.0) @@ -143,19 +155,27 @@ class HarvestCrossrefWorker: except json.JSONDecodeError as exc: # Datacite API returned HTTP 200, but JSON seemed unparseable. # It might be a glitch, so we retry. - print("failed to decode body from {}: {}".format(http_resp.url, resp_body), file=sys.stderr) + print( + "failed to decode body from {}: {}".format(http_resp.url, resp_body), + file=sys.stderr, + ) raise exc items = self.extract_items(resp) count += len(items) - print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, - self.extract_total(resp), http_resp.elapsed), file=sys.stderr) - #print(json.dumps(resp)) + print( + "... got {} ({} of {}), HTTP fetch took {}".format( + len(items), count, self.extract_total(resp), http_resp.elapsed + ), + file=sys.stderr, + ) + # print(json.dumps(resp)) for work in items: self.producer.produce( self.produce_topic, - json.dumps(work).encode('utf-8'), + json.dumps(work).encode("utf-8"), key=self.extract_key(work), - on_delivery=self._kafka_fail_fast) + on_delivery=self._kafka_fail_fast, + ) self.producer.poll(0) if len(items) < self.api_batch_size: break @@ -163,10 +183,10 @@ class HarvestCrossrefWorker: self.producer.flush() def extract_items(self, resp): - return resp['message']['items'] + return resp["message"]["items"] def extract_total(self, resp): - return resp['message']['total-results'] + return resp["message"]["total-results"] def run(self, continuous=False): @@ -175,9 +195,9 @@ class HarvestCrossrefWorker: if current: print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) - self.state.complete(current, - kafka_topic=self.state_topic, - kafka_config=self.kafka_config) + self.state.complete( + current, kafka_topic=self.state_topic, kafka_config=self.kafka_config + ) continue if continuous: @@ -200,16 +220,25 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): could/should use this script for that, and dump to JSON? """ - def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, - api_host_url="https://api.datacite.org/dois", - start_date=None, end_date=None): - super().__init__(kafka_hosts=kafka_hosts, - produce_topic=produce_topic, - state_topic=state_topic, - api_host_url=api_host_url, - contact_email=contact_email, - start_date=start_date, - end_date=end_date) + def __init__( + self, + kafka_hosts, + produce_topic, + state_topic, + contact_email, + api_host_url="https://api.datacite.org/dois", + start_date=None, + end_date=None, + ): + super().__init__( + kafka_hosts=kafka_hosts, + produce_topic=produce_topic, + state_topic=state_topic, + api_host_url=api_host_url, + contact_email=contact_email, + start_date=start_date, + end_date=end_date, + ) # for datecite, it's "from-update-date" self.name = "Datacite" @@ -219,19 +248,21 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): Dates have to be supplied in 2018-10-27T22:36:30.000Z format. """ return { - 'query': 'updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]'.format(date_str, date_str), - 'page[size]': self.api_batch_size, - 'page[cursor]': 1, + "query": "updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]".format( + date_str, date_str + ), + "page[size]": self.api_batch_size, + "page[cursor]": 1, } def extract_items(self, resp): - return resp['data'] + return resp["data"] def extract_total(self, resp): - return resp['meta']['total'] + return resp["meta"]["total"] def extract_key(self, obj): - return obj['attributes']['doi'].encode('utf-8') + return obj["attributes"]["doi"].encode("utf-8") def update_params(self, params, resp): """ @@ -245,9 +276,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): https://github.com/datacite/datacite/issues/897 (HTTP 400) https://github.com/datacite/datacite/issues/898 (HTTP 500) """ - parsed = urlparse(resp['links']['next']) - page_cursor = parse_qs(parsed.query).get('page[cursor]') + parsed = urlparse(resp["links"]["next"]) + page_cursor = parse_qs(parsed.query).get("page[cursor]") if not page_cursor: - raise ValueError('no page[cursor] in .links.next') - params['page[cursor]'] = page_cursor[0] + raise ValueError("no page[cursor] in .links.next") + params["page[cursor]"] = page_cursor[0] return params diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index 45c2b8ea..fda0dc62 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -1,4 +1,3 @@ - import datetime import json import sys @@ -14,8 +13,10 @@ from requests.packages.urllib3.util.retry import Retry # pylint: disable=import # Used for parsing ISO date format (YYYY-MM-DD) DATE_FMT = "%Y-%m-%d" -def requests_retry_session(retries=10, backoff_factor=3, - status_forcelist=(500, 502, 504), session=None): + +def requests_retry_session( + retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None +): """ From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests """ @@ -28,10 +29,11 @@ def requests_retry_session(retries=10, backoff_factor=3, status_forcelist=status_forcelist, ) adapter = HTTPAdapter(max_retries=retry) - session.mount('http://', adapter) - session.mount('https://', adapter) + session.mount("http://", adapter) + session.mount("https://", adapter) return session + class HarvestState: """ First version of this works with full days (dates) @@ -57,8 +59,9 @@ class HarvestState: self.enqueue_period(start_date, end_date, catchup_days) def __str__(self): - return '<HarvestState to_process={}, completed={}>'.format( - len(self.to_process), len(self.completed)) + return "<HarvestState to_process={}, completed={}>".format( + len(self.to_process), len(self.completed) + ) def enqueue_period(self, start_date=None, end_date=None, catchup_days=14): """ @@ -92,7 +95,9 @@ class HarvestState: """ if continuous: # enqueue yesterday - self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + self.enqueue_period( + start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1) + ) if not self.to_process: return None return sorted(list(self.to_process))[0] @@ -105,8 +110,8 @@ class HarvestState: state stored on disk or in Kafka. """ state = json.loads(state_json) - if 'completed-date' in state: - date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date() + if "completed-date" in state: + date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date() self.complete(date) def complete(self, date, kafka_topic=None, kafka_config=None): @@ -123,12 +128,14 @@ class HarvestState: except KeyError: pass self.completed.add(date) - state_json = json.dumps({ - 'in-progress-dates': [str(d) for d in self.to_process], - 'completed-date': str(date), - }).encode('utf-8') + state_json = json.dumps( + { + "in-progress-dates": [str(d) for d in self.to_process], + "completed-date": str(date), + } + ).encode("utf-8") if kafka_topic: - assert(kafka_config) + assert kafka_config def fail_fast(err, msg): if err: @@ -136,17 +143,16 @@ class HarvestState: print("Committing status to Kafka: {}".format(kafka_topic), file=sys.stderr) producer_conf = kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) + producer_conf.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "request.required.acks": -1, # all brokers must confirm + }, + } + ) producer = Producer(producer_conf) - producer.produce( - kafka_topic, - state_json, - on_delivery=fail_fast) + producer.produce(kafka_topic, state_json, on_delivery=fail_fast) producer.flush() return state_json @@ -166,22 +172,25 @@ class HarvestState: raise KafkaException(err) conf = kafka_config.copy() - conf.update({ - 'group.id': 'dummy_init_group', # should never be committed - 'enable.auto.commit': False, - 'auto.offset.reset': 'earliest', - 'session.timeout.ms': 10000, - }) + conf.update( + { + "group.id": "dummy_init_group", # should never be committed + "enable.auto.commit": False, + "auto.offset.reset": "earliest", + "session.timeout.ms": 10000, + } + ) consumer = Consumer(conf) # this watermark fetch is mostly to ensure we are connected to broker and # fail fast if not, but we also confirm that we read to end below. hwm = consumer.get_watermark_offsets( - TopicPartition(kafka_topic, 0), - timeout=5.0, - cached=False) + TopicPartition(kafka_topic, 0), timeout=5.0, cached=False + ) if not hwm: - raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)) + raise Exception( + "Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic) + ) consumer.assign([TopicPartition(kafka_topic, 0, 0)]) c = 0 @@ -191,8 +200,8 @@ class HarvestState: break if msg.error(): raise KafkaException(msg.error()) - #sys.stdout.write('.') - self.update(msg.value().decode('utf-8')) + # sys.stdout.write('.') + self.update(msg.value().decode("utf-8")) c += 1 consumer.close() diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 0eb0343d..40d1c853 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -1,4 +1,3 @@ - import sys import time @@ -25,19 +24,18 @@ class HarvestOaiPmhWorker: would want something similar operationally. Oh well! """ - def __init__(self, kafka_hosts, produce_topic, state_topic, - start_date=None, end_date=None): + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): self.produce_topic = produce_topic self.state_topic = state_topic self.kafka_config = { - 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + "bootstrap.servers": kafka_hosts, + "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes } - self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks - self.endpoint_url = None # needs override + self.endpoint_url = None # needs override self.metadata_prefix = None # needs override self.name = "unnamed" self.state = HarvestState(start_date, end_date) @@ -45,7 +43,6 @@ class HarvestOaiPmhWorker: print(self.state, file=sys.stderr) def fetch_date(self, date): - def fail_fast(err, msg): if err is not None: print("Kafka producer delivery error: {}".format(err), file=sys.stderr) @@ -54,12 +51,14 @@ class HarvestOaiPmhWorker: raise KafkaException(err) producer_conf = self.kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) + producer_conf.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "request.required.acks": -1, # all brokers must confirm + }, + } + ) producer = Producer(producer_conf) api = sickle.Sickle(self.endpoint_url, max_retries=5, retry_status_codes=[503]) @@ -67,13 +66,18 @@ class HarvestOaiPmhWorker: # this dict kwargs hack is to work around 'from' as a reserved python keyword # recommended by sickle docs try: - records = api.ListRecords(**{ - 'metadataPrefix': self.metadata_prefix, - 'from': date_str, - 'until': date_str, - }) + records = api.ListRecords( + **{ + "metadataPrefix": self.metadata_prefix, + "from": date_str, + "until": date_str, + } + ) except sickle.oaiexceptions.NoRecordsMatch: - print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr) + print( + "WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), + file=sys.stderr, + ) return count = 0 @@ -83,9 +87,10 @@ class HarvestOaiPmhWorker: print("... up to {}".format(count), file=sys.stderr) producer.produce( self.produce_topic, - item.raw.encode('utf-8'), - key=item.header.identifier.encode('utf-8'), - on_delivery=fail_fast) + item.raw.encode("utf-8"), + key=item.header.identifier.encode("utf-8"), + on_delivery=fail_fast, + ) producer.flush() def run(self, continuous=False): @@ -95,9 +100,9 @@ class HarvestOaiPmhWorker: if current: print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) - self.state.complete(current, - kafka_topic=self.state_topic, - kafka_config=self.kafka_config) + self.state.complete( + current, kafka_topic=self.state_topic, kafka_config=self.kafka_config + ) continue if continuous: diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py index ee55f4eb..0f33f334 100644 --- a/python/fatcat_tools/harvest/pubmed.py +++ b/python/fatcat_tools/harvest/pubmed.py @@ -60,14 +60,15 @@ class PubmedFTPWorker: <tr> """ + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): - self.name = 'Pubmed' - self.host = 'ftp.ncbi.nlm.nih.gov' + self.name = "Pubmed" + self.host = "ftp.ncbi.nlm.nih.gov" self.produce_topic = produce_topic self.state_topic = state_topic self.kafka_config = { - 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + "bootstrap.servers": kafka_hosts, + "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes } self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks self.state = HarvestState(start_date, end_date) @@ -86,12 +87,14 @@ class PubmedFTPWorker: self._kafka_fail_fast = fail_fast producer_conf = self.kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) + producer_conf.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "request.required.acks": -1, # all brokers must confirm + }, + } + ) return Producer(producer_conf) def fetch_date(self, date): @@ -105,24 +108,35 @@ class PubmedFTPWorker: if self.date_file_map is None: raise ValueError("cannot fetch date without date file mapping") - date_str = date.strftime('%Y-%m-%d') + date_str = date.strftime("%Y-%m-%d") paths = self.date_file_map.get(date_str) if paths is None: - print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr) + print( + "WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format( + date_str, self.date_file_map + ), + file=sys.stderr, + ) return False count = 0 for path in paths: # Fetch and decompress file. url = "ftp://{}{}".format(self.host, path) - filename = ftpretr(url, proxy_hostport="159.69.240.245:15201") # TODO: proxy obsolete, when networking issue is resolved - with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp: + filename = ftpretr( + url, proxy_hostport="159.69.240.245:15201" + ) # TODO: proxy obsolete, when networking issue is resolved + with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as decomp: try: gzf = gzip.open(filename) shutil.copyfileobj(gzf, decomp) except zlib.error as exc: - print('[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)'.format( - url, exc), file=sys.stderr) + print( + "[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)".format( + url, exc + ), + file=sys.stderr, + ) continue # Here, blob is the unparsed XML; we peek into it to use PMID as @@ -131,15 +145,17 @@ class PubmedFTPWorker: # WARNING: Parsing foreign XML exposes us at some # https://docs.python.org/3/library/xml.html#xml-vulnerabilities # here. - for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'): - soup = BeautifulSoup(blob, 'xml') - pmid = soup.find('PMID') + for blob in xmlstream(decomp.name, "PubmedArticle", encoding="utf-8"): + soup = BeautifulSoup(blob, "xml") + pmid = soup.find("PMID") if pmid is None: raise ValueError("no PMID found, please adjust identifier extraction") count += 1 if count % 50 == 0: print("... up to {}".format(count), file=sys.stderr) - self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast) + self.producer.produce( + self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast + ) self.producer.flush() os.remove(filename) @@ -151,13 +167,17 @@ class PubmedFTPWorker: while True: self.date_file_map = generate_date_file_map(host=self.host) if len(self.date_file_map) == 0: - raise ValueError("map from dates to files should not be empty, maybe the HTML changed?") + raise ValueError( + "map from dates to files should not be empty, maybe the HTML changed?" + ) current = self.state.next_span(continuous) if current: print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) - self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config) + self.state.complete( + current, kafka_topic=self.state_topic, kafka_config=self.kafka_config + ) continue if continuous: @@ -168,7 +188,7 @@ class PubmedFTPWorker: print("{} FTP ingest caught up".format(self.name)) -def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): +def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"): """ Generate a DefaultDict[string, set] mapping dates to absolute filepaths on the server (mostly we have one file, but sometimes more). @@ -176,14 +196,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...} """ mapping = collections.defaultdict(set) - pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)') + pattern = re.compile(r"Filename: ([^ ]*.xml) -- Created: ([^<]*)") ftp = ftplib.FTP(host) ftp.login() - filenames = ftp.nlst('/pubmed/updatefiles') + filenames = ftp.nlst("/pubmed/updatefiles") retries, retry_delay = 10, 60 for name in filenames: - if not name.endswith('.html'): + if not name.endswith(".html"): continue sio = io.StringIO() for i in range(retries): @@ -201,10 +221,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): ftp = ftplib.FTP(host) ftp.login() sio.truncate(0) - ftp.retrlines('RETR {}'.format(name), sio.write) + ftp.retrlines("RETR {}".format(name), sio.write) except (EOFError, ftplib.error_temp, socket.gaierror, BrokenPipeError) as exc: - print("ftp retr on {} failed with {} ({}) ({} retries left)".format( - name, exc, type(exc), retries - (i + 1)), file=sys.stderr) + print( + "ftp retr on {} failed with {} ({}) ({} retries left)".format( + name, exc, type(exc), retries - (i + 1) + ), + file=sys.stderr, + ) if i + 1 == retries: raise else: @@ -214,16 +238,24 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'): contents = sio.getvalue() match = pattern.search(contents) if match is None: - print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr) + print( + "pattern miss in {} on: {}, may need to adjust pattern: {}".format( + name, contents, pattern + ), + file=sys.stderr, + ) continue - filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') + ( + filename, + filedate, + ) = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019') date = dateparser.parse(filedate) - fullpath = '/pubmed/updatefiles/{}.gz'.format(filename) - date_str = date.strftime('%Y-%m-%d') + fullpath = "/pubmed/updatefiles/{}.gz".format(filename) + date_str = date.strftime("%Y-%m-%d") mapping[date_str].add(fullpath) - print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr) + print("added entry for {}: {}".format(date_str, fullpath), file=sys.stderr) - print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr) + print("generated date-file mapping for {} dates".format(len(mapping)), file=sys.stderr) return mapping @@ -241,20 +273,29 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None): when we encountered EOFError while talking to the FTP server. Retry delay in seconds. """ if proxy_hostport is not None: - return ftpretr_via_http_proxy(url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay) + return ftpretr_via_http_proxy( + url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay + ) parsed = urlparse(url) server, path = parsed.netloc, parsed.path for i in range(max_retries): try: ftp = ftplib.FTP(server) ftp.login() - with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: - print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr) - ftp.retrbinary('RETR %s' % path, f.write) + with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f: + print( + "retrieving {} from {} to {} ...".format(path, server, f.name), + file=sys.stderr, + ) + ftp.retrbinary("RETR %s" % path, f.write) ftp.close() except EOFError as exc: - print("ftp retrbinary on {} failed with {} ({}) ({} retries left)".format( - path, exc, type(exc), max_retries - (i + 1)), file=sys.stderr) + print( + "ftp retrbinary on {} failed with {} ({}) ({} retries left)".format( + path, exc, type(exc), max_retries - (i + 1) + ), + file=sys.stderr, + ) if i + 1 == max_retries: raise else: @@ -263,7 +304,9 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None): return f.name -def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1): +def ftpretr_via_http_proxy( + url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1 +): """ Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c would be retrievable via proxy.com/a/b/c; (in 09/2021 we used @@ -276,19 +319,23 @@ def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retri try: url = "http://{}{}".format(proxy_hostport, path) print("retrieving file via proxy (ftpup) from {}".format(url), file=sys.stderr) - with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f: + with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f: cmd = ["wget", "-c", url, "-O", f.name] result = subprocess.run(cmd) return f.name except (subprocess.CalledProcessError, OSError, ValueError) as exc: - print("ftp fetch {} failed with {} ({}) ({} retries left)".format( - url, exc, type(exc), max_retries - (i + 1)), file=sys.stderr) + print( + "ftp fetch {} failed with {} ({}) ({} retries left)".format( + url, exc, type(exc), max_retries - (i + 1) + ), + file=sys.stderr, + ) if i + 1 == max_retries: raise time.sleep(retry_delay) -def xmlstream(filename, tag, encoding='utf-8'): +def xmlstream(filename, tag, encoding="utf-8"): """ Note: This might move into a generic place in the future. @@ -300,23 +347,29 @@ def xmlstream(filename, tag, encoding='utf-8'): Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities """ + def strip_ns(tag): - if '}' not in tag: + if "}" not in tag: return tag - return tag.split('}')[1] + return tag.split("}")[1] # https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm - context = iter(ET.iterparse(filename, events=( - 'start', - 'end', - ))) + context = iter( + ET.iterparse( + filename, + events=( + "start", + "end", + ), + ) + ) try: _, root = next(context) except StopIteration: return for event, elem in context: - if not strip_ns(elem.tag) == tag or event == 'start': + if not strip_ns(elem.tag) == tag or event == "start": continue yield ET.tostring(elem, encoding=encoding) |