blob: 53b62a3785d7816d3e71937dd7abc97630a6e090 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from confluent_kafka import Consumer, Producer, KafkaException
def kafka_fail_fast(err, msg):
    """Abort loudly as soon as a Kafka delivery error is reported.

    The ``(err, msg)`` signature and the message text suggest this is meant
    to be registered as a producer delivery callback -- confirm against the
    call sites.

    Args:
        err: The reported delivery error, or None when delivery succeeded.
        msg: The message the report refers to (not used here).

    Raises:
        KafkaException: Wrapping ``err`` whenever ``err`` is not None.
    """
    if err is None:
        # Successful delivery: nothing to report.
        return

    # Function-scope import so this block does not depend on a module-level
    # ``import sys`` that may or may not exist elsewhere in the file.
    import sys

    # Diagnostics belong on stderr so they never mix with data on stdout.
    print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
    print("Bailing out...", file=sys.stderr)
    # TODO: should it be sys.exit(-1)?
    # NOTE(review): presumably the exception surfaces from whichever
    # poll()/flush() call dispatched the callback -- confirm.
    raise KafkaException(err)
def simple_kafka_producer(kafka_hosts):
    """Create a ``Producer`` configured with this module's fixed settings.

    Args:
        kafka_hosts: Value passed through unchanged as ``bootstrap.servers``.

    Returns:
        A new ``Producer`` built from the configuration assembled below.
    """
    # Topic-level defaults; in librdkafka, acks of -1 means "wait for all
    # in-sync replicas".
    topic_defaults = {'request.required.acks': -1}

    producer_settings = {
        'bootstrap.servers': kafka_hosts,
        # ~20 MBytes; broker-side max is ~50 MBytes
        'message.max.bytes': 20000000,
        'delivery.report.only.error': True,
        'default.topic.config': topic_defaults,
    }
    return Producer(producer_settings)
|