diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2019-04-06 21:15:48 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-09-20 11:21:10 -0700 | 
| commit | 4a861749ee23a14974ca9222baa2f9b7d47c38d9 (patch) | |
| tree | 58423031085c96515128bcae6cea31e64b5990ad /python/fatcat_tools/harvest/oaipmh.py | |
| parent | 2b8d4778ca4566f9437ac0da8ca4564d28b57aca (diff) | |
| download | fatcat-4a861749ee23a14974ca9222baa2f9b7d47c38d9.tar.gz fatcat-4a861749ee23a14974ca9222baa2f9b7d47c38d9.zip | |
first draft harvesters using confluent-kafka
Diffstat (limited to 'python/fatcat_tools/harvest/oaipmh.py')
| -rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 41 | 
1 file changed, 30 insertions, 11 deletions
| diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 0b482924..ab424482 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -7,8 +7,8 @@ import time  import itertools  import datetime  import requests -from pykafka import KafkaClient  import sickle +from confluent_kafka import Producer  from fatcat_tools.workers import most_recent_message  from .harvest_common import HarvestState @@ -37,7 +37,12 @@ class HarvestOaiPmhWorker:          self.produce_topic = produce_topic          self.state_topic = state_topic -        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") +        self.kafka_config = { +            'bootstrap.servers': kafka_hosts, +            'delivery.report.only.error': True, +            'default.topic.config': +                {'request.required.acks': 'all'}, +        }          self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks @@ -45,14 +50,21 @@ class HarvestOaiPmhWorker:          self.metadata_prefix = None  # needs override          self.name = "unnamed"          self.state = HarvestState(start_date, end_date) -        self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) +        self.state.initialize_from_kafka(self.state_topic, self.kafka_config) +    def kafka_produce_delivery_callback(err, msg): +        if err is not None: +            print("Kafka producer delivery error: {}".format(err)) +            print("Bailing out...") +            # TODO: should it be sys.exit(-1)? 
+            raise KafkaException(err)      def fetch_date(self, date): +        producer = Producer(self.kafka_config) +          api = sickle.Sickle(self.endpoint_url)          date_str = date.isoformat() -        produce_topic = self.kafka.topics[self.produce_topic]          # this dict kwargs hack is to work around 'from' as a reserved python keyword          # recommended by sickle docs          try: @@ -66,12 +78,17 @@ class HarvestOaiPmhWorker:              return          count = 0 -        with produce_topic.get_producer() as producer: -            for item in records: -                count += 1 -                if count % 50 == 0: -                    print("... up to {}".format(count)) -                producer.produce(item.raw.encode('utf-8'), partition_key=item.header.identifier.encode('utf-8')) +        for item in records: +            count += 1 +            if count % 50 == 0: +                print("... up to {}".format(count)) +            producer.produce( +                self.produce_topic, +                item.raw.encode('utf-8'), +                key=item.header.identifier.encode('utf-8'), +                on_delivery=self.kafka_produce_delivery_callback) +            producer.poll(0) +        producer.flush()      def run(self, continuous=False): @@ -80,7 +97,9 @@ class HarvestOaiPmhWorker:              if current:                  print("Fetching DOIs updated on {} (UTC)".format(current))                  self.fetch_date(current) -                self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) +                self.state.complete(current, +                    kafka_topic=self.state_topic, +                    kafka_config=self.kafka_config)                  continue              if continuous: | 
