diff options
Diffstat (limited to 'python/fatcat_tools/harvest/oaipmh.py')
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 41 |
1 files changed, 30 insertions, 11 deletions
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 0b482924..ab424482 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -7,8 +7,8 @@ import time import itertools import datetime import requests -from pykafka import KafkaClient import sickle +from confluent_kafka import Producer from fatcat_tools.workers import most_recent_message from .harvest_common import HarvestState @@ -37,7 +37,12 @@ class HarvestOaiPmhWorker: self.produce_topic = produce_topic self.state_topic = state_topic - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'delivery.report.only.error': True, + 'default.topic.config': + {'request.required.acks': 'all'}, + } self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks @@ -45,14 +50,21 @@ class HarvestOaiPmhWorker: self.metadata_prefix = None # needs override self.name = "unnamed" self.state = HarvestState(start_date, end_date) - self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) + self.state.initialize_from_kafka(self.state_topic, self.kafka_config) + def kafka_produce_delivery_callback(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) def fetch_date(self, date): + producer = Producer(self.kafka_config) + api = sickle.Sickle(self.endpoint_url) date_str = date.isoformat() - produce_topic = self.kafka.topics[self.produce_topic] # this dict kwargs hack is to work around 'from' as a reserved python keyword # recommended by sickle docs try: @@ -66,12 +78,17 @@ class HarvestOaiPmhWorker: return count = 0 - with produce_topic.get_producer() as producer: - for item in records: - count += 1 - if count % 50 == 0: - print("... up to {}".format(count)) - producer.produce(item.raw.encode('utf-8'), partition_key=item.header.identifier.encode('utf-8')) + for item in records: + count += 1 + if count % 50 == 0: + print("... up to {}".format(count)) + producer.produce( + self.produce_topic, + item.raw.encode('utf-8'), + key=item.header.identifier.encode('utf-8'), + on_delivery=self.kafka_produce_delivery_callback) + producer.poll(0) + producer.flush() def run(self, continuous=False): @@ -80,7 +97,9 @@ class HarvestOaiPmhWorker: if current: print("Fetching DOIs updated on {} (UTC)".format(current)) self.fetch_date(current) - self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) + self.state.complete(current, + kafka_topic=self.state_topic, + kafka_config=self.kafka_config) continue if continuous: |