summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py47
1 files changed, 26 insertions, 21 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 2df13283..13abb2e6 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -63,6 +63,27 @@ class HarvestCrossrefWorker:
self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
self.api_batch_size = 50
self.name = "Crossref"
+ self.producer = self._kafka_producer()
+
+ def _kafka_producer(self):
+
+ def fail_fast(err, msg):
+ if err is not None:
+ print("Kafka producer delivery error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+
+ self._kafka_fail_fast = fail_fast
+
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ return Producer(producer_conf)
def params(self, date_str):
filter_param = 'from-index-date:{},until-index-date:{}'.format(
@@ -82,22 +103,6 @@ class HarvestCrossrefWorker:
def fetch_date(self, date):
- def fail_fast(err, msg):
- if err is not None:
- print("Kafka producer delivery error: {}".format(err))
- print("Bailing out...")
- # TODO: should it be sys.exit(-1)?
- raise KafkaException(err)
-
- producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
- producer = Producer(producer_conf)
-
date_str = date.isoformat()
params = self.params(date_str)
http_session = requests_retry_session()
@@ -113,7 +118,7 @@ class HarvestCrossrefWorker:
# backoff, but allows for longer backoff/downtime on remote end
print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))
# keep kafka producer connection alive
- producer.poll(0)
+ self.producer.poll(0)
time.sleep(30.0)
continue
http_resp.raise_for_status()
@@ -124,16 +129,16 @@ class HarvestCrossrefWorker:
self.extract_total(resp), http_resp.elapsed))
#print(json.dumps(resp))
for work in items:
- producer.produce(
+ self.producer.produce(
self.produce_topic,
json.dumps(work).encode('utf-8'),
key=self.extract_key(work),
- on_delivery=fail_fast)
- producer.poll(0)
+ on_delivery=self._kafka_fail_fast)
+ self.producer.poll(0)
if len(items) < self.api_batch_size:
break
params = self.update_params(params, resp)
- producer.flush()
+ self.producer.flush()
def extract_items(self, resp):
return resp['message']['items']