| | | |
|---|---|---|
| author | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
| commit | 31d1a6a713d177990609767d508209ced19ca396 (patch) | |
| tree | a628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/harvest/doi_registrars.py | |
| parent | 9dc891b8098542bb089c8c47098b60a8beb76a53 (diff) | |
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
| | | |
|---|---|---|
| -rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 145 |

1 file changed, 88 insertions, 57 deletions
```diff
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index d441d495..dd48e256 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -1,4 +1,3 @@
-
 import json
 import sys
 import time
@@ -59,29 +58,35 @@ class HarvestCrossrefWorker:
     to be careful how state is serialized back into kafka.
     """
 
-    def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
-                 api_host_url="https://api.crossref.org/works", start_date=None,
-                 end_date=None):
+    def __init__(
+        self,
+        kafka_hosts,
+        produce_topic,
+        state_topic,
+        contact_email,
+        api_host_url="https://api.crossref.org/works",
+        start_date=None,
+        end_date=None,
+    ):
 
         self.api_host_url = api_host_url
         self.produce_topic = produce_topic
         self.state_topic = state_topic
         self.contact_email = contact_email
         self.kafka_config = {
-            'bootstrap.servers': kafka_hosts,
-            'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+            "bootstrap.servers": kafka_hosts,
+            "message.max.bytes": 20000000,  # ~20 MBytes; broker is ~50 MBytes
         }
         self.state = HarvestState(start_date, end_date)
         self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
 
-        self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+        self.loop_sleep = 60 * 60  # how long to wait, in seconds, between date checks
         self.api_batch_size = 50
         self.name = "Crossref"
         self.producer = self._kafka_producer()
 
     def _kafka_producer(self):
-
        def fail_fast(err, msg):
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
@@ -92,46 +97,53 @@ class HarvestCrossrefWorker:
         self._kafka_fail_fast = fail_fast
 
         producer_conf = self.kafka_config.copy()
-        producer_conf.update({
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'request.required.acks': -1, # all brokers must confirm
-            },
-        })
+        producer_conf.update(
+            {
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
+            }
+        )
         return Producer(producer_conf)
 
     def params(self, date_str):
-        filter_param = 'from-update-date:{},until-update-date:{}'.format(
-            date_str, date_str)
+        filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str)
         return {
-            'filter': filter_param,
-            'rows': self.api_batch_size,
-            'cursor': '*',
+            "filter": filter_param,
+            "rows": self.api_batch_size,
+            "cursor": "*",
         }
 
     def update_params(self, params, resp):
-        params['cursor'] = resp['message']['next-cursor']
+        params["cursor"] = resp["message"]["next-cursor"]
         return params
 
     def extract_key(self, obj):
-        return obj['DOI'].encode('utf-8')
+        return obj["DOI"].encode("utf-8")
 
     def fetch_date(self, date):
 
         date_str = date.isoformat()
         params = self.params(date_str)
         http_session = requests_retry_session()
-        http_session.headers.update({
-            'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(
-                self.contact_email),
-        })
+        http_session.headers.update(
+            {
+                "User-Agent": "fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests".format(
+                    self.contact_email
+                ),
+            }
+        )
         count = 0
         while True:
             http_resp = http_session.get(self.api_host_url, params=params)
             if http_resp.status_code == 503:
                 # crude backoff; now redundant with session exponential
                 # backoff, but allows for longer backoff/downtime on remote end
-                print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr)
+                print(
+                    "got HTTP {}, pausing for 30 seconds".format(http_resp.status_code),
+                    file=sys.stderr,
+                )
                 # keep kafka producer connection alive
                 self.producer.poll(0)
                 time.sleep(30.0)
@@ -143,19 +155,27 @@ class HarvestCrossrefWorker:
             except json.JSONDecodeError as exc:
                 # Datacite API returned HTTP 200, but JSON seemed unparseable.
                 # It might be a glitch, so we retry.
-                print("failed to decode body from {}: {}".format(http_resp.url, resp_body), file=sys.stderr)
+                print(
+                    "failed to decode body from {}: {}".format(http_resp.url, resp_body),
+                    file=sys.stderr,
+                )
                 raise exc
             items = self.extract_items(resp)
             count += len(items)
-            print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
-                self.extract_total(resp), http_resp.elapsed), file=sys.stderr)
-            #print(json.dumps(resp))
+            print(
+                "... got {} ({} of {}), HTTP fetch took {}".format(
+                    len(items), count, self.extract_total(resp), http_resp.elapsed
+                ),
+                file=sys.stderr,
+            )
+            # print(json.dumps(resp))
             for work in items:
                 self.producer.produce(
                     self.produce_topic,
-                    json.dumps(work).encode('utf-8'),
+                    json.dumps(work).encode("utf-8"),
                     key=self.extract_key(work),
-                    on_delivery=self._kafka_fail_fast)
+                    on_delivery=self._kafka_fail_fast,
+                )
             self.producer.poll(0)
             if len(items) < self.api_batch_size:
                 break
@@ -163,10 +183,10 @@ class HarvestCrossrefWorker:
         self.producer.flush()
 
     def extract_items(self, resp):
-        return resp['message']['items']
+        return resp["message"]["items"]
 
     def extract_total(self, resp):
-        return resp['message']['total-results']
+        return resp["message"]["total-results"]
 
     def run(self, continuous=False):
 
@@ -175,9 +195,9 @@
             if current:
                 print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
                 self.fetch_date(current)
-                self.state.complete(current,
-                    kafka_topic=self.state_topic,
-                    kafka_config=self.kafka_config)
+                self.state.complete(
+                    current, kafka_topic=self.state_topic, kafka_config=self.kafka_config
+                )
                 continue
 
             if continuous:
@@ -200,16 +220,25 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
     could/should use this script for that, and dump to JSON?
     """
 
-    def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
-                 api_host_url="https://api.datacite.org/dois",
-                 start_date=None, end_date=None):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         produce_topic=produce_topic,
-                         state_topic=state_topic,
-                         api_host_url=api_host_url,
-                         contact_email=contact_email,
-                         start_date=start_date,
-                         end_date=end_date)
+    def __init__(
+        self,
+        kafka_hosts,
+        produce_topic,
+        state_topic,
+        contact_email,
+        api_host_url="https://api.datacite.org/dois",
+        start_date=None,
+        end_date=None,
+    ):
+        super().__init__(
+            kafka_hosts=kafka_hosts,
+            produce_topic=produce_topic,
+            state_topic=state_topic,
+            api_host_url=api_host_url,
+            contact_email=contact_email,
+            start_date=start_date,
+            end_date=end_date,
+        )
 
         # for datecite, it's "from-update-date"
         self.name = "Datacite"
@@ -219,19 +248,21 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
         Dates have to be supplied in 2018-10-27T22:36:30.000Z format.
         """
         return {
-            'query': 'updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]'.format(date_str, date_str),
-            'page[size]': self.api_batch_size,
-            'page[cursor]': 1,
+            "query": "updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]".format(
+                date_str, date_str
+            ),
+            "page[size]": self.api_batch_size,
+            "page[cursor]": 1,
         }
 
     def extract_items(self, resp):
-        return resp['data']
+        return resp["data"]
 
     def extract_total(self, resp):
-        return resp['meta']['total']
+        return resp["meta"]["total"]
 
     def extract_key(self, obj):
-        return obj['attributes']['doi'].encode('utf-8')
+        return obj["attributes"]["doi"].encode("utf-8")
 
     def update_params(self, params, resp):
         """
@@ -245,9 +276,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
         https://github.com/datacite/datacite/issues/897 (HTTP 400)
         https://github.com/datacite/datacite/issues/898 (HTTP 500)
         """
-        parsed = urlparse(resp['links']['next'])
-        page_cursor = parse_qs(parsed.query).get('page[cursor]')
+        parsed = urlparse(resp["links"]["next"])
+        page_cursor = parse_qs(parsed.query).get("page[cursor]")
         if not page_cursor:
-            raise ValueError('no page[cursor] in .links.next')
-        params['page[cursor]'] = page_cursor[0]
+            raise ValueError("no page[cursor] in .links.next")
+        params["page[cursor]"] = page_cursor[0]
         return params
```