summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/changelog.py
blob: 789a97a4dfacc60556d50c6fe2301e72d0ac8051 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157

import json
import time
from pykafka.common import OffsetType

from .worker_common import FatcatWorker, most_recent_message


class ChangelogWorker(FatcatWorker):
    """
    Periodically polls the fatcat API looking for new changelogs. When they are
    found, fetch them and push (as JSON) into a Kafka topic.
    """

    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
        # TODO: should be offset=0
        super().__init__(kafka_hosts=kafka_hosts,
                         produce_topic=produce_topic,
                         api=api)
        self.poll_interval = poll_interval
        self.offset = offset    # the fatcat changelog offset, not the kafka offset

    def run(self):
        topic = self.kafka.topics[self.produce_topic]
        # On start, try to consume the most recent from the topic, and using
        # that as the starting offset. Note that this is a single-partition
        # topic
        if self.offset is None:
            print("Checking for most recent changelog offset...")
            msg = most_recent_message(topic)
            if msg:
                self.offset = json.loads(msg.decode('utf-8'))['index']
            else:
                self.offset = 1

        with topic.get_producer(
                max_request_size=self.produce_max_request_size,
                ) as producer:
            while True:
                latest = int(self.api.get_changelog(limit=1)[0].index)
                if latest > self.offset:
                    print("Fetching changelogs from {} through {}".format(
                        self.offset+1, latest))
                for i in range(self.offset+1, latest+1):
                    cle = self.api.get_changelog_entry(i)
                    obj = self.api.api_client.sanitize_for_serialization(cle)
                    producer.produce(
                        message=json.dumps(obj).encode('utf-8'),
                        partition_key=None,
                        timestamp=None,
                        #NOTE could be (???): timestamp=cle.timestamp,
                    )
                    self.offset = i
                print("Sleeping {} seconds...".format(self.poll_interval))
                time.sleep(self.poll_interval)


class EntityUpdatesWorker(FatcatWorker):
    """
    Consumes from the changelog topic and publishes expanded entities (fetched
    from API) to update topics.

    For now, only release updates are published.
    """

    def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic):
        super().__init__(kafka_hosts=kafka_hosts,
                         consume_topic=consume_topic,
                         api=api)
        self.release_topic = release_topic
        self.file_topic = file_topic
        self.container_topic = container_topic
        self.consumer_group = "entity-updates"

    def run(self):
        changelog_topic = self.kafka.topics[self.consume_topic]
        release_topic = self.kafka.topics[self.release_topic]
        file_topic = self.kafka.topics[self.file_topic]
        container_topic = self.kafka.topics[self.container_topic]

        consumer = changelog_topic.get_balanced_consumer(
            consumer_group=self.consumer_group,
            managed=True,
            auto_offset_reset=OffsetType.LATEST,
            reset_offset_on_start=False,
            fetch_message_max_bytes=10000000, # up to ~10 MBytes
            auto_commit_enable=True,
            auto_commit_interval_ms=30000, # 30 seconds
            compacted_topic=True,
        )

        # using a sync producer to try and avoid racey loss of delivery (aka,
        # if consumer group updated but produce didn't stick)
        release_producer = release_topic.get_sync_producer(
            max_request_size=self.produce_max_request_size,
        )
        file_producer = file_topic.get_sync_producer(
            max_request_size=self.produce_max_request_size,
        )
        container_producer = container_topic.get_sync_producer(
            max_request_size=self.produce_max_request_size,
        )

        for msg in consumer:
            cle = json.loads(msg.value.decode('utf-8'))
            #print(cle)
            print("processing changelog index {}".format(cle['index']))
            release_ids = []
            file_ids = []
            container_ids = []
            work_ids = []
            release_edits = cle['editgroup']['edits']['releases']
            for re in release_edits:
                release_ids.append(re['ident'])
            file_edits = cle['editgroup']['edits']['files']
            for e in file_edits:
                file_ids.append(e['ident'])
            container_edits = cle['editgroup']['edits']['containers']
            for e in container_edits:
                container_ids.append(e['ident'])
            work_edits = cle['editgroup']['edits']['works']
            for e in work_edits:
                work_ids.append(e['ident'])

            # TODO: do these fetches in parallel using a thread pool?
            for ident in set(file_ids):
                file_entity = self.api.get_file(ident, expand=None)
                # update release when a file changes
                # TODO: fetch old revision as well, and only update
                # releases for which list changed
                release_ids.extend(file_entity.release_ids or [])
                file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
                file_producer.produce(
                    message=json.dumps(file_dict).encode('utf-8'),
                    partition_key=ident.encode('utf-8'),
                    timestamp=None,
                )
            for ident in set(container_ids):
                container = self.api.get_container(ident)
                container_dict = self.api.api_client.sanitize_for_serialization(container)
                container_producer.produce(
                    message=json.dumps(container_dict).encode('utf-8'),
                    partition_key=ident.encode('utf-8'),
                    timestamp=None,
                )
            for ident in set(release_ids):
                release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
                work_ids.append(release.work_id)
                release_dict = self.api.api_client.sanitize_for_serialization(release)
                release_producer.produce(
                    message=json.dumps(release_dict).encode('utf-8'),
                    partition_key=ident.encode('utf-8'),
                    timestamp=None,
                )
            # TODO: actually update works

            #consumer.commit_offsets()