summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/changelog.py24
1 files changed, 20 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 8b1ba5e9..fe5c55be 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -3,6 +3,7 @@ import json
import time
from confluent_kafka import Consumer, Producer, KafkaException
+from fatcat_tools.transforms import release_ingest_request
from .worker_common import FatcatWorker, most_recent_message
@@ -74,19 +75,20 @@ class EntityUpdatesWorker(FatcatWorker):
"""
Consumes from the changelog topic and publishes expanded entities (fetched
from API) to update topics.
-
- For now, only release updates are published.
"""
- def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0):
+ def __init__(self, api, kafka_hosts, consume_topic, release_topic,
+ file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
api=api)
self.release_topic = release_topic
self.file_topic = file_topic
self.container_topic = container_topic
+ self.ingest_file_request_topic = ingest_file_request_topic
self.poll_interval = poll_interval
self.consumer_group = "entity-updates"
+ self.ingest_oa_only = True
def run(self):
@@ -165,12 +167,16 @@ class EntityUpdatesWorker(FatcatWorker):
#print(cle)
print("processing changelog index {}".format(cle['index']))
release_ids = []
+ new_release_ids = []
file_ids = []
container_ids = []
work_ids = []
release_edits = cle['editgroup']['edits']['releases']
for re in release_edits:
release_ids.append(re['ident'])
+ # filter to direct release edits which are not updates
+ if not re.get('prev_revision') and not re.get('redirect_ident'):
+ new_release_ids.append(re['ident'])
file_edits = cle['editgroup']['edits']['files']
for e in file_edits:
file_ids.append(e['ident'])
@@ -214,7 +220,17 @@ class EntityUpdatesWorker(FatcatWorker):
key=ident.encode('utf-8'),
on_delivery=fail_fast,
)
+ # filter to "new" active releases with no matched files
+ if release.ident in new_release_ids:
+ ir = release_ingest_request(release, project='fatcat-changelog', oa_only=self.ingest_oa_only)
+ if ir and ir['ingest_type'] == 'file' and not release.files:
+ producer.produce(
+ self.ingest_file_request_topic,
+ json.dumps(ir).encode('utf-8'),
+ #key=None,
+ on_delivery=fail_fast,
+ )
producer.flush()
- # TODO: actually update works
+ # TODO: publish updated 'work' entities to a topic
consumer.store_offsets(message=msg)