diff options
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 24 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 2 |
2 files changed, 22 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 8b1ba5e9..fe5c55be 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -3,6 +3,7 @@ import json import time from confluent_kafka import Consumer, Producer, KafkaException +from fatcat_tools.transforms import release_ingest_request from .worker_common import FatcatWorker, most_recent_message @@ -74,19 +75,20 @@ class EntityUpdatesWorker(FatcatWorker): """ Consumes from the changelog topic and publishes expanded entities (fetched from API) to update topics. - - For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, + file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic self.file_topic = file_topic self.container_topic = container_topic + self.ingest_file_request_topic = ingest_file_request_topic self.poll_interval = poll_interval self.consumer_group = "entity-updates" + self.ingest_oa_only = True def run(self): @@ -165,12 +167,16 @@ class EntityUpdatesWorker(FatcatWorker): #print(cle) print("processing changelog index {}".format(cle['index'])) release_ids = [] + new_release_ids = [] file_ids = [] container_ids = [] work_ids = [] release_edits = cle['editgroup']['edits']['releases'] for re in release_edits: release_ids.append(re['ident']) + # filter to direct release edits which are not updates + if not re.get('prev_revision') and not re.get('redirect_ident'): + new_release_ids.append(re['ident']) file_edits = cle['editgroup']['edits']['files'] for e in file_edits: file_ids.append(e['ident']) @@ -214,7 +220,17 @@ class EntityUpdatesWorker(FatcatWorker): key=ident.encode('utf-8'), on_delivery=fail_fast, ) + # filter to "new" active releases with no matched files + if release.ident in new_release_ids: + ir = release_ingest_request(release, project='fatcat-changelog', oa_only=self.ingest_oa_only) + if ir and ir['ingest_type'] == 'file' and not release.files: + producer.produce( + self.ingest_file_request_topic, + json.dumps(ir).encode('utf-8'), + #key=None, + on_delivery=fail_fast, + ) producer.flush() - # TODO: actually update works + # TODO: publish updated 'work' entities to a topic consumer.store_offsets(message=msg) diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 628312be..c2120bae 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -23,11 +23,13 @@ def run_entity_updates(args): release_topic = "fatcat-{}.release-updates-v03".format(args.env) file_topic = "fatcat-{}.file-updates".format(args.env) container_topic = "fatcat-{}.container-updates".format(args.env) + ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic, release_topic=release_topic, file_topic=file_topic, container_topic=container_topic, + ingest_file_request_topic=ingest_file_request_topic, ) worker.run() |