From 345b24d6a9efbffd0ff3fd3c65e22894b498a2c6 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 8 Apr 2019 21:11:25 -0700 Subject: convert pipeline workers from pykafka to confluent-kafka --- python/fatcat_tools/workers/worker_common.py | 77 +++++++++++++++++----------- 1 file changed, 48 insertions(+), 29 deletions(-) (limited to 'python/fatcat_tools/workers/worker_common.py') diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index 9ffbe5fd..fb8cfc19 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -5,41 +5,56 @@ import csv import json import itertools from itertools import islice -from pykafka import KafkaClient -from pykafka.common import OffsetType +from confluent_kafka import Consumer, KafkaException, TopicPartition import fatcat_openapi_client from fatcat_openapi_client.rest import ApiException -def most_recent_message(topic): +def most_recent_message(topic, kafka_config): """ Tries to fetch the most recent message from a given topic. - This only makes sense for single partition topics, though could be - extended with "last N" behavior. - Following "Consuming the last N messages from a topic" - from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns + This only makes sense for single partition topics (it works with only the + first partition), though could be extended with "last N" behavior. """ - consumer = topic.get_simple_consumer( - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=True) - offsets = [(p, op.last_offset_consumed - 1) - for p, op in consumer._partitions.items()] - offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] - if -2 in [o for p, o in offsets]: - consumer.stop() + + print("Fetching most Kafka message from {}".format(topic)) + + conf = kafka_config.copy() + conf.update({ + 'group.id': 'worker-init-last-msg', # should never commit + 'delivery.report.only.error': True, + 'enable.auto.commit': False, + 'default.topic.config': { + 'request.required.acks': -1, + 'auto.offset.reset': 'latest', + }, + }) + + consumer = Consumer(conf) + + hwm = consumer.get_watermark_offsets( + TopicPartition(topic, 0), + timeout=5.0, + cached=False) + if not hwm: + raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic)) + print("High watermarks: {}".format(hwm)) + + if hwm[1] == 0: + print("topic is new; not 'most recent message'") return None - else: - consumer.reset_offsets(offsets) - msg = islice(consumer, 1) - if msg: - val = list(msg)[0].value - consumer.stop() - return val - else: - consumer.stop() - return None + + consumer.assign([TopicPartition(topic, 0, hwm[1]-1)]) + msg = consumer.poll(2.0) + consumer.close() + if not msg: + raise Exception("Failed to fetch most recent kafka message") + if msg.error(): + raise KafkaException(msg.error()) + return msg.value() + class FatcatWorker: """ @@ -49,9 +64,13 @@ class FatcatWorker: def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None): if api: self.api = api - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'delivery.report.only.error': True, + 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + 'default.topic.config': { + 'request.required.acks': 'all', + }, + } self.produce_topic = produce_topic self.consume_topic = consume_topic - - # Kafka producer batch size tuning; also limit on size of single document - self.produce_max_request_size = 10000000 # 10 MByte-ish -- cgit v1.2.3