From 734cad6cf37f82ad5e1965003d34906608477b8f Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 14 Nov 2018 21:45:35 -0800 Subject: most_recent_message as reusable function --- python/fatcat_tools/workers/changelog.py | 27 +-------------------------- python/fatcat_tools/workers/worker_common.py | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 6319da2f..ccfbcf19 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -2,7 +2,7 @@ import json import time from itertools import islice -from fatcat_tools.workers.worker_common import FatcatWorker +from fatcat_tools.workers.worker_common import FatcatWorker, most_recent_message from pykafka.common import OffsetType @@ -20,31 +20,6 @@ class FatcatChangelogWorker(FatcatWorker): self.poll_interval = poll_interval self.offset = offset # the fatcat changelog offset, not the kafka offset - def most_recent_message(self, topic): - """ - Tries to fetch the most recent message from a given topic. - This only makes sense for single partition topics, though could be - extended with "last N" behavior. - - Following "Consuming the last N messages from a topic" - from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns - """ - consumer = topic.get_simple_consumer( - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=True) - offsets = [(p, op.last_offset_consumed - 1) - for p, op in consumer._partitions.items()] - offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] - if -2 in [o for p, o in offsets]: - return None - else: - consumer.reset_offsets(offsets) - msg = islice(consumer, 1) - if msg: - return list(msg)[0].value - else: - return None - def run(self): topic = self.kafka.topics[self.produce_topic] # On start, try to consume the most recent from the topic, and using diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index 77ea2c15..66150644 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -9,6 +9,31 @@ from pykafka import KafkaClient from fatcat_client.rest import ApiException +def most_recent_message(topic): + """ + Tries to fetch the most recent message from a given topic. + This only makes sense for single partition topics, though could be + extended with "last N" behavior. + + Following "Consuming the last N messages from a topic" + from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns + """ + consumer = topic.get_simple_consumer( + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=True) + offsets = [(p, op.last_offset_consumed - 1) + for p, op in consumer._partitions.items()] + offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] + if -2 in [o for p, o in offsets]: + return None + else: + consumer.reset_offsets(offsets) + msg = islice(consumer, 1) + if msg: + return list(msg)[0].value + else: + return None + class FatcatWorker: """ Common code for for Kafka producers and consumers. -- cgit v1.2.3