aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-11-12 21:14:32 -0800
committerBryan Newbold <bnewbold@robocracy.org>2019-11-15 16:46:26 -0800
commit9d2bde08e9f43942b76ca6d0f6c8845c33c02515 (patch)
tree38f54a2ebe210b97289a6ed6463ef07be8122a9b /python
parent06cf64414d4c7fa497c9ddb83b7c066b3779c4d2 (diff)
downloadfatcat-9d2bde08e9f43942b76ca6d0f6c8845c33c02515.tar.gz
fatcat-9d2bde08e9f43942b76ca6d0f6c8845c33c02515.zip
add ingest request feature to entity_updates worker
Initially was going to create a new worker to consume from the release update channel, but couldn't get the edit context ("is this a new release, or update to an existing") from that context. Currently there is a flag in source code to control whether we only do OA releases or all releases. Starting with OA only to start slow, but should probably default to all, and make this a config flag. Should probably also have a config flag to control this entire feature. Tested locally in dev.
Diffstat (limited to 'python')
-rw-r--r--python/fatcat_tools/workers/changelog.py24
-rwxr-xr-xpython/fatcat_worker.py2
2 files changed, 22 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 8b1ba5e9..fe5c55be 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -3,6 +3,7 @@ import json
import time
from confluent_kafka import Consumer, Producer, KafkaException
+from fatcat_tools.transforms import release_ingest_request
from .worker_common import FatcatWorker, most_recent_message
@@ -74,19 +75,20 @@ class EntityUpdatesWorker(FatcatWorker):
"""
Consumes from the changelog topic and publishes expanded entities (fetched
from API) to update topics.
-
- For now, only release updates are published.
"""
- def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0):
+ def __init__(self, api, kafka_hosts, consume_topic, release_topic,
+ file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
api=api)
self.release_topic = release_topic
self.file_topic = file_topic
self.container_topic = container_topic
+ self.ingest_file_request_topic = ingest_file_request_topic
self.poll_interval = poll_interval
self.consumer_group = "entity-updates"
+ self.ingest_oa_only = True
def run(self):
@@ -165,12 +167,16 @@ class EntityUpdatesWorker(FatcatWorker):
#print(cle)
print("processing changelog index {}".format(cle['index']))
release_ids = []
+ new_release_ids = []
file_ids = []
container_ids = []
work_ids = []
release_edits = cle['editgroup']['edits']['releases']
for re in release_edits:
release_ids.append(re['ident'])
+ # filter to direct release edits which are not updates
+ if not re.get('prev_revision') and not re.get('redirect_ident'):
+ new_release_ids.append(re['ident'])
file_edits = cle['editgroup']['edits']['files']
for e in file_edits:
file_ids.append(e['ident'])
@@ -214,7 +220,17 @@ class EntityUpdatesWorker(FatcatWorker):
key=ident.encode('utf-8'),
on_delivery=fail_fast,
)
+ # filter to "new" active releases with no matched files
+ if release.ident in new_release_ids:
+ ir = release_ingest_request(release, project='fatcat-changelog', oa_only=self.ingest_oa_only)
+ if ir and ir['ingest_type'] == 'file' and not release.files:
+ producer.produce(
+ self.ingest_file_request_topic,
+ json.dumps(ir).encode('utf-8'),
+ #key=None,
+ on_delivery=fail_fast,
+ )
producer.flush()
- # TODO: actually update works
+ # TODO: publish updated 'work' entities to a topic
consumer.store_offsets(message=msg)
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 628312be..c2120bae 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -23,11 +23,13 @@ def run_entity_updates(args):
release_topic = "fatcat-{}.release-updates-v03".format(args.env)
file_topic = "fatcat-{}.file-updates".format(args.env)
container_topic = "fatcat-{}.container-updates".format(args.env)
+ ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
worker = EntityUpdatesWorker(args.api, args.kafka_hosts,
changelog_topic,
release_topic=release_topic,
file_topic=file_topic,
container_topic=container_topic,
+ ingest_file_request_topic=ingest_file_request_topic,
)
worker.run()