diff options
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 123 |
1 files changed, 69 insertions, 54 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 007b2015..5ef1c69c 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -63,16 +63,20 @@ class EntityUpdatesWorker(FatcatWorker): For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic + self.file_topic = file_topic + self.container_topic = container_topic self.consumer_group = "entity-updates" def run(self): changelog_topic = self.kafka.topics[self.consume_topic] release_topic = self.kafka.topics[self.release_topic] + file_topic = self.kafka.topics[self.release_topic] + container_topic = self.kafka.topics[self.release_topic] consumer = changelog_topic.get_balanced_consumer( consumer_group=self.consumer_group, @@ -87,56 +91,67 @@ class EntityUpdatesWorker(FatcatWorker): # using a sync producer to try and avoid racey loss of delivery (aka, # if consumer group updated but produce didn't stick) - with release_topic.get_sync_producer( - max_request_size=self.produce_max_request_size, - ) as producer: - for msg in consumer: - cle = json.loads(msg.value.decode('utf-8')) - #print(cle) - print("processing changelog index {}".format(cle['index'])) - release_ids = [] - file_ids = [] - container_ids = [] - release_edits = cle['editgroup']['edits']['releases'] - for re in release_edits: - release_ids.append(re['ident']) - file_edits = cle['editgroup']['edits']['files'] - for e in file_edits: - file_ids.append(e['ident']) - # update release when a file changes - # TODO: fetch old revision as well, and only update - # releases for which list changed - release_ids.extend(e['release_ids']) - container_edits = cle['editgroup']['edits']['containers'] - for e in container_edits: - container_ids.append(e['ident']) - - # TODO: do these fetches in parallel using a thread pool? - for ident in set(release_ids): - release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") - release_dict = self.api.api_client.sanitize_for_serialization(release) - producer.produce( - message=json.dumps(release_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - if False: # TODO: other topics, producers - for ident in set(file_ids): - file_entity = self.api.get_file(ident, expand=None) - file_dict = self.api.api_client.sanitize_for_serialization(file_entity) - producer.produce( - message=json.dumps(file_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - for ident in set(container_ids): - container = self.api.get_container(ident) - container_dict = self.api.api_client.sanitize_for_serialization(container) - producer.produce( - message=json.dumps(container_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - # TODO: track work_ids for all releases updated - - #consumer.commit_offsets() + release_producer = release_topic.get_sync_producer( + max_request_size=self.produce_max_request_size, + ) + file_producer = file_topic.get_sync_producer( + max_request_size=self.produce_max_request_size, + ) + container_producer = container_topic.get_sync_producer( + max_request_size=self.produce_max_request_size, + ) + + for msg in consumer: + cle = json.loads(msg.value.decode('utf-8')) + #print(cle) + print("processing changelog index {}".format(cle['index'])) + release_ids = [] + file_ids = [] + container_ids = [] + work_ids = [] + release_edits = cle['editgroup']['edits']['releases'] + for re in release_edits: + release_ids.append(re['ident']) + file_edits = cle['editgroup']['edits']['files'] + for e in file_edits: + file_ids.append(e['ident']) + container_edits = cle['editgroup']['edits']['containers'] + for e in container_edits: + container_ids.append(e['ident']) + work_edits = cle['editgroup']['edits']['works'] + for e in work_edits: + work_ids.append(e['ident']) + + # TODO: do these fetches in parallel using a thread pool? + for ident in set(file_ids): + file_entity = self.api.get_file(ident, expand=None) + # update release when a file changes + # TODO: fetch old revision as well, and only update + # releases for which list changed + release_ids.extend(e['release_ids']) + file_dict = self.api.api_client.sanitize_for_serialization(file_entity) + file_producer.produce( + message=json.dumps(file_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + for ident in set(container_ids): + container = self.api.get_container(ident) + container_dict = self.api.api_client.sanitize_for_serialization(container) + container_producer.produce( + message=json.dumps(container_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + for ident in set(release_ids): + release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") + work_ids.append(release.work_id) + release_dict = self.api.api_client.sanitize_for_serialization(release) + release_producer.produce( + message=json.dumps(release_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + # TODO: actually update works + + #consumer.commit_offsets() |