From 943409c2283faa9a6d04ccc6e43886224170e4f2 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 28 Jan 2020 13:34:54 -0800
Subject: apply ingest request filtering in entity worker

`ingest_oa_only` behavior, and other filters, now handled in the entity
update worker, instead of in the transform function.

Also add a DOI prefix blocklist feature.
---
 python/fatcat_tools/workers/changelog.py | 37 +++++++++++++++++++++++++++++---
 1 file changed, 34 insertions(+), 3 deletions(-)

(limited to 'python')

diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 863ad40a..353eca8f 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -3,7 +3,8 @@ import json
 import time
 from confluent_kafka import Consumer, Producer, KafkaException
 
-from fatcat_tools.transforms import release_ingest_request
+from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch
+
 from .worker_common import FatcatWorker, most_recent_message
 
 
@@ -89,6 +90,36 @@ class EntityUpdatesWorker(FatcatWorker):
         self.poll_interval = poll_interval
         self.consumer_group = "entity-updates"
         self.ingest_oa_only = True
+        self.ingest_pdf_doi_prefix_blocklist = [
+            # gbif.org: many DOIs, not PDF fulltext
+            "10.15468/",
+        ]
+
+    def want_live_ingest(self, release, ingest_request):
+        """
+        Decide whether an ingest request is worth enqueuing for live ingest;
+        returns True to enqueue the request, False to skip it.
+
+        In theory crawling all DOIs to a landing page is valuable; this filter
+        is intended to be an operational point of control to reduce load on
+        daily ingest crawling (via wayback SPN).
+        """
+
+        link_source = ingest_request.get('link_source')
+        ingest_type = ingest_request.get('ingest_type')
+
+        if self.ingest_oa_only and link_source not in ('arxiv', 'pmc'):
+            es = release_to_elasticsearch(release)
+            if not es['is_oa']:
+                return False
+
+        doi = (ingest_request.get('ext_ids') or {}).get('doi')
+        if ingest_type == "pdf" and doi:
+            for prefix in self.ingest_pdf_doi_prefix_blocklist:
+                if doi.startswith(prefix):
+                    return False
+
+        return True
 
     def run(self):
 
@@ -222,8 +253,8 @@ class EntityUpdatesWorker(FatcatWorker):
                 )
                 # filter to "new" active releases with no matched files
                 if release.ident in new_release_ids:
-                    ir = release_ingest_request(release, ingest_request_source='fatcat-changelog', oa_only=self.ingest_oa_only)
-                    if ir and not release.files:
+                    ir = release_ingest_request(release, ingest_request_source='fatcat-changelog')
+                    if ir and not release.files and self.want_live_ingest(release, ir):
                         producer.produce(
                             self.ingest_file_request_topic,
                             json.dumps(ir).encode('utf-8'),
-- 
cgit v1.2.3