diff options
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 2df13283..13abb2e6 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -63,6 +63,27 @@ class HarvestCrossrefWorker: self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 self.name = "Crossref" + self.producer = self._kafka_producer() + + def _kafka_producer(self): + + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + + self._kafka_fail_fast = fail_fast + + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + return Producer(producer_conf) def params(self, date_str): filter_param = 'from-index-date:{},until-index-date:{}'.format( @@ -82,22 +103,6 @@ class HarvestCrossrefWorker: def fetch_date(self, date): - def fail_fast(err, msg): - if err is not None: - print("Kafka producer delivery error: {}".format(err)) - print("Bailing out...") - # TODO: should it be sys.exit(-1)? - raise KafkaException(err) - - producer_conf = self.kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) - producer = Producer(producer_conf) - date_str = date.isoformat() params = self.params(date_str) http_session = requests_retry_session() @@ -113,7 +118,7 @@ class HarvestCrossrefWorker: # backoff, but allows for longer backoff/downtime on remote end print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) # keep kafka producer connection alive - producer.poll(0) + self.producer.poll(0) time.sleep(30.0) continue http_resp.raise_for_status() @@ -124,16 +129,16 @@ class HarvestCrossrefWorker: self.extract_total(resp), http_resp.elapsed)) #print(json.dumps(resp)) for work in items: - producer.produce( + self.producer.produce( self.produce_topic, json.dumps(work).encode('utf-8'), key=self.extract_key(work), - on_delivery=fail_fast) - producer.poll(0) + on_delivery=self._kafka_fail_fast) + self.producer.poll(0) if len(items) < self.api_batch_size: break params = self.update_params(params, resp) - producer.flush() + self.producer.flush() def extract_items(self, resp): return resp['message']['items'] |