aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-05-30 11:13:33 -0700
committerBryan Newbold <bnewbold@robocracy.org>2019-05-30 11:13:35 -0700
commite15fdb84133be97ddd2d229818edaaa3f133047f (patch)
tree370646499ee391da02d9b0cc33a8becd37c21ec1 /python/fatcat_tools/workers
parent5dce3ab3b01ddd2b2f3fa8af76038f10d225008e (diff)
downloadfatcat-e15fdb84133be97ddd2d229818edaaa3f133047f.tar.gz
fatcat-e15fdb84133be97ddd2d229818edaaa3f133047f.zip
update elastic for releases when files added
A bunch of remaining TODOs here
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/changelog.py37
1 files changed, 36 insertions, 1 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 6319d55a..007b2015 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -94,9 +94,25 @@ class EntityUpdatesWorker(FatcatWorker):
cle = json.loads(msg.value.decode('utf-8'))
#print(cle)
print("processing changelog index {}".format(cle['index']))
+ release_ids = []
+ file_ids = []
+ container_ids = []
release_edits = cle['editgroup']['edits']['releases']
for re in release_edits:
- ident = re['ident']
+ release_ids.append(re['ident'])
+ file_edits = cle['editgroup']['edits']['files']
+ for e in file_edits:
+ file_ids.append(e['ident'])
+ # update release when a file changes
+ # TODO: fetch old revision as well, and only update
+ # releases for which list changed
+ release_ids.extend(e['release_ids'])
+ container_edits = cle['editgroup']['edits']['containers']
+ for e in container_edits:
+ container_ids.append(e['ident'])
+
+ # TODO: do these fetches in parallel using a thread pool?
+ for ident in set(release_ids):
release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
release_dict = self.api.api_client.sanitize_for_serialization(release)
producer.produce(
@@ -104,4 +120,23 @@ class EntityUpdatesWorker(FatcatWorker):
partition_key=ident.encode('utf-8'),
timestamp=None,
)
+ if False: # TODO: other topics, producers
+ for ident in set(file_ids):
+ file_entity = self.api.get_file(ident, expand=None)
+ file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
+ producer.produce(
+ message=json.dumps(file_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ for ident in set(container_ids):
+ container = self.api.get_container(ident)
+ container_dict = self.api.api_client.sanitize_for_serialization(container)
+ producer.produce(
+ message=json.dumps(container_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ # TODO: track work_ids for all releases updated
+
#consumer.commit_offsets()