summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/changelog.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r--python/fatcat_tools/workers/changelog.py123
1 files changed, 69 insertions, 54 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 007b2015..5ef1c69c 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -63,16 +63,20 @@ class EntityUpdatesWorker(FatcatWorker):
For now, only release updates are published.
"""
- def __init__(self, api, kafka_hosts, consume_topic, release_topic):
+ def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
api=api)
self.release_topic = release_topic
+ self.file_topic = file_topic
+ self.container_topic = container_topic
self.consumer_group = "entity-updates"
def run(self):
changelog_topic = self.kafka.topics[self.consume_topic]
release_topic = self.kafka.topics[self.release_topic]
+ file_topic = self.kafka.topics[self.release_topic]
+ container_topic = self.kafka.topics[self.release_topic]
consumer = changelog_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
@@ -87,56 +91,67 @@ class EntityUpdatesWorker(FatcatWorker):
# using a sync producer to try and avoid racey loss of delivery (aka,
# if consumer group updated but produce didn't stick)
- with release_topic.get_sync_producer(
- max_request_size=self.produce_max_request_size,
- ) as producer:
- for msg in consumer:
- cle = json.loads(msg.value.decode('utf-8'))
- #print(cle)
- print("processing changelog index {}".format(cle['index']))
- release_ids = []
- file_ids = []
- container_ids = []
- release_edits = cle['editgroup']['edits']['releases']
- for re in release_edits:
- release_ids.append(re['ident'])
- file_edits = cle['editgroup']['edits']['files']
- for e in file_edits:
- file_ids.append(e['ident'])
- # update release when a file changes
- # TODO: fetch old revision as well, and only update
- # releases for which list changed
- release_ids.extend(e['release_ids'])
- container_edits = cle['editgroup']['edits']['containers']
- for e in container_edits:
- container_ids.append(e['ident'])
-
- # TODO: do these fetches in parallel using a thread pool?
- for ident in set(release_ids):
- release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
- release_dict = self.api.api_client.sanitize_for_serialization(release)
- producer.produce(
- message=json.dumps(release_dict).encode('utf-8'),
- partition_key=ident.encode('utf-8'),
- timestamp=None,
- )
- if False: # TODO: other topics, producers
- for ident in set(file_ids):
- file_entity = self.api.get_file(ident, expand=None)
- file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
- producer.produce(
- message=json.dumps(file_dict).encode('utf-8'),
- partition_key=ident.encode('utf-8'),
- timestamp=None,
- )
- for ident in set(container_ids):
- container = self.api.get_container(ident)
- container_dict = self.api.api_client.sanitize_for_serialization(container)
- producer.produce(
- message=json.dumps(container_dict).encode('utf-8'),
- partition_key=ident.encode('utf-8'),
- timestamp=None,
- )
- # TODO: track work_ids for all releases updated
-
- #consumer.commit_offsets()
+ release_producer = release_topic.get_sync_producer(
+ max_request_size=self.produce_max_request_size,
+ )
+ file_producer = file_topic.get_sync_producer(
+ max_request_size=self.produce_max_request_size,
+ )
+ container_producer = container_topic.get_sync_producer(
+ max_request_size=self.produce_max_request_size,
+ )
+
+ for msg in consumer:
+ cle = json.loads(msg.value.decode('utf-8'))
+ #print(cle)
+ print("processing changelog index {}".format(cle['index']))
+ release_ids = []
+ file_ids = []
+ container_ids = []
+ work_ids = []
+ release_edits = cle['editgroup']['edits']['releases']
+ for re in release_edits:
+ release_ids.append(re['ident'])
+ file_edits = cle['editgroup']['edits']['files']
+ for e in file_edits:
+ file_ids.append(e['ident'])
+ container_edits = cle['editgroup']['edits']['containers']
+ for e in container_edits:
+ container_ids.append(e['ident'])
+ work_edits = cle['editgroup']['edits']['works']
+ for e in work_edits:
+ work_ids.append(e['ident'])
+
+ # TODO: do these fetches in parallel using a thread pool?
+ for ident in set(file_ids):
+ file_entity = self.api.get_file(ident, expand=None)
+ # update release when a file changes
+ # TODO: fetch old revision as well, and only update
+ # releases for which list changed
+ release_ids.extend(e['release_ids'])
+ file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
+ file_producer.produce(
+ message=json.dumps(file_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ for ident in set(container_ids):
+ container = self.api.get_container(ident)
+ container_dict = self.api.api_client.sanitize_for_serialization(container)
+ container_producer.produce(
+ message=json.dumps(container_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ for ident in set(release_ids):
+ release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+ work_ids.append(release.work_id)
+ release_dict = self.api.api_client.sanitize_for_serialization(release)
+ release_producer.produce(
+ message=json.dumps(release_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ # TODO: actually update works
+
+ #consumer.commit_offsets()