summaryrefslogtreecommitdiffstats
path: root/python/fatcat_web/kafka.py
blob: a05b97e03377e392a39102d97c0d316503de5c43 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from typing import Any, Dict, Optional

import requests

from fatcat_web import Config


def kafka_pixy_produce(
    topic: str, msg: str, key: Optional[bytes] = None, sync: bool = True, timeout: float = 25
) -> None:
    """
    Simple helper to public a message to the given Kafka topic, via the
    configured kafka-pixy HTTP gateway

    topic: string
    msg: string
    key: optional, bytes
    timeout: seconds
    """

    if not Config.KAFKA_PIXY_ENDPOINT:
        raise Exception("Kafka produce error: kafka-pixy endpoint not configured")

    params: Dict[str, Any] = dict()
    if key:
        params["key"] = key
    if sync:
        params["sync"] = True
    resp = requests.post(
        "{}/topics/{}/messages".format(Config.KAFKA_PIXY_ENDPOINT, topic),
        params=params,
        data=msg,
        headers={"Content-Type": "text/plain"},
        timeout=timeout,
    )
    resp.raise_for_status()
    # print(resp.json())