diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2019-12-06 16:57:55 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-12-06 16:58:00 -0800 | 
| commit | 87bd176bc669bfb1830e4c29683ee9a6c0dd2861 (patch) | |
| tree | 404c4aafa2ef4de396990addcadd5b04db216878 /python/fatcat_tools/harvest | |
| parent | fb5d2a2dad5f6bb3beedc999a52be2d0ed1b4f75 (diff) | |
| download | fatcat-87bd176bc669bfb1830e4c29683ee9a6c0dd2861.tar.gz fatcat-87bd176bc669bfb1830e4c29683ee9a6c0dd2861.zip | |
refactor kafka producer in crossref harvester
producer creation/configuration should happen at __init__() time,
not in the 'daily' call.
This specific refactor was motivated by the need to mock out the
producer in unit tests.
Diffstat (limited to 'python/fatcat_tools/harvest')
| -rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 47 | 
1 file changed, 26 insertions, 21 deletions
| diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 2df13283..13abb2e6 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -63,6 +63,27 @@ class HarvestCrossrefWorker:          self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks          self.api_batch_size = 50          self.name = "Crossref" +        self.producer = self._kafka_producer() + +    def _kafka_producer(self): + +        def fail_fast(err, msg): +            if err is not None: +                print("Kafka producer delivery error: {}".format(err)) +                print("Bailing out...") +                # TODO: should it be sys.exit(-1)? +                raise KafkaException(err) + +        self._kafka_fail_fast = fail_fast + +        producer_conf = self.kafka_config.copy() +        producer_conf.update({ +            'delivery.report.only.error': True, +            'default.topic.config': { +                'request.required.acks': -1, # all brokers must confirm +            }, +        }) +        return Producer(producer_conf)      def params(self, date_str):          filter_param = 'from-index-date:{},until-index-date:{}'.format( @@ -82,22 +103,6 @@ class HarvestCrossrefWorker:      def fetch_date(self, date): -        def fail_fast(err, msg): -            if err is not None: -                print("Kafka producer delivery error: {}".format(err)) -                print("Bailing out...") -                # TODO: should it be sys.exit(-1)? 
-                raise KafkaException(err) - -        producer_conf = self.kafka_config.copy() -        producer_conf.update({ -            'delivery.report.only.error': True, -            'default.topic.config': { -                'request.required.acks': -1, # all brokers must confirm -            }, -        }) -        producer = Producer(producer_conf) -          date_str = date.isoformat()          params = self.params(date_str)          http_session = requests_retry_session() @@ -113,7 +118,7 @@ class HarvestCrossrefWorker:                  # backoff, but allows for longer backoff/downtime on remote end                  print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))                  # keep kafka producer connection alive -                producer.poll(0) +                self.producer.poll(0)                  time.sleep(30.0)                  continue              http_resp.raise_for_status() @@ -124,16 +129,16 @@ class HarvestCrossrefWorker:                  self.extract_total(resp), http_resp.elapsed))              #print(json.dumps(resp))              for work in items: -                producer.produce( +                self.producer.produce(                      self.produce_topic,                      json.dumps(work).encode('utf-8'),                      key=self.extract_key(work), -                    on_delivery=fail_fast) -            producer.poll(0) +                    on_delivery=self._kafka_fail_fast) +            self.producer.poll(0)              if len(items) < self.api_batch_size:                  break              params = self.update_params(params, resp) -        producer.flush() +        self.producer.flush()      def extract_items(self, resp):          return resp['message']['items'] | 
