diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 123 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 10 |
2 files changed, 77 insertions, 56 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 007b2015..5ef1c69c 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -63,16 +63,20 @@ class EntityUpdatesWorker(FatcatWorker): For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic + self.file_topic = file_topic + self.container_topic = container_topic self.consumer_group = "entity-updates" def run(self): changelog_topic = self.kafka.topics[self.consume_topic] release_topic = self.kafka.topics[self.release_topic] + file_topic = self.kafka.topics[self.release_topic] + container_topic = self.kafka.topics[self.release_topic] consumer = changelog_topic.get_balanced_consumer( consumer_group=self.consumer_group, @@ -87,56 +91,67 @@ class EntityUpdatesWorker(FatcatWorker): # using a sync producer to try and avoid racey loss of delivery (aka, # if consumer group updated but produce didn't stick) - with release_topic.get_sync_producer( - max_request_size=self.produce_max_request_size, - ) as producer: - for msg in consumer: - cle = json.loads(msg.value.decode('utf-8')) - #print(cle) - print("processing changelog index {}".format(cle['index'])) - release_ids = [] - file_ids = [] - container_ids = [] - release_edits = cle['editgroup']['edits']['releases'] - for re in release_edits: - release_ids.append(re['ident']) - file_edits = cle['editgroup']['edits']['files'] - for e in file_edits: - file_ids.append(e['ident']) - # update release when a file changes - # TODO: fetch old revision as well, and only update - # releases for which list changed - release_ids.extend(e['release_ids']) - container_edits = cle['editgroup']['edits']['containers'] - for e in container_edits: - container_ids.append(e['ident']) - - # TODO: do these fetches in parallel using a thread pool? - for ident in set(release_ids): - release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") - release_dict = self.api.api_client.sanitize_for_serialization(release) - producer.produce( - message=json.dumps(release_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - if False: # TODO: other topics, producers - for ident in set(file_ids): - file_entity = self.api.get_file(ident, expand=None) - file_dict = self.api.api_client.sanitize_for_serialization(file_entity) - producer.produce( - message=json.dumps(file_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - for ident in set(container_ids): - container = self.api.get_container(ident) - container_dict = self.api.api_client.sanitize_for_serialization(container) - producer.produce( - message=json.dumps(container_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - # TODO: track work_ids for all releases updated - - #consumer.commit_offsets() + release_producer = release_topic.get_sync_producer( + max_request_size=self.produce_max_request_size, + ) + file_producer = file_topic.get_sync_producer( + max_request_size=self.produce_max_request_size, + ) + container_producer = container_topic.get_sync_producer( + max_request_size=self.produce_max_request_size, + ) + + for msg in consumer: + cle = json.loads(msg.value.decode('utf-8')) + #print(cle) + print("processing changelog index {}".format(cle['index'])) + release_ids = [] + file_ids = [] + container_ids = [] + work_ids = [] + release_edits = cle['editgroup']['edits']['releases'] + for re in release_edits: + release_ids.append(re['ident']) + file_edits = cle['editgroup']['edits']['files'] + for e in file_edits: + file_ids.append(e['ident']) + container_edits = cle['editgroup']['edits']['containers'] + for e in container_edits: + container_ids.append(e['ident']) + work_edits = cle['editgroup']['edits']['works'] + for e in work_edits: + work_ids.append(e['ident']) + + # TODO: do these fetches in parallel using a thread pool? + for ident in set(file_ids): + file_entity = self.api.get_file(ident, expand=None) + # update release when a file changes + # TODO: fetch old revision as well, and only update + # releases for which list changed + release_ids.extend(e['release_ids']) + file_dict = self.api.api_client.sanitize_for_serialization(file_entity) + file_producer.produce( + message=json.dumps(file_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + for ident in set(container_ids): + container = self.api.get_container(ident) + container_dict = self.api.api_client.sanitize_for_serialization(container) + container_producer.produce( + message=json.dumps(container_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + for ident in set(release_ids): + release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") + work_ids.append(release.work_id) + release_dict = self.api.api_client.sanitize_for_serialization(release) + release_producer.produce( + message=json.dumps(release_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + # TODO: actually update works + + #consumer.commit_offsets() diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index be58810a..267779ff 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -21,8 +21,14 @@ def run_changelog(args): def run_entity_updates(args): changelog_topic = "fatcat-{}.changelog".format(args.env) release_topic = "fatcat-{}.release-updates-v03".format(args.env) - worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic, - release_topic=release_topic) + file_topic = "fatcat-{}.file-updates".format(args.env) + container_topic = "fatcat-{}.container-updates".format(args.env) + worker = EntityUpdatesWorker(args.api, args.kafka_hosts, + changelog_topic, + release_topic=release_topic, + file_topic=file_topic, + container_topic=container_topic, + ) worker.run() def run_elasticsearch_release(args): |