diff options
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 71 |
1 files changed, 45 insertions, 26 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 802e0e22..1483266c 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -7,7 +7,7 @@ import time import itertools import datetime import requests -from pykafka import KafkaClient +from confluent_kafka import Producer from fatcat_tools.workers import most_recent_message from .harvest_common import HarvestState, requests_retry_session @@ -53,11 +53,16 @@ class HarvestCrossrefWorker: self.produce_topic = produce_topic self.state_topic = state_topic self.contact_email = contact_email - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") self.is_update_filter = is_update_filter + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'delivery.report.only.error': True, + 'default.topic.config': + {'request.required.acks': 'all'}, + } self.state = HarvestState(start_date, end_date) - self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) + self.state.initialize_from_kafka(self.state_topic, self.kafka_config) self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 @@ -81,9 +86,16 @@ class HarvestCrossrefWorker: def extract_key(self, obj): return obj['DOI'].encode('utf-8') + def kafka_produce_delivery_callback(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + def fetch_date(self, date): - produce_topic = self.kafka.topics[self.produce_topic] + producer = Producer(self.kafka_config) date_str = date.isoformat() params = self.params(date_str) @@ -93,27 +105,32 @@ class HarvestCrossrefWorker: self.contact_email), }) count = 0 - with produce_topic.get_producer() as producer: - while True: - http_resp = http_session.get(self.api_host_url, params=params) - if http_resp.status_code == 503: - # crude backoff; now redundant with session exponential - # backoff, but allows for longer backoff/downtime on remote end - print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) - time.sleep(30.0) - continue - http_resp.raise_for_status() - resp = http_resp.json() - items = self.extract_items(resp) - count += len(items) - print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, - self.extract_total(resp), http_resp.elapsed)) - #print(json.dumps(resp)) - for work in items: - producer.produce(json.dumps(work).encode('utf-8'), partition_key=self.extract_key(work)) - if len(items) < self.api_batch_size: - break - params = self.update_params(params, resp) + while True: + http_resp = http_session.get(self.api_host_url, params=params) + if http_resp.status_code == 503: + # crude backoff; now redundant with session exponential + # backoff, but allows for longer backoff/downtime on remote end + print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) + time.sleep(30.0) + continue + http_resp.raise_for_status() + resp = http_resp.json() + items = self.extract_items(resp) + count += len(items) + print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, + self.extract_total(resp), http_resp.elapsed)) + #print(json.dumps(resp)) + for work in items: + producer.produce( + self.produce_topic, + json.dumps(work).encode('utf-8'), + key=self.extract_key(work), + on_delivery=self.kafka_produce_delivery_callback) + producer.poll(0) + if len(items) < self.api_batch_size: + break + params = self.update_params(params, resp) + producer.flush() def extract_items(self, resp): return resp['message']['items'] @@ -128,7 +145,9 @@ class HarvestCrossrefWorker: if current: print("Fetching DOIs updated on {} (UTC)".format(current)) self.fetch_date(current) - self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) + self.state.complete(current, + kafka_topic=self.state_topic, + kafka_config=self.kafka_config) continue if continuous: |