| | | |
|---|---|---|
| author | Bryan Newbold <bnewbold@robocracy.org> | 2019-05-30 11:44:30 -0700 |
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-05-30 11:44:30 -0700 |
| commit | e89814d321dcaa2a7d62b429e5c09a7eef2173bf (patch) | |
| tree | 5d26657ff93167a98aa2294685961e54f883c480 /python | |
| parent | e15fdb84133be97ddd2d229818edaaa3f133047f (diff) | |
| download | fatcat-e89814d321dcaa2a7d62b429e5c09a7eef2173bf.tar.gz, fatcat-e89814d321dcaa2a7d62b429e5c09a7eef2173bf.zip | |
file and container update kafka topics
Diffstat (limited to 'python')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | python/fatcat_tools/workers/changelog.py | 123 |
| -rwxr-xr-x | python/fatcat_worker.py | 10 |

2 files changed, 77 insertions, 56 deletions
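This commit fans entity updates out from a single release topic to separate per-entity topics (releases, files, containers), all partition-keyed by entity ident. As a rough sketch of what a downstream reader of one of the new topics could look like with pykafka (the client library this worker code already uses); the broker address, environment prefix, and consumer group name here are illustrative assumptions, not part of the commit:

```python
# Minimal sketch of consuming the new file-updates topic with pykafka.
# Broker host, "dev" environment prefix, and group name are assumptions.
import json

from pykafka import KafkaClient

client = KafkaClient(hosts="localhost:9092")  # hypothetical broker
topic = client.topics[b"fatcat-dev.file-updates"]

consumer = topic.get_balanced_consumer(
    consumer_group=b"example-file-update-consumer",  # hypothetical group
    managed=True,
)
for msg in consumer:
    # each message body is a sanitized file entity as JSON
    file_entity = json.loads(msg.value.decode("utf-8"))
    print(file_entity["ident"])
```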
```diff
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 007b2015..5ef1c69c 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -63,16 +63,20 @@ class EntityUpdatesWorker(FatcatWorker):
     For now, only release updates are published.
     """
 
-    def __init__(self, api, kafka_hosts, consume_topic, release_topic):
+    def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic,
                          api=api)
         self.release_topic = release_topic
+        self.file_topic = file_topic
+        self.container_topic = container_topic
         self.consumer_group = "entity-updates"
 
     def run(self):
         changelog_topic = self.kafka.topics[self.consume_topic]
         release_topic = self.kafka.topics[self.release_topic]
+        file_topic = self.kafka.topics[self.file_topic]
+        container_topic = self.kafka.topics[self.container_topic]
 
         consumer = changelog_topic.get_balanced_consumer(
             consumer_group=self.consumer_group,
@@ -87,56 +91,67 @@ class EntityUpdatesWorker(FatcatWorker):
 
         # using a sync producer to try and avoid racey loss of delivery (aka,
         # if consumer group updated but produce didn't stick)
-        with release_topic.get_sync_producer(
-                max_request_size=self.produce_max_request_size,
-                ) as producer:
-            for msg in consumer:
-                cle = json.loads(msg.value.decode('utf-8'))
-                #print(cle)
-                print("processing changelog index {}".format(cle['index']))
-                release_ids = []
-                file_ids = []
-                container_ids = []
-                release_edits = cle['editgroup']['edits']['releases']
-                for re in release_edits:
-                    release_ids.append(re['ident'])
-                file_edits = cle['editgroup']['edits']['files']
-                for e in file_edits:
-                    file_ids.append(e['ident'])
-                    # update release when a file changes
-                    # TODO: fetch old revision as well, and only update
-                    # releases for which list changed
-                    release_ids.extend(e['release_ids'])
-                container_edits = cle['editgroup']['edits']['containers']
-                for e in container_edits:
-                    container_ids.append(e['ident'])
-
-                # TODO: do these fetches in parallel using a thread pool?
-                for ident in set(release_ids):
-                    release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
-                    release_dict = self.api.api_client.sanitize_for_serialization(release)
-                    producer.produce(
-                        message=json.dumps(release_dict).encode('utf-8'),
-                        partition_key=ident.encode('utf-8'),
-                        timestamp=None,
-                    )
-                if False: # TODO: other topics, producers
-                    for ident in set(file_ids):
-                        file_entity = self.api.get_file(ident, expand=None)
-                        file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
-                        producer.produce(
-                            message=json.dumps(file_dict).encode('utf-8'),
-                            partition_key=ident.encode('utf-8'),
-                            timestamp=None,
-                        )
-                    for ident in set(container_ids):
-                        container = self.api.get_container(ident)
-                        container_dict = self.api.api_client.sanitize_for_serialization(container)
-                        producer.produce(
-                            message=json.dumps(container_dict).encode('utf-8'),
-                            partition_key=ident.encode('utf-8'),
-                            timestamp=None,
-                        )
-                # TODO: track work_ids for all releases updated
-
-                #consumer.commit_offsets()
+        release_producer = release_topic.get_sync_producer(
+            max_request_size=self.produce_max_request_size,
+        )
+        file_producer = file_topic.get_sync_producer(
+            max_request_size=self.produce_max_request_size,
+        )
+        container_producer = container_topic.get_sync_producer(
+            max_request_size=self.produce_max_request_size,
+        )
+
+        for msg in consumer:
+            cle = json.loads(msg.value.decode('utf-8'))
+            #print(cle)
+            print("processing changelog index {}".format(cle['index']))
+            release_ids = []
+            file_ids = []
+            container_ids = []
+            work_ids = []
+            release_edits = cle['editgroup']['edits']['releases']
+            for re in release_edits:
+                release_ids.append(re['ident'])
+            file_edits = cle['editgroup']['edits']['files']
+            for e in file_edits:
+                file_ids.append(e['ident'])
+            container_edits = cle['editgroup']['edits']['containers']
+            for e in container_edits:
+                container_ids.append(e['ident'])
+            work_edits = cle['editgroup']['edits']['works']
+            for e in work_edits:
+                work_ids.append(e['ident'])
+
+            # TODO: do these fetches in parallel using a thread pool?
+            for ident in set(file_ids):
+                file_entity = self.api.get_file(ident, expand=None)
+                # update release when a file changes
+                # TODO: fetch old revision as well, and only update
+                # releases for which list changed
+                release_ids.extend(file_entity.release_ids)
+                file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
+                file_producer.produce(
+                    message=json.dumps(file_dict).encode('utf-8'),
+                    partition_key=ident.encode('utf-8'),
+                    timestamp=None,
+                )
+            for ident in set(container_ids):
+                container = self.api.get_container(ident)
+                container_dict = self.api.api_client.sanitize_for_serialization(container)
+                container_producer.produce(
+                    message=json.dumps(container_dict).encode('utf-8'),
+                    partition_key=ident.encode('utf-8'),
+                    timestamp=None,
+                )
+            for ident in set(release_ids):
+                release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+                work_ids.append(release.work_id)
+                release_dict = self.api.api_client.sanitize_for_serialization(release)
+                release_producer.produce(
+                    message=json.dumps(release_dict).encode('utf-8'),
+                    partition_key=ident.encode('utf-8'),
+                    timestamp=None,
+                )
+            # TODO: actually update works
+
+            #consumer.commit_offsets()
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index be58810a..267779ff 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -21,8 +21,14 @@ def run_changelog(args):
 def run_entity_updates(args):
     changelog_topic = "fatcat-{}.changelog".format(args.env)
     release_topic = "fatcat-{}.release-updates-v03".format(args.env)
-    worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic,
-        release_topic=release_topic)
+    file_topic = "fatcat-{}.file-updates".format(args.env)
+    container_topic = "fatcat-{}.container-updates".format(args.env)
+    worker = EntityUpdatesWorker(args.api, args.kafka_hosts,
+        changelog_topic,
+        release_topic=release_topic,
+        file_topic=file_topic,
+        container_topic=container_topic,
+    )
     worker.run()
 
 def run_elasticsearch_release(args):
```
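The changed code carries a TODO about doing the entity fetches in parallel. A minimal sketch of how that might look with the standard library's ThreadPoolExecutor; the `publish_releases` helper is hypothetical (not part of this commit), fetches run concurrently, and produce calls stay on the calling thread so delivery ordering and error handling remain simple:

```python
# Hedged sketch of the "do these fetches in parallel" TODO above.
# `publish_releases` is a hypothetical helper name, not from this commit.
import json
from concurrent.futures import ThreadPoolExecutor

def publish_releases(api, producer, release_ids, max_workers=8):
    def fetch(ident):
        # same fetch and serialization as in the worker loop above
        release = api.get_release(
            ident, expand="files,filesets,webcaptures,container")
        return ident, api.api_client.sanitize_for_serialization(release)

    # fetch concurrently, but produce from this single thread so that
    # message delivery stays sequential and easy to reason about
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for ident, release_dict in pool.map(fetch, set(release_ids)):
            producer.produce(
                message=json.dumps(release_dict).encode("utf-8"),
                partition_key=ident.encode("utf-8"),
                timestamp=None,
            )
```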
