diff options
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 37 |
1 files changed, 36 insertions, 1 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 6319d55a..007b2015 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -94,9 +94,25 @@ class EntityUpdatesWorker(FatcatWorker): cle = json.loads(msg.value.decode('utf-8')) #print(cle) print("processing changelog index {}".format(cle['index'])) + release_ids = [] + file_ids = [] + container_ids = [] release_edits = cle['editgroup']['edits']['releases'] for re in release_edits: - ident = re['ident'] + release_ids.append(re['ident']) + file_edits = cle['editgroup']['edits']['files'] + for e in file_edits: + file_ids.append(e['ident']) + # update release when a file changes + # TODO: fetch old revision as well, and only update + # releases for which list changed + release_ids.extend(e['release_ids']) + container_edits = cle['editgroup']['edits']['containers'] + for e in container_edits: + container_ids.append(e['ident']) + + # TODO: do these fetches in parallel using a thread pool? + for ident in set(release_ids): release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") release_dict = self.api.api_client.sanitize_for_serialization(release) producer.produce( @@ -104,4 +120,23 @@ class EntityUpdatesWorker(FatcatWorker): partition_key=ident.encode('utf-8'), timestamp=None, ) + if False: # TODO: other topics, producers + for ident in set(file_ids): + file_entity = self.api.get_file(ident, expand=None) + file_dict = self.api.api_client.sanitize_for_serialization(file_entity) + producer.produce( + message=json.dumps(file_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + for ident in set(container_ids): + container = self.api.get_container(ident) + container_dict = self.api.api_client.sanitize_for_serialization(container) + producer.produce( + message=json.dumps(container_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + # TODO: track work_ids for all releases updated + #consumer.commit_offsets() |