diff options
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 8b1ba5e9..fe5c55be 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -3,6 +3,7 @@ import json import time from confluent_kafka import Consumer, Producer, KafkaException +from fatcat_tools.transforms import release_ingest_request from .worker_common import FatcatWorker, most_recent_message @@ -74,19 +75,20 @@ class EntityUpdatesWorker(FatcatWorker): """ Consumes from the changelog topic and publishes expanded entities (fetched from API) to update topics. - - For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, + file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic self.file_topic = file_topic self.container_topic = container_topic + self.ingest_file_request_topic = ingest_file_request_topic self.poll_interval = poll_interval self.consumer_group = "entity-updates" + self.ingest_oa_only = True def run(self): @@ -165,12 +167,16 @@ class EntityUpdatesWorker(FatcatWorker): #print(cle) print("processing changelog index {}".format(cle['index'])) release_ids = [] + new_release_ids = [] file_ids = [] container_ids = [] work_ids = [] release_edits = cle['editgroup']['edits']['releases'] for re in release_edits: release_ids.append(re['ident']) + # filter to direct release edits which are not updates + if not re.get('prev_revision') and not re.get('redirect_ident'): + new_release_ids.append(re['ident']) file_edits = cle['editgroup']['edits']['files'] for e in file_edits: file_ids.append(e['ident']) @@ -214,7 +220,17 @@ class EntityUpdatesWorker(FatcatWorker): key=ident.encode('utf-8'), on_delivery=fail_fast, ) + # filter to "new" active releases with no matched files + if release.ident in new_release_ids: + ir = release_ingest_request(release, project='fatcat-changelog', oa_only=self.ingest_oa_only) + if ir and ir['ingest_type'] == 'file' and not release.files: + producer.produce( + self.ingest_file_request_topic, + json.dumps(ir).encode('utf-8'), + #key=None, + on_delivery=fail_fast, + ) producer.flush() - # TODO: actually update works + # TODO: publish updated 'work' entities to a topic consumer.store_offsets(message=msg) |