aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/kafka.py
blob: fe9f36e90d8f80942e1c85e4ecdae183b95bea0d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from typing import Any, Optional

from confluent_kafka import KafkaException, Producer


def kafka_fail_fast(err: Optional[Any], _msg: Any) -> None:
    """
    Error handler taking the (err, msg) argument pair; it aborts on the
    first reported error.

    When err is None this is a no-op. Otherwise the error and a
    "Bailing out..." notice are printed to stdout and a KafkaException
    wrapping err is raised. The _msg argument is accepted but never used.
    """
    if err is None:
        return

    report = f"Kafka producer delivery error: {err}"
    print(report)
    print("Bailing out...")
    # TODO: should it be sys.exit(-1)?
    raise KafkaException(err)


def simple_kafka_producer(kafka_hosts: str) -> Producer:
    """
    Construct a confluent_kafka Producer pointed at the given bootstrap
    servers.

    kafka_hosts must be one comma-separated string of hostnames (for
    example "host1,host2"); passing a Python list of hostnames is not
    supported.
    """

    # Topic-level defaults, nested under "default.topic.config" below.
    # Per the librdkafka docs, acks of -1 waits for all in-sync replicas.
    topic_defaults = {"request.required.acks": -1}

    settings: dict = {}
    settings["bootstrap.servers"] = kafka_hosts
    # ~20 MBytes; broker-side max is ~50 MBytes
    settings["message.max.bytes"] = 20000000
    # Only failed deliveries generate delivery reports
    settings["delivery.report.only.error"] = True
    settings["default.topic.config"] = topic_defaults

    return Producer(settings)