Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r--	python/fatcat_tools/importers/common.py	34
1 file changed, 24 insertions(+), 10 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 32bb210a..42fe38aa 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -703,14 +703,23 @@ class KafkaJsonPusher(RecordPusher):
             topic_suffix,
             group,
         )
+        self.poll_interval = kwargs.get('poll_interval', 5.0)
+        self.consume_batch_size = kwargs.get('consume_batch_size', 100)
 
     def run(self):
         count = 0
         while True:
+            # TODO: this is batch-oriented, because the underlying importer is
+            # often batch-oriented, but this doesn't confirm that the entire
+            # batch has been pushed to fatcat before committing the offset.
+            # Eg, consider the case where there is one update and thousands of
+            # creates; the update would linger in the importer, and if the
+            # importer crashed it would never be created. Not great.
             batch = self.consumer.consume(
-                num_messages=self.edit_batch_size,
-                timeout=3.0)
-            print("... got {} kafka messages".format(len(batch)))
+                num_messages=self.consume_batch_size,
+                timeout=self.poll_interval)
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval))
             if not batch:
                 # TODO: could have some larger timeout here and
                 # self.importer.finish() if it's been more than, eg, a couple
@@ -727,10 +736,11 @@ class KafkaJsonPusher(RecordPusher):
                 count += 1
                 if count % 500 == 0:
                     print("Import counts: {}".format(self.importer.counts))
-            # locally store the last processed message; will be auto-commited
-            # from this "stored" value
-            assert msg
-            self.consumer.store_offsets(msg)
+            for msg in batch:
+                # locally store offsets of processed messages; will be
+                # auto-committed by librdkafka from this "stored" value
+                self.consumer.store_offsets(message=msg)
+
         # TODO: should catch UNIX signals (HUP?) to shut down cleanly, and/or
         # commit the current batch if it has been lingering
         counts = self.importer.finish()
@@ -750,7 +760,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
             raise KafkaException(err)
         for p in partitions:
             # check for partition-specific commit errors
-            print(p)
             if p.error:
                 print("Kafka consumer commit error: {}".format(p.error))
                 print("Bailing out...")
@@ -764,12 +773,17 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
     #auto_commit_interval_ms=30000, # 30 seconds
     conf = {
         'bootstrap.servers': hosts,
-        'group.id': group.encode('utf-8'),
+        'group.id': group,
         'on_commit': fail_fast,
         'delivery.report.only.error': True,
+        # messages don't have their offset marked as stored until pushed to
+        # fatcat, but we do auto-commit stored offsets to the broker
         'enable.auto.offset.store': False,
+        'enable.auto.commit': True,
+        # user code timeout; if no poll after this long, assume user code
+        # hung and rebalance (default: 5min)
+        'max.poll.interval.ms': 120000,
         'default.topic.config': {
-            'request.required.acks': -1,
             'auto.offset.reset': 'latest',
         },
     }
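
The pattern this change lands on is librdkafka's "stored offsets" approach to at-least-once delivery: automatic committing stays enabled, but an offset only becomes eligible for commit once application code has explicitly stored it, which the pusher now does per message after a batch has been handed to the importer. Below is a minimal, self-contained sketch of that pattern using the confluent-kafka Python client. The broker address, topic, group id, and the process_batch() stub are hypothetical placeholders for illustration, not part of the fatcat codebase.

from confluent_kafka import Consumer, KafkaException

def fail_fast(err, partitions):
    # on_commit callback: abort on any commit error instead of continuing
    # with unknown offset state (same spirit as fail_fast in the diff above)
    if err is not None:
        raise KafkaException(err)
    for p in partitions:
        # check for partition-specific commit errors
        if p.error:
            raise KafkaException(p.error)

conf = {
    'bootstrap.servers': 'localhost:9092',  # assumed local broker
    'group.id': 'example-group',            # hypothetical consumer group
    'on_commit': fail_fast,
    # offsets are marked "stored" only by the store_offsets() calls below...
    'enable.auto.offset.store': False,
    # ...but librdkafka still auto-commits whatever has been stored
    'enable.auto.commit': True,
    # if user code stops polling for this long, assume it hung and rebalance
    'max.poll.interval.ms': 120000,
    'auto.offset.reset': 'latest',
}

def process_batch(records):
    # stand-in for pushing a batch of records to an importer
    for record in records:
        print(record)

consumer = Consumer(conf)
consumer.subscribe(['example-topic'])       # hypothetical topic
try:
    while True:
        batch = consumer.consume(num_messages=100, timeout=5.0)
        if not batch:
            continue
        for msg in batch:
            if msg.error():
                raise KafkaException(msg.error())
        process_batch([msg.value() for msg in batch])
        # only now mark these offsets as safe to commit; a crash before this
        # point means the batch is re-delivered (at-least-once semantics)
        for msg in batch:
            consumer.store_offsets(message=msg)
finally:
    consumer.close()

As the TODO in the diff itself notes, this is only as safe as the hand-off point: offsets are stored once messages reach the importer, so records still buffered inside a batch-oriented importer can be lost if the process crashes before the importer flushes, even though their offsets were already committed.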
