aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/changelog.py37
1 files changed, 36 insertions, 1 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 6319d55a..007b2015 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -94,9 +94,25 @@ class EntityUpdatesWorker(FatcatWorker):
cle = json.loads(msg.value.decode('utf-8'))
#print(cle)
print("processing changelog index {}".format(cle['index']))
+ release_ids = []
+ file_ids = []
+ container_ids = []
release_edits = cle['editgroup']['edits']['releases']
for re in release_edits:
- ident = re['ident']
+ release_ids.append(re['ident'])
+ file_edits = cle['editgroup']['edits']['files']
+ for e in file_edits:
+ file_ids.append(e['ident'])
+ # update release when a file changes
+ # TODO: fetch old revision as well, and only update
+ # releases for which list changed
+ release_ids.extend(e['release_ids'])
+ container_edits = cle['editgroup']['edits']['containers']
+ for e in container_edits:
+ container_ids.append(e['ident'])
+
+ # TODO: do these fetches in parallel using a thread pool?
+ for ident in set(release_ids):
release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
release_dict = self.api.api_client.sanitize_for_serialization(release)
producer.produce(
@@ -104,4 +120,23 @@ class EntityUpdatesWorker(FatcatWorker):
partition_key=ident.encode('utf-8'),
timestamp=None,
)
+ if False: # TODO: other topics, producers
+ for ident in set(file_ids):
+ file_entity = self.api.get_file(ident, expand=None)
+ file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
+ producer.produce(
+ message=json.dumps(file_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ for ident in set(container_ids):
+ container = self.api.get_container(ident)
+ container_dict = self.api.api_client.sanitize_for_serialization(container)
+ producer.produce(
+ message=json.dumps(container_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ # TODO: track work_ids for all releases updated
+
#consumer.commit_offsets()