diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2019-11-12 21:14:32 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-11-15 16:46:26 -0800 |
commit | 9d2bde08e9f43942b76ca6d0f6c8845c33c02515 (patch) | |
tree | 38f54a2ebe210b97289a6ed6463ef07be8122a9b /python | |
parent | 06cf64414d4c7fa497c9ddb83b7c066b3779c4d2 (diff) | |
download | fatcat-9d2bde08e9f43942b76ca6d0f6c8845c33c02515.tar.gz fatcat-9d2bde08e9f43942b76ca6d0f6c8845c33c02515.zip |
add ingest request feature to entity_updates worker
Initially was going to create a new worker to consume from the release
update channel, but couldn't get the edit context ("is this a new
release, or update to an existing") from that context.
Currently there is a flag in source code to control whether we only do
OA releases or all releases. Starting with OA only to start slow, but
should probably default to all, and make this a config flag. Should
probably also have a config flag to control this entire feature.
Tested locally in dev.
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 24 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 2 |
2 files changed, 22 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 8b1ba5e9..fe5c55be 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -3,6 +3,7 @@ import json import time from confluent_kafka import Consumer, Producer, KafkaException +from fatcat_tools.transforms import release_ingest_request from .worker_common import FatcatWorker, most_recent_message @@ -74,19 +75,20 @@ class EntityUpdatesWorker(FatcatWorker): """ Consumes from the changelog topic and publishes expanded entities (fetched from API) to update topics. - - For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, + file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic self.file_topic = file_topic self.container_topic = container_topic + self.ingest_file_request_topic = ingest_file_request_topic self.poll_interval = poll_interval self.consumer_group = "entity-updates" + self.ingest_oa_only = True def run(self): @@ -165,12 +167,16 @@ class EntityUpdatesWorker(FatcatWorker): #print(cle) print("processing changelog index {}".format(cle['index'])) release_ids = [] + new_release_ids = [] file_ids = [] container_ids = [] work_ids = [] release_edits = cle['editgroup']['edits']['releases'] for re in release_edits: release_ids.append(re['ident']) + # filter to direct release edits which are not updates + if not re.get('prev_revision') and not re.get('redirect_ident'): + new_release_ids.append(re['ident']) file_edits = cle['editgroup']['edits']['files'] for e in file_edits: file_ids.append(e['ident']) @@ -214,7 +220,17 @@ class EntityUpdatesWorker(FatcatWorker): key=ident.encode('utf-8'), on_delivery=fail_fast, ) + # filter to "new" active releases with no matched files + if release.ident in new_release_ids: + ir = release_ingest_request(release, project='fatcat-changelog', oa_only=self.ingest_oa_only) + if ir and ir['ingest_type'] == 'file' and not release.files: + producer.produce( + self.ingest_file_request_topic, + json.dumps(ir).encode('utf-8'), + #key=None, + on_delivery=fail_fast, + ) producer.flush() - # TODO: actually update works + # TODO: publish updated 'work' entities to a topic consumer.store_offsets(message=msg) diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 628312be..c2120bae 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -23,11 +23,13 @@ def run_entity_updates(args): release_topic = "fatcat-{}.release-updates-v03".format(args.env) file_topic = "fatcat-{}.file-updates".format(args.env) container_topic = "fatcat-{}.container-updates".format(args.env) + ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic, release_topic=release_topic, file_topic=file_topic, container_topic=container_topic, + ingest_file_request_topic=ingest_file_request_topic, ) worker.run() |