blob: cb4e5dabe27b5195aa59fe1235b1461c29e61fad (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import re
import sys
import csv
import json
import itertools
from itertools import islice
from pykafka import KafkaClient
from pykafka.common import OffsetType
import fatcat_client
from fatcat_client.rest import ApiException
def most_recent_message(topic):
    """
    Tries to fetch the most recent message from a given topic.

    This only makes sense for single partition topics, though could be
    extended with "last N" behavior.

    Following "Consuming the last N messages from a topic"
    from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns

    :param topic: object providing ``get_simple_consumer(...)`` (presumably a
        pykafka topic; verify against callers)
    :returns: the ``value`` attribute of the first message read after
        rewinding, or None when any partition cannot be rewound by one
        message or when the consumer yields no message at all

    The consumer is always stopped before returning, including when an
    exception propagates.
    """
    consumer = topic.get_simple_consumer(
        auto_offset_reset=OffsetType.LATEST,
        reset_offset_on_start=True)
    try:
        # Step every partition back by one message from where the consumer
        # currently sits.
        offsets = [(p, op.last_offset_consumed - 1)
                   for p, op in consumer._partitions.items()]
        # A rewind target below zero is replaced by the -2 sentinel ...
        offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]
        # ... and any partition carrying the sentinel means "nothing to
        # return" for the whole topic.
        if any(o == -2 for _, o in offsets):
            return None
        consumer.reset_offsets(offsets)
        # islice() objects are always truthy, so pull the single item
        # explicitly and fall back to None when the consumer is exhausted.
        msg = next(islice(consumer, 1), None)
        if msg is None:
            return None
        return msg.value
    finally:
        consumer.stop()
class FatcatWorker:
    """
    Shared setup code for Kafka producers and consumers.

    The constructor opens a KafkaClient against ``kafka_hosts`` (with the
    broker version fixed to "1.0.0") and records the topic names to produce
    to and consume from. The ``api`` attribute is only assigned when a truthy
    ``api`` argument is given; otherwise it is left unset on the instance.
    """

    def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
        if api:
            self.api = api
        self.kafka = KafkaClient(broker_version="1.0.0", hosts=kafka_hosts)
        self.produce_topic, self.consume_topic = produce_topic, consume_topic
        # Producer batch size knob; doubles as the cap on the size of any
        # single document.
        self.produce_max_request_size = 10 * 1000 * 1000  # 10 MByte-ish
|