aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/doi_registrars.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py145
1 files changed, 88 insertions, 57 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index d441d495..dd48e256 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -1,4 +1,3 @@
-
import json
import sys
import time
@@ -59,29 +58,35 @@ class HarvestCrossrefWorker:
to be careful how state is serialized back into kafka.
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
- api_host_url="https://api.crossref.org/works", start_date=None,
- end_date=None):
+ def __init__(
+ self,
+ kafka_hosts,
+ produce_topic,
+ state_topic,
+ contact_email,
+ api_host_url="https://api.crossref.org/works",
+ start_date=None,
+ end_date=None,
+ ):
self.api_host_url = api_host_url
self.produce_topic = produce_topic
self.state_topic = state_topic
self.contact_email = contact_email
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes
}
self.state = HarvestState(start_date, end_date)
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
- self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+ self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
self.api_batch_size = 50
self.name = "Crossref"
self.producer = self._kafka_producer()
def _kafka_producer(self):
-
def fail_fast(err, msg):
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
@@ -92,46 +97,53 @@ class HarvestCrossrefWorker:
self._kafka_fail_fast = fail_fast
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
return Producer(producer_conf)
def params(self, date_str):
- filter_param = 'from-update-date:{},until-update-date:{}'.format(
- date_str, date_str)
+ filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str)
return {
- 'filter': filter_param,
- 'rows': self.api_batch_size,
- 'cursor': '*',
+ "filter": filter_param,
+ "rows": self.api_batch_size,
+ "cursor": "*",
}
def update_params(self, params, resp):
- params['cursor'] = resp['message']['next-cursor']
+ params["cursor"] = resp["message"]["next-cursor"]
return params
def extract_key(self, obj):
- return obj['DOI'].encode('utf-8')
+ return obj["DOI"].encode("utf-8")
def fetch_date(self, date):
date_str = date.isoformat()
params = self.params(date_str)
http_session = requests_retry_session()
- http_session.headers.update({
- 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(
- self.contact_email),
- })
+ http_session.headers.update(
+ {
+ "User-Agent": "fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests".format(
+ self.contact_email
+ ),
+ }
+ )
count = 0
while True:
http_resp = http_session.get(self.api_host_url, params=params)
if http_resp.status_code == 503:
# crude backoff; now redundant with session exponential
# backoff, but allows for longer backoff/downtime on remote end
- print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr)
+ print(
+ "got HTTP {}, pausing for 30 seconds".format(http_resp.status_code),
+ file=sys.stderr,
+ )
# keep kafka producer connection alive
self.producer.poll(0)
time.sleep(30.0)
@@ -143,19 +155,27 @@ class HarvestCrossrefWorker:
except json.JSONDecodeError as exc:
# Datacite API returned HTTP 200, but JSON seemed unparseable.
# It might be a glitch, so we retry.
- print("failed to decode body from {}: {}".format(http_resp.url, resp_body), file=sys.stderr)
+ print(
+ "failed to decode body from {}: {}".format(http_resp.url, resp_body),
+ file=sys.stderr,
+ )
raise exc
items = self.extract_items(resp)
count += len(items)
- print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
- self.extract_total(resp), http_resp.elapsed), file=sys.stderr)
- #print(json.dumps(resp))
+ print(
+ "... got {} ({} of {}), HTTP fetch took {}".format(
+ len(items), count, self.extract_total(resp), http_resp.elapsed
+ ),
+ file=sys.stderr,
+ )
+ # print(json.dumps(resp))
for work in items:
self.producer.produce(
self.produce_topic,
- json.dumps(work).encode('utf-8'),
+ json.dumps(work).encode("utf-8"),
key=self.extract_key(work),
- on_delivery=self._kafka_fail_fast)
+ on_delivery=self._kafka_fail_fast,
+ )
self.producer.poll(0)
if len(items) < self.api_batch_size:
break
@@ -163,10 +183,10 @@ class HarvestCrossrefWorker:
self.producer.flush()
def extract_items(self, resp):
- return resp['message']['items']
+ return resp["message"]["items"]
def extract_total(self, resp):
- return resp['message']['total-results']
+ return resp["message"]["total-results"]
def run(self, continuous=False):
@@ -175,9 +195,9 @@ class HarvestCrossrefWorker:
if current:
print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
- self.state.complete(current,
- kafka_topic=self.state_topic,
- kafka_config=self.kafka_config)
+ self.state.complete(
+ current, kafka_topic=self.state_topic, kafka_config=self.kafka_config
+ )
continue
if continuous:
@@ -200,16 +220,25 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
could/should use this script for that, and dump to JSON?
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
- api_host_url="https://api.datacite.org/dois",
- start_date=None, end_date=None):
- super().__init__(kafka_hosts=kafka_hosts,
- produce_topic=produce_topic,
- state_topic=state_topic,
- api_host_url=api_host_url,
- contact_email=contact_email,
- start_date=start_date,
- end_date=end_date)
+ def __init__(
+ self,
+ kafka_hosts,
+ produce_topic,
+ state_topic,
+ contact_email,
+ api_host_url="https://api.datacite.org/dois",
+ start_date=None,
+ end_date=None,
+ ):
+ super().__init__(
+ kafka_hosts=kafka_hosts,
+ produce_topic=produce_topic,
+ state_topic=state_topic,
+ api_host_url=api_host_url,
+ contact_email=contact_email,
+ start_date=start_date,
+ end_date=end_date,
+ )
# for datecite, it's "from-update-date"
self.name = "Datacite"
@@ -219,19 +248,21 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
Dates have to be supplied in 2018-10-27T22:36:30.000Z format.
"""
return {
- 'query': 'updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]'.format(date_str, date_str),
- 'page[size]': self.api_batch_size,
- 'page[cursor]': 1,
+ "query": "updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]".format(
+ date_str, date_str
+ ),
+ "page[size]": self.api_batch_size,
+ "page[cursor]": 1,
}
def extract_items(self, resp):
- return resp['data']
+ return resp["data"]
def extract_total(self, resp):
- return resp['meta']['total']
+ return resp["meta"]["total"]
def extract_key(self, obj):
- return obj['attributes']['doi'].encode('utf-8')
+ return obj["attributes"]["doi"].encode("utf-8")
def update_params(self, params, resp):
"""
@@ -245,9 +276,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
https://github.com/datacite/datacite/issues/897 (HTTP 400)
https://github.com/datacite/datacite/issues/898 (HTTP 500)
"""
- parsed = urlparse(resp['links']['next'])
- page_cursor = parse_qs(parsed.query).get('page[cursor]')
+ parsed = urlparse(resp["links"]["next"])
+ page_cursor = parse_qs(parsed.query).get("page[cursor]")
if not page_cursor:
- raise ValueError('no page[cursor] in .links.next')
- params['page[cursor]'] = page_cursor[0]
+ raise ValueError("no page[cursor] in .links.next")
+ params["page[cursor]"] = page_cursor[0]
return params