Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/harvest/doi_registrars.py    2
-rw-r--r--  python/fatcat_tools/harvest/oaipmh.py             2
-rw-r--r--  python/fatcat_tools/importers/common.py          34
-rw-r--r--  python/fatcat_tools/workers/changelog.py         14
-rw-r--r--  python/fatcat_tools/workers/elasticsearch.py     33
-rw-r--r--  python/fatcat_tools/workers/worker_common.py      6
6 files changed, 67 insertions, 24 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 133a3f01..3362df35 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -7,7 +7,7 @@ import time
 import itertools
 import datetime
 import requests
-from confluent_kafka import Producer
+from confluent_kafka import Producer, KafkaException
 
 from fatcat_tools.workers import most_recent_message
 from .harvest_common import HarvestState, requests_retry_session
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index a0c3c2cf..3e3bea03 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -8,7 +8,7 @@ import itertools
 import datetime
 import requests
 import sickle
-from confluent_kafka import Producer
+from confluent_kafka import Producer, KafkaException
 
 from fatcat_tools.workers import most_recent_message
 from .harvest_common import HarvestState
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 32bb210a..42fe38aa 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -703,14 +703,23 @@ class KafkaJsonPusher(RecordPusher):
             topic_suffix,
             group,
         )
+        self.poll_interval = kwargs.get('poll_interval', 5.0)
+        self.consume_batch_size = kwargs.get('consume_batch_size', 100)
 
     def run(self):
         count = 0
         while True:
+            # TODO: this is batch-oriented, because underlying importer is
+            # often batch-oriented, but this doesn't confirm that entire batch
+            # has been pushed to fatcat before commiting offset. Eg, consider
+            # case where there there is one update and thousands of creates;
+            # update would be lingering in importer, and if importer crashed
+            # never created. Not great.
             batch = self.consumer.consume(
-                num_messages=self.edit_batch_size,
-                timeout=3.0)
-            print("... got {} kafka messages".format(len(batch)))
+                num_messages=self.consume_batch_size,
+                timeout=self.poll_interval)
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval))
             if not batch:
                 # TODO: could have some larger timeout here and
                 # self.importer.finish() if it's been more than, eg, a couple
@@ -727,10 +736,11 @@ class KafkaJsonPusher(RecordPusher):
                 count += 1
                 if count % 500 == 0:
                     print("Import counts: {}".format(self.importer.counts))
-            # locally store the last processed message; will be auto-commited
-            # from this "stored" value
-            assert msg
-            self.consumer.store_offsets(msg)
+            for msg in batch:
+                # locally store offsets of processed messages; will be
+                # auto-commited by librdkafka from this "stored" value
+                self.consumer.store_offsets(message=msg)
+
         # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
         # commit the current batch if it has been lingering
         counts = self.importer.finish()
@@ -750,7 +760,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
             raise KafkaException(err)
         for p in partitions:
             # check for partition-specific commit errors
-            print(p)
             if p.error:
                 print("Kafka consumer commit error: {}".format(p.error))
                 print("Bailing out...")
@@ -764,12 +773,17 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
     #auto_commit_interval_ms=30000, # 30 seconds
     conf = {
         'bootstrap.servers': hosts,
-        'group.id': group.encode('utf-8'),
+        'group.id': group,
         'on_commit': fail_fast,
         'delivery.report.only.error': True,
+        # messages don't have offset marked as stored until pushed to
+        # elastic, but we do auto-commit stored offsets to broker
         'enable.auto.offset.store': False,
+        'enable.auto.commit': True,
+        # user code timeout; if no poll after this long, assume user code
+        # hung and rebalance (default: 5min)
+        'max.poll.interval.ms': 120000,
         'default.topic.config': {
-            'request.required.acks': -1,
             'auto.offset.reset': 'latest',
         },
     }
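The KafkaJsonPusher and make_kafka_consumer changes above adopt librdkafka's manual offset-store pattern: 'enable.auto.offset.store' is disabled so an offset is only "stored" once its record has actually been processed, 'enable.auto.commit' keeps periodically committing whatever has been stored, and 'max.poll.interval.ms' bounds how long the processing loop may stall before the group rebalances. A minimal sketch of that consume/process/store loop with confluent-kafka follows; the broker address, topic, group, and process() function are placeholders, not names from this repo:

from confluent_kafka import Consumer, KafkaException

# Placeholder configuration, for illustration only
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-importer',
    # offsets are only "stored" after a record has been fully processed...
    'enable.auto.offset.store': False,
    # ...but stored offsets are still committed to the broker automatically
    'enable.auto.commit': True,
    # rebalance if the processing loop stalls for longer than this
    'max.poll.interval.ms': 120000,
    'auto.offset.reset': 'latest',
}

consumer = Consumer(conf)
consumer.subscribe(['example-topic'])

def process(record_bytes):
    # placeholder for importer/transform logic
    pass

while True:
    batch = consumer.consume(num_messages=100, timeout=5.0)
    for msg in batch:
        if msg.error():
            raise KafkaException(msg.error())
        process(msg.value())
        # mark as processed; the periodic auto-commit picks this up later
        consumer.store_offsets(message=msg)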
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 4a54c649..c134bde2 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -13,7 +13,6 @@ class ChangelogWorker(FatcatWorker):
     """
 
     def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
-        # TODO: should be offset=0
         super().__init__(kafka_hosts=kafka_hosts,
                          produce_topic=produce_topic,
                          api=api)
@@ -118,7 +117,15 @@ class EntityUpdatesWorker(FatcatWorker):
         consumer_conf = self.kafka_config.copy()
         consumer_conf.update({
             'group.id': self.consumer_group,
+            'on_commit': fail_fast,
+            'delivery.report.only.error': True,
+            # messages don't have offset marked as stored until pushed to
+            # elastic, but we do auto-commit stored offsets to broker
+            'enable.auto.commit': True,
             'enable.auto.offset.store': False,
+            # user code timeout; if no poll after this long, assume user code
+            # hung and rebalance (default: 5min)
+            'max.poll.interval.ms': 180000,
             'default.topic.config': {
                 'auto.offset.reset': 'latest',
             },
@@ -142,8 +149,7 @@ class EntityUpdatesWorker(FatcatWorker):
         while True:
             msg = consumer.poll(self.poll_interval)
             if not msg:
-                print("nothing new from kafka (interval:{})".format(self.poll_interval))
-                consumer.commit()
+                print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
                 continue
             if msg.error():
                 raise KafkaException(msg.error())
@@ -202,5 +208,5 @@
                     on_delivery=fail_fast,
                 )
             # TODO: actually update works
-            consumer.store_offsets(msg)
+            consumer.store_offsets(message=msg)
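Unlike the batch pusher, EntityUpdatesWorker polls one message at a time and stores that message's offset after producing its downstream updates. With the same offset-store settings, the loop looks roughly like the sketch below; the broker, topic, group, and handle_update() handler are placeholders, not names from this repo:

from confluent_kafka import Consumer, KafkaException

# Placeholder setup; same offset-store settings as in the diff above
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-entity-updates',
    'enable.auto.offset.store': False,
    'enable.auto.commit': True,
})
consumer.subscribe(['example-changelog-topic'])

def handle_update(payload):
    # placeholder for the per-changelog-entry fan-out logic
    pass

while True:
    msg = consumer.poll(timeout=10.0)
    if msg is None:
        continue                      # poll timed out, nothing new
    if msg.error():
        raise KafkaException(msg.error())
    handle_update(msg.value())
    # only mark the offset once this message's work is done
    consumer.store_offsets(message=msg)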
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 547e270c..acb705c2 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -22,7 +22,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
             batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic)
-        self.consumer_group = "elasticsearch-updates"
+        self.consumer_group = "elasticsearch-updates3"
         self.batch_size = batch_size
         self.poll_interval = poll_interval
         self.elasticsearch_backend = elasticsearch_backend
@@ -34,6 +34,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
     def run(self):
         ac = ApiClient()
 
+        def fail_fast(err, partitions):
+            if err is not None:
+                print("Kafka consumer commit error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+            for p in partitions:
+                # check for partition-specific commit errors
+                if p.error:
+                    print("Kafka consumer commit error: {}".format(p.error))
+                    print("Bailing out...")
+                    # TODO: should it be sys.exit(-1)?
+                    raise KafkaException(err)
+            #print("Kafka consumer commit successful")
+            pass
+
         def on_rebalance(consumer, partitions):
             for p in partitions:
                 if p.error:
@@ -44,7 +60,15 @@
         consumer_conf = self.kafka_config.copy()
         consumer_conf.update({
             'group.id': self.consumer_group,
+            'on_commit': fail_fast,
+            'delivery.report.only.error': True,
+            # messages don't have offset marked as stored until pushed to
+            # elastic, but we do auto-commit stored offsets to broker
+            'enable.auto.commit': True,
             'enable.auto.offset.store': False,
+            # user code timeout; if no poll after this long, assume user code
+            # hung and rebalance (default: 5min)
+            'max.poll.interval.ms': 60000,
             'default.topic.config': {
                 'auto.offset.reset': 'latest',
             },
@@ -92,7 +116,10 @@
                 print(desc)
                 print(resp.content)
                 raise Exception(desc)
-            consumer.store_offsets(batch[-1])
+            for msg in batch:
+                # offsets are *committed* (to brokers) automatically, but need
+                # to be marked as processed here
+                consumer.store_offsets(message=msg)
@@ -109,7 +136,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
                          elasticsearch_index=elasticsearch_index,
                          batch_size=batch_size)
         # previous group got corrupted (by pykafka library?)
-        self.consumer_group = "elasticsearch-updates2"
+        self.consumer_group = "elasticsearch-updates3"
         self.entity_type = ContainerEntity
         self.elasticsearch_document_name = "container"
         self.transform_func = container_to_elasticsearch
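The switch from store_offsets(batch[-1]) to storing every message matters because a consumed batch can interleave messages from several partitions, so storing only the last message would leave the other partitions' progress unmarked; it also preserves the ordering "index to elasticsearch first, then mark offsets". A rough sketch of that ordering follows; the bulk endpoint URL and document shape are placeholders, not taken from this diff:

import json
import requests

# Hypothetical bulk endpoint, for illustration only
ES_BULK_URL = "http://localhost:9200/fatcat_release/_bulk"

def index_batch_then_store(consumer, batch):
    # messages are assumed to have been error-checked already
    lines = []
    for msg in batch:
        doc = json.loads(msg.value().decode('utf-8'))
        # hypothetical document id field
        lines.append(json.dumps({"index": {"_id": doc["ident"]}}))
        lines.append(json.dumps(doc))
    resp = requests.post(
        ES_BULK_URL,
        data="\n".join(lines) + "\n",
        headers={"Content-Type": "application/x-ndjson"})
    resp.raise_for_status()
    # only after the bulk request succeeds: mark every message in the batch
    # as processed, across all partitions it may have come from
    for msg in batch:
        consumer.store_offsets(message=msg)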
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index fb8cfc19..ef79f528 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -66,11 +66,7 @@ class FatcatWorker:
             self.api = api
         self.kafka_config = {
             'bootstrap.servers': kafka_hosts,
-            'delivery.report.only.error': True,
-            'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
-            'default.topic.config': {
-                'request.required.acks': 'all',
-            },
+            'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
         }
         self.produce_topic = produce_topic
         self.consume_topic = consume_topic
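On the producer side, the shared kafka_config in worker_common.py now carries only 'bootstrap.servers' and 'message.max.bytes'; delivery reporting is left to the individual producers and consumers, and the harvesters gain a KafkaException import, presumably so delivery or commit failures can be raised instead of silently dropped. A minimal sketch of a producer with a fail-fast delivery callback under those settings; the broker, topic, and payload are placeholders:

from confluent_kafka import Producer, KafkaException

def fail_fast(err, msg):
    # delivery report callback: crash the worker on any failed produce;
    # the exception propagates out of poll()/flush()
    if err is not None:
        print("Kafka producer delivery error: {}".format(err))
        raise KafkaException(err)

# Placeholder broker/topic names, for illustration only
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'message.max.bytes': 20000000,  # ~20 MBytes; keep below the broker-side max
    'delivery.report.only.error': True,
})

producer.produce('example-topic', b'{"hello": "world"}', on_delivery=fail_fast)
producer.poll(0)   # serve delivery callbacks
producer.flush()   # block until everything is delivered (or fail_fast raises)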
