diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2020-10-15 20:53:11 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2020-10-16 16:41:16 -0700 | 
| commit | 99c54764f0cbbb05409001b4af83182842f4e52d (patch) | |
| tree | aede638925dabfac40bbd1c77607e4b31e3a234c /python | |
| parent | a7765fb8b6011c224dc1e4d7b0e30159bae7a903 (diff) | |
| download | fatcat-99c54764f0cbbb05409001b4af83182842f4e52d.tar.gz fatcat-99c54764f0cbbb05409001b4af83182842f4e52d.zip | |
entity updater: new work update feed (ident and changelog metadata only)
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat_tools/workers/changelog.py | 26 | ||||
| -rwxr-xr-x | python/fatcat_worker.py | 2 | 
2 files changed, 26 insertions, 2 deletions
| diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 13e46e02..d9f0cf91 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -79,7 +79,8 @@ class EntityUpdatesWorker(FatcatWorker):      """      def __init__(self, api, kafka_hosts, consume_topic, release_topic, -            file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0): +            file_topic, container_topic, ingest_file_request_topic, +            work_ident_topic, poll_interval=5.0):          super().__init__(kafka_hosts=kafka_hosts,                           consume_topic=consume_topic,                           api=api) @@ -87,6 +88,7 @@ class EntityUpdatesWorker(FatcatWorker):          self.file_topic = file_topic          self.container_topic = container_topic          self.ingest_file_request_topic = ingest_file_request_topic +        self.work_ident_topic = work_ident_topic          self.poll_interval = poll_interval          self.consumer_group = "entity-updates"          self.ingest_oa_only = False @@ -365,7 +367,8 @@ class EntityUpdatesWorker(FatcatWorker):                  )              for ident in set(release_ids):                  release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") -                work_ids.append(release.work_id) +                if release.work_id: +                    work_ids.append(release.work_id)                  release_dict = self.api.api_client.sanitize_for_serialization(release)                  producer.produce(                      self.release_topic, @@ -383,6 +386,25 @@ class EntityUpdatesWorker(FatcatWorker):                              #key=None,                              on_delivery=fail_fast,                          ) + +            # send work updates (just ident and changelog metadata) to scholar for re-indexing +            for ident in set(work_ids): +                assert ident +                key = f"work_{ident}" +                work_ident_dict = dict( +                    key=key, +                    type="fatcat_work", +                    work_ident=ident, +                    updated=cle['timestamp'], +                    fatcat_changelog_index=cle['index'], +                ) +                producer.produce( +                    self.work_ident_topic, +                    json.dumps(work_ident_dict).encode('utf-8'), +                    key=key.encode('utf-8'), +                    on_delivery=fail_fast, +                ) +              producer.flush()              # TODO: publish updated 'work' entities to a topic              consumer.store_offsets(message=msg) diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 19ac16cd..5e3fb3e9 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -22,12 +22,14 @@ def run_entity_updates(args):      release_topic = "fatcat-{}.release-updates-v03".format(args.env)      file_topic = "fatcat-{}.file-updates".format(args.env)      container_topic = "fatcat-{}.container-updates".format(args.env) +    work_ident_topic = "fatcat-{}.work-ident-updates".format(args.env)      ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)      worker = EntityUpdatesWorker(args.api, args.kafka_hosts,          changelog_topic,          release_topic=release_topic,          file_topic=file_topic,          container_topic=container_topic, +        work_ident_topic=work_ident_topic,          ingest_file_request_topic=ingest_file_request_topic,      )      worker.run() | 
