summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/changelog.py
blob: 007b20157d81669041604fec18f14becad2ed1a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142

import json
import time
from pykafka.common import OffsetType

from .worker_common import FatcatWorker, most_recent_message


class ChangelogWorker(FatcatWorker):
    """
    Periodically polls the fatcat API looking for new changelogs. When they are
    found, fetch them and push (as JSON) into a Kafka topic.
    """

    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
        # TODO: should be offset=0
        super().__init__(kafka_hosts=kafka_hosts,
                         produce_topic=produce_topic,
                         api=api)
        self.poll_interval = poll_interval
        self.offset = offset    # the fatcat changelog offset, not the kafka offset

    def run(self):
        topic = self.kafka.topics[self.produce_topic]
        # On start, try to consume the most recent from the topic, and using
        # that as the starting offset. Note that this is a single-partition
        # topic
        if self.offset is None:
            print("Checking for most recent changelog offset...")
            msg = most_recent_message(topic)
            if msg:
                self.offset = json.loads(msg.decode('utf-8'))['index']
            else:
                self.offset = 1

        with topic.get_producer(
                max_request_size=self.produce_max_request_size,
                ) as producer:
            while True:
                latest = int(self.api.get_changelog(limit=1)[0].index)
                if latest > self.offset:
                    print("Fetching changelogs from {} through {}".format(
                        self.offset+1, latest))
                for i in range(self.offset+1, latest+1):
                    cle = self.api.get_changelog_entry(i)
                    obj = self.api.api_client.sanitize_for_serialization(cle)
                    producer.produce(
                        message=json.dumps(obj).encode('utf-8'),
                        partition_key=None,
                        timestamp=None,
                        #NOTE could be (???): timestamp=cle.timestamp,
                    )
                    self.offset = i
                print("Sleeping {} seconds...".format(self.poll_interval))
                time.sleep(self.poll_interval)


class EntityUpdatesWorker(FatcatWorker):
    """
    Consumes from the changelog topic and publishes expanded entities (fetched
    from API) to update topics.

    For now, only release updates are published.
    """

    def __init__(self, api, kafka_hosts, consume_topic, release_topic):
        super().__init__(kafka_hosts=kafka_hosts,
                         consume_topic=consume_topic,
                         api=api)
        self.release_topic = release_topic
        self.consumer_group = "entity-updates"

    def run(self):
        changelog_topic = self.kafka.topics[self.consume_topic]
        release_topic = self.kafka.topics[self.release_topic]

        consumer = changelog_topic.get_balanced_consumer(
            consumer_group=self.consumer_group,
            managed=True,
            auto_offset_reset=OffsetType.LATEST,
            reset_offset_on_start=False,
            fetch_message_max_bytes=4000000, # up to ~4MBytes
            auto_commit_enable=True,
            auto_commit_interval_ms=30000, # 30 seconds
            compacted_topic=True,
        )

        # using a sync producer to try and avoid racey loss of delivery (aka,
        # if consumer group updated but produce didn't stick)
        with release_topic.get_sync_producer(
                max_request_size=self.produce_max_request_size,
                ) as producer:
            for msg in consumer:
                cle = json.loads(msg.value.decode('utf-8'))
                #print(cle)
                print("processing changelog index {}".format(cle['index']))
                release_ids = []
                file_ids = []
                container_ids = []
                release_edits = cle['editgroup']['edits']['releases']
                for re in release_edits:
                    release_ids.append(re['ident'])
                file_edits = cle['editgroup']['edits']['files']
                for e in file_edits:
                    file_ids.append(e['ident'])
                    # update release when a file changes
                    # TODO: fetch old revision as well, and only update
                    # releases for which list changed
                    release_ids.extend(e['release_ids'])
                container_edits = cle['editgroup']['edits']['containers']
                for e in container_edits:
                    container_ids.append(e['ident'])

                # TODO: do these fetches in parallel using a thread pool?
                for ident in set(release_ids):
                    release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
                    release_dict = self.api.api_client.sanitize_for_serialization(release)
                    producer.produce(
                        message=json.dumps(release_dict).encode('utf-8'),
                        partition_key=ident.encode('utf-8'),
                        timestamp=None,
                    )
                if False: # TODO: other topics, producers
                    for ident in set(file_ids):
                        file_entity = self.api.get_file(ident, expand=None)
                        file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
                        producer.produce(
                            message=json.dumps(file_dict).encode('utf-8'),
                            partition_key=ident.encode('utf-8'),
                            timestamp=None,
                        )
                    for ident in set(container_ids):
                        container = self.api.get_container(ident)
                        container_dict = self.api.api_client.sanitize_for_serialization(container)
                        producer.produce(
                            message=json.dumps(container_dict).encode('utf-8'),
                            partition_key=ident.encode('utf-8'),
                            timestamp=None,
                        )
                # TODO: track work_ids for all releases updated

                #consumer.commit_offsets()