aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/changelog.py
blob: 6319da2fd022b28ce51e65b9424c26237ea9e603 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

import json
import time
from itertools import islice
from fatcat_tools.workers.worker_common import FatcatWorker
from pykafka.common import OffsetType


class FatcatChangelogWorker(FatcatWorker):
    """
    Periodically polls the fatcat API looking for new changelogs. When they are
    found, fetch them and push (as JSON) into a Kafka topic.
    """

    def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
        # TODO: should be offset=0
        super().__init__(kafka_hosts=kafka_hosts,
                         produce_topic=produce_topic,
                         api_host_url=api_host_url)
        self.poll_interval = poll_interval
        self.offset = offset    # the fatcat changelog offset, not the kafka offset

    def most_recent_message(self, topic):
        """
        Tries to fetch the most recent message from a given topic.
        This only makes sense for single partition topics, though could be
        extended with "last N" behavior.

        Following "Consuming the last N messages from a topic"
        from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns
        """
        consumer = topic.get_simple_consumer(
            auto_offset_reset=OffsetType.LATEST,
            reset_offset_on_start=True)
        offsets = [(p, op.last_offset_consumed - 1)
                    for p, op in consumer._partitions.items()]
        offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]
        if -2 in [o for p, o in offsets]:
            return None
        else:
            consumer.reset_offsets(offsets)
            msg = islice(consumer, 1)
            if msg:
                return list(msg)[0].value
            else:
                return None

    def run(self):
        topic = self.kafka.topics[self.produce_topic]
        # On start, try to consume the most recent from the topic, and using
        # that as the starting offset. Note that this is a single-partition
        # topic
        if self.offset is None:
            print("Checking for most recent changelog offset...")
            msg = self.most_recent_message(topic)
            if msg:
                self.offset = json.loads(msg.decode('utf-8'))['index']
            else:
                self.offset = 1

        with topic.get_sync_producer() as producer:
            while True: 
                latest = int(self.api.get_changelog(limit=1)[0].index)
                if latest > self.offset:
                    print("Fetching changelogs from {} through {}".format(
                        self.offset+1, latest))
                for i in range(self.offset+1, latest+1):
                    cle = self.api.get_changelog_entry(i)
                    obj = self.api.api_client.sanitize_for_serialization(cle)
                    producer.produce(
                        message=json.dumps(obj).encode('utf-8'),
                        partition_key=None,
                        timestamp=None,
                        #XXX: timestamp=cle.timestamp,
                    )
                    self.offset = i
                print("Sleeping {} seconds...".format(self.poll_interval))
                time.sleep(self.poll_interval)


class FatcatEntityUpdatesWorker(FatcatWorker):
    """
    Consumes from the changelog topic and publishes expanded entities (fetched
    from API) to update topics.

    For now, only release updates are published.
    """

    def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic):
        super().__init__(kafka_hosts=kafka_hosts,
                         consume_topic=consume_topic,
                         api_host_url=api_host_url)
        self.release_topic = release_topic
        self.consumer_group = "entity-updates"

    def run(self):
        changelog_topic = self.kafka.topics[self.consume_topic]
        release_topic = self.kafka.topics[self.release_topic]

        consumer = changelog_topic.get_balanced_consumer(
            consumer_group=self.consumer_group,
            managed=True,
            auto_offset_reset=OffsetType.LATEST,
            reset_offset_on_start=False,
            fetch_message_max_bytes=4000000, # up to ~4MBytes
            auto_commit_enable=True,
            auto_commit_interval_ms=30000, # 30 seconds
            compacted_topic=True,
        )

        with release_topic.get_sync_producer() as producer:
            for msg in consumer:
                cle = json.loads(msg.value.decode('utf-8'))
                #print(cle)
                print("processing changelog index {}".format(cle['index']))
                release_edits = cle['editgroup']['edits']['releases']
                for re in release_edits:
                    ident = re['ident']
                    release = self.api.get_release(ident, expand="files,container")
                    release_dict = self.api.api_client.sanitize_for_serialization(release)
                    producer.produce(
                        message=json.dumps(release_dict).encode('utf-8'),
                        partition_key=ident.encode('utf-8'),
                        timestamp=None,
                    )
                #consumer.commit_offsets()