diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2020-01-28 13:34:54 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2020-01-28 13:34:56 -0800 |
commit | 943409c2283faa9a6d04ccc6e43886224170e4f2 (patch) | |
tree | 8e4172e9b648be8707b6d5355f95615fc2cb24e0 /python | |
parent | fbc3cfc2594d90f8a39ee7f6ad2dfd323bcd76dd (diff) | |
download | fatcat-943409c2283faa9a6d04ccc6e43886224170e4f2.tar.gz fatcat-943409c2283faa9a6d04ccc6e43886224170e4f2.zip |
apply ingest request filtering in entity worker
`ingest_oa_only` behavior, and other filters, are now handled in the entity
update worker instead of in the transform function.
Also add a DOI prefix blocklist feature.
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 37 |
1 files changed, 34 insertions, 3 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 863ad40a..353eca8f 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -3,7 +3,8 @@ import json import time from confluent_kafka import Consumer, Producer, KafkaException -from fatcat_tools.transforms import release_ingest_request +from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch + from .worker_common import FatcatWorker, most_recent_message @@ -89,6 +90,36 @@ class EntityUpdatesWorker(FatcatWorker): self.poll_interval = poll_interval self.consumer_group = "entity-updates" self.ingest_oa_only = True + self.ingest_pdf_doi_prefix_blocklist = [ + # gbif.org: many DOIs, not PDF fulltext + "10.15468/", + ] + + def want_live_ingest(self, release, ingest_request): + """ + This function looks at ingest requests and decides whether they are + worth enqueing for ingest. + + In theory crawling all DOIs to a landing page is valuable. It is + intended to be an operational point of control to reduce load on daily + ingest crawling (via wayback SPN). 
+ """ + + link_source = ingest_request.get('ingest_request') + ingest_type = ingest_request.get('ingest_type') + + if self.ingest_oa_only and link_source not in ('arxiv', 'pmc'): + es = release_to_elasticsearch(release) + if not es['is_oa']: + return False + + doi = ingest_request.get('ext_ids', {}).get('doi') + if ingest_type == "pdf" and doi: + for prefix in self.ingest_pdf_doi_prefix_blocklist: + if doi.startswith(prefix): + return False + + return True def run(self): @@ -222,8 +253,8 @@ class EntityUpdatesWorker(FatcatWorker): ) # filter to "new" active releases with no matched files if release.ident in new_release_ids: - ir = release_ingest_request(release, ingest_request_source='fatcat-changelog', oa_only=self.ingest_oa_only) - if ir and not release.files: + ir = release_ingest_request(release, ingest_request_source='fatcat-changelog') + if ir and not release.files and self.want_live_ingest(release, ir): producer.produce( self.ingest_file_request_topic, json.dumps(ir).encode('utf-8'), |