diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-14 21:45:35 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-14 21:45:35 -0800 |
commit | 734cad6cf37f82ad5e1965003d34906608477b8f (patch) | |
tree | a5aea1909c5dad5218d8b37ad68a1b1328f3fdca /python/fatcat_tools/workers/worker_common.py | |
parent | 30bdb1b0ba28b2e4a81aa7209d294c224d8a2245 (diff) | |
download | fatcat-734cad6cf37f82ad5e1965003d34906608477b8f.tar.gz fatcat-734cad6cf37f82ad5e1965003d34906608477b8f.zip |
most_recent_message as reusable function
Diffstat (limited to 'python/fatcat_tools/workers/worker_common.py')
-rw-r--r-- | python/fatcat_tools/workers/worker_common.py | 25 |
1 files changed, 25 insertions, 0 deletions
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index 77ea2c15..66150644 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -9,6 +9,31 @@ from pykafka import KafkaClient from fatcat_client.rest import ApiException +def most_recent_message(topic): + """ + Tries to fetch the most recent message from a given topic. + This only makes sense for single partition topics, though could be + extended with "last N" behavior. + + Following "Consuming the last N messages from a topic" + from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns + """ + consumer = topic.get_simple_consumer( + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=True) + offsets = [(p, op.last_offset_consumed - 1) + for p, op in consumer._partitions.items()] + offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] + if -2 in [o for p, o in offsets]: + return None + else: + consumer.reset_offsets(offsets) + msg = islice(consumer, 1) + if msg: + return list(msg)[0].value + else: + return None + class FatcatWorker: """ Common code for for Kafka producers and consumers. |