summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-05-30 11:44:30 -0700
committerBryan Newbold <bnewbold@robocracy.org>2019-05-30 11:44:30 -0700
commite89814d321dcaa2a7d62b429e5c09a7eef2173bf (patch)
tree5d26657ff93167a98aa2294685961e54f883c480
parente15fdb84133be97ddd2d229818edaaa3f133047f (diff)
downloadfatcat-e89814d321dcaa2a7d62b429e5c09a7eef2173bf.tar.gz
fatcat-e89814d321dcaa2a7d62b429e5c09a7eef2173bf.zip
file and container update kafka topics
-rw-r--r--python/fatcat_tools/workers/changelog.py123
-rwxr-xr-xpython/fatcat_worker.py10
2 files changed, 77 insertions, 56 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 007b2015..5ef1c69c 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -63,16 +63,20 @@ class EntityUpdatesWorker(FatcatWorker):
For now, only release updates are published.
"""
- def __init__(self, api, kafka_hosts, consume_topic, release_topic):
+ def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
api=api)
self.release_topic = release_topic
+ self.file_topic = file_topic
+ self.container_topic = container_topic
self.consumer_group = "entity-updates"
def run(self):
changelog_topic = self.kafka.topics[self.consume_topic]
release_topic = self.kafka.topics[self.release_topic]
+ file_topic = self.kafka.topics[self.release_topic]
+ container_topic = self.kafka.topics[self.release_topic]
consumer = changelog_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
@@ -87,56 +91,67 @@ class EntityUpdatesWorker(FatcatWorker):
# using a sync producer to try and avoid racey loss of delivery (aka,
# if consumer group updated but produce didn't stick)
- with release_topic.get_sync_producer(
- max_request_size=self.produce_max_request_size,
- ) as producer:
- for msg in consumer:
- cle = json.loads(msg.value.decode('utf-8'))
- #print(cle)
- print("processing changelog index {}".format(cle['index']))
- release_ids = []
- file_ids = []
- container_ids = []
- release_edits = cle['editgroup']['edits']['releases']
- for re in release_edits:
- release_ids.append(re['ident'])
- file_edits = cle['editgroup']['edits']['files']
- for e in file_edits:
- file_ids.append(e['ident'])
- # update release when a file changes
- # TODO: fetch old revision as well, and only update
- # releases for which list changed
- release_ids.extend(e['release_ids'])
- container_edits = cle['editgroup']['edits']['containers']
- for e in container_edits:
- container_ids.append(e['ident'])
-
- # TODO: do these fetches in parallel using a thread pool?
- for ident in set(release_ids):
- release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
- release_dict = self.api.api_client.sanitize_for_serialization(release)
- producer.produce(
- message=json.dumps(release_dict).encode('utf-8'),
- partition_key=ident.encode('utf-8'),
- timestamp=None,
- )
- if False: # TODO: other topics, producers
- for ident in set(file_ids):
- file_entity = self.api.get_file(ident, expand=None)
- file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
- producer.produce(
- message=json.dumps(file_dict).encode('utf-8'),
- partition_key=ident.encode('utf-8'),
- timestamp=None,
- )
- for ident in set(container_ids):
- container = self.api.get_container(ident)
- container_dict = self.api.api_client.sanitize_for_serialization(container)
- producer.produce(
- message=json.dumps(container_dict).encode('utf-8'),
- partition_key=ident.encode('utf-8'),
- timestamp=None,
- )
- # TODO: track work_ids for all releases updated
-
- #consumer.commit_offsets()
+ release_producer = release_topic.get_sync_producer(
+ max_request_size=self.produce_max_request_size,
+ )
+ file_producer = file_topic.get_sync_producer(
+ max_request_size=self.produce_max_request_size,
+ )
+ container_producer = container_topic.get_sync_producer(
+ max_request_size=self.produce_max_request_size,
+ )
+
+ for msg in consumer:
+ cle = json.loads(msg.value.decode('utf-8'))
+ #print(cle)
+ print("processing changelog index {}".format(cle['index']))
+ release_ids = []
+ file_ids = []
+ container_ids = []
+ work_ids = []
+ release_edits = cle['editgroup']['edits']['releases']
+ for re in release_edits:
+ release_ids.append(re['ident'])
+ file_edits = cle['editgroup']['edits']['files']
+ for e in file_edits:
+ file_ids.append(e['ident'])
+ container_edits = cle['editgroup']['edits']['containers']
+ for e in container_edits:
+ container_ids.append(e['ident'])
+ work_edits = cle['editgroup']['edits']['works']
+ for e in work_edits:
+ work_ids.append(e['ident'])
+
+ # TODO: do these fetches in parallel using a thread pool?
+ for ident in set(file_ids):
+ file_entity = self.api.get_file(ident, expand=None)
+ # update release when a file changes
+ # TODO: fetch old revision as well, and only update
+ # releases for which list changed
+ release_ids.extend(e['release_ids'])
+ file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
+ file_producer.produce(
+ message=json.dumps(file_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ for ident in set(container_ids):
+ container = self.api.get_container(ident)
+ container_dict = self.api.api_client.sanitize_for_serialization(container)
+ container_producer.produce(
+ message=json.dumps(container_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ for ident in set(release_ids):
+ release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+ work_ids.append(release.work_id)
+ release_dict = self.api.api_client.sanitize_for_serialization(release)
+ release_producer.produce(
+ message=json.dumps(release_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ # TODO: actually update works
+
+ #consumer.commit_offsets()
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index be58810a..267779ff 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -21,8 +21,14 @@ def run_changelog(args):
def run_entity_updates(args):
changelog_topic = "fatcat-{}.changelog".format(args.env)
release_topic = "fatcat-{}.release-updates-v03".format(args.env)
- worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic,
- release_topic=release_topic)
+ file_topic = "fatcat-{}.file-updates".format(args.env)
+ container_topic = "fatcat-{}.container-updates".format(args.env)
+ worker = EntityUpdatesWorker(args.api, args.kafka_hosts,
+ changelog_topic,
+ release_topic=release_topic,
+ file_topic=file_topic,
+ container_topic=container_topic,
+ )
worker.run()
def run_elasticsearch_release(args):