diff options
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 33 |
1 files changed, 30 insertions, 3 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 547e270c..acb705c2 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -22,7 +22,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
             batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic)
-        self.consumer_group = "elasticsearch-updates"
+        self.consumer_group = "elasticsearch-updates3"
         self.batch_size = batch_size
         self.poll_interval = poll_interval
         self.elasticsearch_backend = elasticsearch_backend
@@ -34,6 +34,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
     def run(self):
         ac = ApiClient()
 
+        def fail_fast(err, partitions):
+            if err is not None:
+                print("Kafka consumer commit error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+            for p in partitions:
+                # check for partition-specific commit errors
+                if p.error:
+                    print("Kafka consumer commit error: {}".format(p.error))
+                    print("Bailing out...")
+                    # TODO: should it be sys.exit(-1)?
+                    raise KafkaException(p.error)
+            #print("Kafka consumer commit successful")
+            pass
+
         def on_rebalance(consumer, partitions):
             for p in partitions:
                 if p.error:
@@ -44,7 +60,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         consumer_conf = self.kafka_config.copy()
         consumer_conf.update({
             'group.id': self.consumer_group,
+            'on_commit': fail_fast,
+            'delivery.report.only.error': True,
+            # messages don't have offset marked as stored until pushed to
+            # elastic, but we do auto-commit stored offsets to broker
+            'enable.auto.commit': True,
             'enable.auto.offset.store': False,
+            # user code timeout; if no poll after this long, assume user code
+            # hung and rebalance (default: 5min)
+            'max.poll.interval.ms': 60000,
             'default.topic.config': {
                 'auto.offset.reset': 'latest',
             },
@@ -92,7 +116,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
                 print(desc)
                 print(resp.content)
                 raise Exception(desc)
-            consumer.store_offsets(batch[-1])
+            for msg in batch:
+                # offsets are *committed* (to brokers) automatically, but need
+                # to be marked as processed here
+                consumer.store_offsets(message=msg)
 
 
 
@@ -109,7 +136,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
                          elasticsearch_index=elasticsearch_index,
                          batch_size=batch_size)
         # previous group got corrupted (by pykafka library?)
-        self.consumer_group = "elasticsearch-updates2"
+        self.consumer_group = "elasticsearch-updates3"
         self.entity_type = ContainerEntity
         self.elasticsearch_document_name = "container"
         self.transform_func = container_to_elasticsearch