diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2019-11-12 21:14:32 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-11-15 16:46:26 -0800 | 
| commit | 9d2bde08e9f43942b76ca6d0f6c8845c33c02515 (patch) | |
| tree | 38f54a2ebe210b97289a6ed6463ef07be8122a9b /python | |
| parent | 06cf64414d4c7fa497c9ddb83b7c066b3779c4d2 (diff) | |
| download | fatcat-9d2bde08e9f43942b76ca6d0f6c8845c33c02515.tar.gz fatcat-9d2bde08e9f43942b76ca6d0f6c8845c33c02515.zip | |
add ingest request feature to entity_updates worker
Initially I was going to create a new worker to consume from the release
update channel, but couldn't get the edit context ("is this a new
release, or an update to an existing one?") from that channel.
Currently there is a flag in the source code to control whether we send
ingest requests for OA releases only or for all releases. Starting with OA
only, to ramp up slowly, but this should probably default to all releases and
become a config flag. There should probably also be a config flag to control this entire feature.
Tested locally in dev.
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat_tools/workers/changelog.py | 24 | ||||
| -rwxr-xr-x | python/fatcat_worker.py | 2 | 
2 files changed, 22 insertions, 4 deletions
| diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 8b1ba5e9..fe5c55be 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -3,6 +3,7 @@ import json  import time  from confluent_kafka import Consumer, Producer, KafkaException +from fatcat_tools.transforms import release_ingest_request  from .worker_common import FatcatWorker, most_recent_message @@ -74,19 +75,20 @@ class EntityUpdatesWorker(FatcatWorker):      """      Consumes from the changelog topic and publishes expanded entities (fetched      from API) to update topics. - -    For now, only release updates are published.      """ -    def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0): +    def __init__(self, api, kafka_hosts, consume_topic, release_topic, +            file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0):          super().__init__(kafka_hosts=kafka_hosts,                           consume_topic=consume_topic,                           api=api)          self.release_topic = release_topic          self.file_topic = file_topic          self.container_topic = container_topic +        self.ingest_file_request_topic = ingest_file_request_topic          self.poll_interval = poll_interval          self.consumer_group = "entity-updates" +        self.ingest_oa_only = True      def run(self): @@ -165,12 +167,16 @@ class EntityUpdatesWorker(FatcatWorker):              #print(cle)              print("processing changelog index {}".format(cle['index']))              release_ids = [] +            new_release_ids = []              file_ids = []              container_ids = []              work_ids = []              release_edits = cle['editgroup']['edits']['releases']              for re in release_edits:                  release_ids.append(re['ident']) +                # filter to direct release edits which are not 
updates +                if not re.get('prev_revision') and not re.get('redirect_ident'): +                    new_release_ids.append(re['ident'])              file_edits = cle['editgroup']['edits']['files']              for e in file_edits:                  file_ids.append(e['ident']) @@ -214,7 +220,17 @@ class EntityUpdatesWorker(FatcatWorker):                      key=ident.encode('utf-8'),                      on_delivery=fail_fast,                  ) +                # filter to "new" active releases with no matched files +                if release.ident in new_release_ids: +                    ir = release_ingest_request(release, project='fatcat-changelog', oa_only=self.ingest_oa_only) +                    if ir and ir['ingest_type'] == 'file' and not release.files: +                        producer.produce( +                            self.ingest_file_request_topic, +                            json.dumps(ir).encode('utf-8'), +                            #key=None, +                            on_delivery=fail_fast, +                        )              producer.flush() -            # TODO: actually update works +            # TODO: publish updated 'work' entities to a topic              consumer.store_offsets(message=msg) diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 628312be..c2120bae 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -23,11 +23,13 @@ def run_entity_updates(args):      release_topic = "fatcat-{}.release-updates-v03".format(args.env)      file_topic = "fatcat-{}.file-updates".format(args.env)      container_topic = "fatcat-{}.container-updates".format(args.env) +    ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)      worker = EntityUpdatesWorker(args.api, args.kafka_hosts,          changelog_topic,          release_topic=release_topic,          file_topic=file_topic,          container_topic=container_topic, +        
ingest_file_request_topic=ingest_file_request_topic,      )      worker.run() | 
