blob: 36dafade21239e9fe8f917d18467b28e290a3a1c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import requests
from fatcat_web import Config
def kafka_pixy_produce(topic, msg, key=None, sync=True, timeout=25):
"""
Simple helper to public a message to the given Kafka topic, via the
configured kafka-pixy HTTP gateway
topic: string
msg: string
key: optional, bytes
timeout: seconds
"""
if not Config.KAFKA_PIXY_ENDPOINT:
raise Exception("Kafka produce error: kafka-pixy endpoint not configured")
params = dict()
if key:
params["key"] = key
if sync:
params["sync"] = True
resp = requests.post(
"{}/topics/{}/messages".format(Config.KAFKA_PIXY_ENDPOINT, topic),
params=params,
data=msg,
headers={"Content-Type": "text/plain"},
timeout=timeout,
)
resp.raise_for_status()
# print(resp.json())
|