summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbnewbold <bnewbold@archive.org>2020-10-20 23:32:26 +0000
committerbnewbold <bnewbold@archive.org>2020-10-20 23:32:26 +0000
commitb6b30bf5468d01a878e3c2ecac933f29b5b6ddc5 (patch)
treeaede638925dabfac40bbd1c77607e4b31e3a234c
parenta7765fb8b6011c224dc1e4d7b0e30159bae7a903 (diff)
parent99c54764f0cbbb05409001b4af83182842f4e52d (diff)
downloadfatcat-b6b30bf5468d01a878e3c2ecac933f29b5b6ddc5.tar.gz
fatcat-b6b30bf5468d01a878e3c2ecac933f29b5b6ddc5.zip
Merge branch 'bnewbold-scholar-pipeline' into 'master'
entity updater: new work update feed (ident and changelog metadata only) See merge request webgroup/fatcat!87
-rw-r--r--python/fatcat_tools/workers/changelog.py26
-rwxr-xr-xpython/fatcat_worker.py2
2 files changed, 26 insertions, 2 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 13e46e02..d9f0cf91 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -79,7 +79,8 @@ class EntityUpdatesWorker(FatcatWorker):
"""
def __init__(self, api, kafka_hosts, consume_topic, release_topic,
- file_topic, container_topic, ingest_file_request_topic, poll_interval=5.0):
+ file_topic, container_topic, ingest_file_request_topic,
+ work_ident_topic, poll_interval=5.0):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
api=api)
@@ -87,6 +88,7 @@ class EntityUpdatesWorker(FatcatWorker):
self.file_topic = file_topic
self.container_topic = container_topic
self.ingest_file_request_topic = ingest_file_request_topic
+ self.work_ident_topic = work_ident_topic
self.poll_interval = poll_interval
self.consumer_group = "entity-updates"
self.ingest_oa_only = False
@@ -365,7 +367,8 @@ class EntityUpdatesWorker(FatcatWorker):
)
for ident in set(release_ids):
release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
- work_ids.append(release.work_id)
+ if release.work_id:
+ work_ids.append(release.work_id)
release_dict = self.api.api_client.sanitize_for_serialization(release)
producer.produce(
self.release_topic,
@@ -383,6 +386,25 @@ class EntityUpdatesWorker(FatcatWorker):
#key=None,
on_delivery=fail_fast,
)
+
+ # send work updates (just ident and changelog metadata) to scholar for re-indexing
+ for ident in set(work_ids):
+ assert ident
+ key = f"work_{ident}"
+ work_ident_dict = dict(
+ key=key,
+ type="fatcat_work",
+ work_ident=ident,
+ updated=cle['timestamp'],
+ fatcat_changelog_index=cle['index'],
+ )
+ producer.produce(
+ self.work_ident_topic,
+ json.dumps(work_ident_dict).encode('utf-8'),
+ key=key.encode('utf-8'),
+ on_delivery=fail_fast,
+ )
+
producer.flush()
# TODO: publish updated 'work' entities to a topic
consumer.store_offsets(message=msg)
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 19ac16cd..5e3fb3e9 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -22,12 +22,14 @@ def run_entity_updates(args):
release_topic = "fatcat-{}.release-updates-v03".format(args.env)
file_topic = "fatcat-{}.file-updates".format(args.env)
container_topic = "fatcat-{}.container-updates".format(args.env)
+ work_ident_topic = "fatcat-{}.work-ident-updates".format(args.env)
ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
worker = EntityUpdatesWorker(args.api, args.kafka_hosts,
changelog_topic,
release_topic=release_topic,
file_topic=file_topic,
container_topic=container_topic,
+ work_ident_topic=work_ident_topic,
ingest_file_request_topic=ingest_file_request_topic,
)
worker.run()