diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-14 21:45:35 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-14 21:45:35 -0800 | 
| commit | 734cad6cf37f82ad5e1965003d34906608477b8f (patch) | |
| tree | a5aea1909c5dad5218d8b37ad68a1b1328f3fdca | |
| parent | 30bdb1b0ba28b2e4a81aa7209d294c224d8a2245 (diff) | |
| download | fatcat-734cad6cf37f82ad5e1965003d34906608477b8f.tar.gz fatcat-734cad6cf37f82ad5e1965003d34906608477b8f.zip | |
most_recent_message as reusable function
| -rw-r--r-- | python/fatcat_tools/workers/changelog.py | 27 | ||||
| -rw-r--r-- | python/fatcat_tools/workers/worker_common.py | 25 | 
2 files changed, 26 insertions, 26 deletions
| diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 6319da2f..ccfbcf19 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -2,7 +2,7 @@  import json  import time  from itertools import islice -from fatcat_tools.workers.worker_common import FatcatWorker +from fatcat_tools.workers.worker_common import FatcatWorker, most_recent_message  from pykafka.common import OffsetType @@ -20,31 +20,6 @@ class FatcatChangelogWorker(FatcatWorker):          self.poll_interval = poll_interval          self.offset = offset    # the fatcat changelog offset, not the kafka offset -    def most_recent_message(self, topic): -        """ -        Tries to fetch the most recent message from a given topic. -        This only makes sense for single partition topics, though could be -        extended with "last N" behavior. - -        Following "Consuming the last N messages from a topic" -        from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns -        """ -        consumer = topic.get_simple_consumer( -            auto_offset_reset=OffsetType.LATEST, -            reset_offset_on_start=True) -        offsets = [(p, op.last_offset_consumed - 1) -                    for p, op in consumer._partitions.items()] -        offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] -        if -2 in [o for p, o in offsets]: -            return None -        else: -            consumer.reset_offsets(offsets) -            msg = islice(consumer, 1) -            if msg: -                return list(msg)[0].value -            else: -                return None -      def run(self):          topic = self.kafka.topics[self.produce_topic]          # On start, try to consume the most recent from the topic, and using diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index 77ea2c15..66150644 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -9,6 +9,31 @@ from pykafka import KafkaClient  from fatcat_client.rest import ApiException +def most_recent_message(topic): +    """ +    Tries to fetch the most recent message from a given topic. +    This only makes sense for single partition topics, though could be +    extended with "last N" behavior. + +    Following "Consuming the last N messages from a topic" +    from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns +    """ +    consumer = topic.get_simple_consumer( +        auto_offset_reset=OffsetType.LATEST, +        reset_offset_on_start=True) +    offsets = [(p, op.last_offset_consumed - 1) +                for p, op in consumer._partitions.items()] +    offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] +    if -2 in [o for p, o in offsets]: +        return None +    else: +        consumer.reset_offsets(offsets) +        msg = islice(consumer, 1) +        if msg: +            return list(msg)[0].value +        else: +            return None +  class FatcatWorker:      """      Common code for for Kafka producers and consumers. | 
