From 9d2bde08e9f43942b76ca6d0f6c8845c33c02515 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 12 Nov 2019 21:14:32 -0800 Subject: add ingest request feature to entity_updates worker Initially was going to create a new worker to consume from the release update channel, but couldn't get the edit context ("is this a new release, or update to an existing") from that context. Currently there is a flag in source code to control whether we only do OA releases or all releases. Starting with OA only to start slow, but should probably default to all, and make this a config flag. Should probably also have a config flag to control this entire feature. Tested locally in dev. --- python/fatcat_tools/workers/changelog.py | 24 ++++++++++++++++++++---- python/fatcat_worker.py | 2 ++ 2 files changed, 22 insertions(+), 4 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 8b1ba5e9..fe5c55be 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -3,6 +3,7 @@ import json import time from confluent_kafka import Consumer, Producer, KafkaException +from fatcat_tools.transforms import release_ingest_request from .worker_common import FatcatWorker, most_recent_message @@ -74,19 +75,20 @@ class EntityUpdatesWorker(FatcatWorker): """ Consumes from the changelog topic and publishes expanded entities (fetched from API) to update topics. - - For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, + file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic self.file_topic = file_topic self.container_topic = container_topic + self.ingest_file_request_topic = ingest_file_request_topic self.poll_interval = poll_interval self.consumer_group = "entity-updates" + self.ingest_oa_only = True def run(self): @@ -165,12 +167,16 @@ class EntityUpdatesWorker(FatcatWorker): #print(cle) print("processing changelog index {}".format(cle['index'])) release_ids = [] + new_release_ids = [] file_ids = [] container_ids = [] work_ids = [] release_edits = cle['editgroup']['edits']['releases'] for re in release_edits: release_ids.append(re['ident']) + # filter to direct release edits which are not updates + if not re.get('prev_revision') and not re.get('redirect_ident'): + new_release_ids.append(re['ident']) file_edits = cle['editgroup']['edits']['files'] for e in file_edits: file_ids.append(e['ident']) @@ -214,7 +220,17 @@ class EntityUpdatesWorker(FatcatWorker): key=ident.encode('utf-8'), on_delivery=fail_fast, ) + # filter to "new" active releases with no matched files + if release.ident in new_release_ids: + ir = release_ingest_request(release, project='fatcat-changelog', oa_only=self.ingest_oa_only) + if ir and ir['ingest_type'] == 'file' and not release.files: + producer.produce( + self.ingest_file_request_topic, + json.dumps(ir).encode('utf-8'), + #key=None, + on_delivery=fail_fast, + ) producer.flush() - # TODO: actually update works + # TODO: publish updated 'work' entities to a topic consumer.store_offsets(message=msg) diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 628312be..c2120bae 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -23,11 +23,13 @@ def run_entity_updates(args): release_topic = "fatcat-{}.release-updates-v03".format(args.env) file_topic = "fatcat-{}.file-updates".format(args.env) container_topic = "fatcat-{}.container-updates".format(args.env) + ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic, release_topic=release_topic, file_topic=file_topic, container_topic=container_topic, + ingest_file_request_topic=ingest_file_request_topic, ) worker.run() -- cgit v1.2.3