aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/worker_common.py
blob: ef79f528970e7318556112562c8f626028080abe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

import re
import sys
import csv
import json
import itertools
from itertools import islice
from confluent_kafka import Consumer, KafkaException, TopicPartition

import fatcat_openapi_client
from fatcat_openapi_client.rest import ApiException


def most_recent_message(topic, kafka_config):
    """
    Tries to fetch the most recent message from a given topic.

    This only makes sense for single partition topics (it works with only the
    first partition), though could be extended with "last N" behavior.
    """

    print("Fetching most Kafka message from {}".format(topic))

    conf = kafka_config.copy()
    conf.update({
        'group.id': 'worker-init-last-msg', # should never commit
        'delivery.report.only.error': True,
        'enable.auto.commit': False,
        'default.topic.config': {
            'request.required.acks': -1,
            'auto.offset.reset': 'latest',
        },
    })

    consumer = Consumer(conf)

    hwm = consumer.get_watermark_offsets(
        TopicPartition(topic, 0),
        timeout=5.0,
        cached=False)
    if not hwm:
        raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
    print("High watermarks: {}".format(hwm))

    if hwm[1] == 0:
        print("topic is new; not 'most recent message'")
        return None

    consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
    msg = consumer.poll(2.0)
    consumer.close()
    if not msg:
        raise Exception("Failed to fetch most recent kafka message")
    if msg.error():
        raise KafkaException(msg.error())
    return msg.value()


class FatcatWorker:
    """
    Common code for for Kafka producers and consumers.
    """

    def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
        if api:
            self.api = api
        self.kafka_config = {
            'bootstrap.servers': kafka_hosts,
            'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
        }
        self.produce_topic = produce_topic
        self.consume_topic = consume_topic