diff options
Diffstat (limited to 'python/fatcat_tools/harvest')
| -rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 47 | 
1 files changed, 26 insertions, 21 deletions
| diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 2df13283..13abb2e6 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -63,6 +63,27 @@ class HarvestCrossrefWorker:          self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks          self.api_batch_size = 50          self.name = "Crossref" +        self.producer = self._kafka_producer() + +    def _kafka_producer(self): + +        def fail_fast(err, msg): +            if err is not None: +                print("Kafka producer delivery error: {}".format(err)) +                print("Bailing out...") +                # TODO: should it be sys.exit(-1)? +                raise KafkaException(err) + +        self._kafka_fail_fast = fail_fast + +        producer_conf = self.kafka_config.copy() +        producer_conf.update({ +            'delivery.report.only.error': True, +            'default.topic.config': { +                'request.required.acks': -1, # all brokers must confirm +            }, +        }) +        return Producer(producer_conf)      def params(self, date_str):          filter_param = 'from-index-date:{},until-index-date:{}'.format( @@ -82,22 +103,6 @@ class HarvestCrossrefWorker:      def fetch_date(self, date): -        def fail_fast(err, msg): -            if err is not None: -                print("Kafka producer delivery error: {}".format(err)) -                print("Bailing out...") -                # TODO: should it be sys.exit(-1)? -                raise KafkaException(err) - -        producer_conf = self.kafka_config.copy() -        producer_conf.update({ -            'delivery.report.only.error': True, -            'default.topic.config': { -                'request.required.acks': -1, # all brokers must confirm -            }, -        }) -        producer = Producer(producer_conf) -          date_str = date.isoformat()          params = self.params(date_str)          http_session = requests_retry_session() @@ -113,7 +118,7 @@ class HarvestCrossrefWorker:                  # backoff, but allows for longer backoff/downtime on remote end                  print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))                  # keep kafka producer connection alive -                producer.poll(0) +                self.producer.poll(0)                  time.sleep(30.0)                  continue              http_resp.raise_for_status() @@ -124,16 +129,16 @@ class HarvestCrossrefWorker:                  self.extract_total(resp), http_resp.elapsed))              #print(json.dumps(resp))              for work in items: -                producer.produce( +                self.producer.produce(                      self.produce_topic,                      json.dumps(work).encode('utf-8'),                      key=self.extract_key(work), -                    on_delivery=fail_fast) -            producer.poll(0) +                    on_delivery=self._kafka_fail_fast) +            self.producer.poll(0)              if len(items) < self.api_batch_size:                  break              params = self.update_params(params, resp) -        producer.flush() +        self.producer.flush()      def extract_items(self, resp):          return resp['message']['items'] | 
