Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/importers/__init__.py   |  2
-rw-r--r--  python/fatcat_tools/importers/arabesque.py   |  3
-rw-r--r--  python/fatcat_tools/importers/common.py      | 45
-rw-r--r--  python/fatcat_tools/importers/ingest.py      | 85
-rw-r--r--  python/fatcat_tools/importers/matched.py     |  3
-rw-r--r--  python/fatcat_tools/transforms/ingest.py     | 45
-rw-r--r--  python/fatcat_tools/workers/changelog.py     |  2
7 files changed, 147 insertions(+), 38 deletions(-)
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 025a111c..bb9c5b17 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -26,4 +26,4 @@ from .orcid import OrcidImporter
 from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
 from .wayback_static import auto_wayback_static
 from .cdl_dash_dat import auto_cdl_dash_dat
-from .ingest import IngestFileResultImporter
+from .ingest import IngestFileResultImporter, SavePaperNowFileImporter
diff --git a/python/fatcat_tools/importers/arabesque.py b/python/fatcat_tools/importers/arabesque.py
index 7017c56c..acfc2b87 100644
--- a/python/fatcat_tools/importers/arabesque.py
+++ b/python/fatcat_tools/importers/arabesque.py
@@ -42,8 +42,7 @@ class ArabesqueMatchImporter(EntityImporter):
 
     def __init__(self, api, extid_type, require_grobid=True, **kwargs):
 
-        eg_desc = kwargs.get('editgroup_description',
-            "Match web crawl files to releases based on identifier/URL seedlist")
+        eg_desc = kwargs.get('editgroup_description', None) or "Match web crawl files to releases based on identifier/URL seedlist"
         eg_extra = kwargs.get('editgroup_extra', dict())
         eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ArabesqueMatchImporter')
         if kwargs.get('crawl_id'):
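A note on the kwargs.get('editgroup_description', None) or ... pattern introduced above (and repeated in the matched.py and ingest.py importers below): unlike passing the default as the second argument to .get(), it also falls back to the default when the caller explicitly passes None or an empty string. A minimal illustration, with made-up values:

    def describe(**kwargs):
        # plain .get() default: used only when the key is missing entirely
        a = kwargs.get('editgroup_description', "default description")
        # .get(..., None) or ...: also used when the key is present but None or empty
        b = kwargs.get('editgroup_description', None) or "default description"
        return (a, b)

    print(describe())                            # ('default description', 'default description')
    print(describe(editgroup_description=None))  # (None, 'default description')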
+ """ if self._edit_count > 0: - self.api.accept_editgroup(self._editgroup_id) + if self.submit_mode: + self.api.submit_editgroup(self._editgroup_id) + else: + self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None self._edit_count = 0 self._edits_inflight = [] @@ -345,7 +364,10 @@ class EntityImporter: def get_editgroup_id(self, edits=1): if self._edit_count >= self.edit_batch_size: - self.api.accept_editgroup(self._editgroup_id) + if self.submit_mode: + self.api.submit_editgroup(self._editgroup_id) + else: + self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None self._edit_count = 0 self._edits_inflight = [] @@ -715,22 +737,28 @@ class KafkaJsonPusher(RecordPusher): def run(self): count = 0 + last_push = datetime.datetime.now() while True: - # TODO: this is batch-oriented, because underlying importer is + # Note: this is batch-oriented, because underlying importer is # often batch-oriented, but this doesn't confirm that entire batch # has been pushed to fatcat before commiting offset. Eg, consider # case where there there is one update and thousands of creates; # update would be lingering in importer, and if importer crashed - # never created. Not great. + # never created. + # This is partially mitigated for the worker case by flushing any + # outstanding editgroups every 5 minutes, but there is still that + # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( len(batch), self.poll_interval)) if not batch: - # TODO: could have some larger timeout here and - # self.importer.finish() if it's been more than, eg, a couple - # minutes + if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): + # it has been some time, so flush any current editgroup + self.importer.finish() + last_push = datetime.datetime.now() + #print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... 
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index deb4ef51..c47f0aa7 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -11,8 +11,7 @@ class IngestFileResultImporter(EntityImporter):
 
     def __init__(self, api, require_grobid=True, **kwargs):
 
-        eg_desc = kwargs.pop('editgroup_description',
-            "Files crawled from web using sandcrawler ingest tool")
+        eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled from web using sandcrawler ingest tool"
         eg_extra = kwargs.pop('editgroup_extra', dict())
         eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileResultImporter')
         super().__init__(api,
@@ -21,7 +20,6 @@ class IngestFileResultImporter(EntityImporter):
             **kwargs)
         self.default_link_rel = kwargs.get("default_link_rel", "web")
         assert self.default_link_rel
-        self.default_mimetype = kwargs.get("default_mimetype", None)
         self.do_updates = kwargs.get("do_updates", False)
         self.require_grobid = require_grobid
         if self.require_grobid:
@@ -53,9 +51,14 @@ class IngestFileResultImporter(EntityImporter):
         if row.get('hit') != True:
             self.counts['skip-hit'] += 1
             return False
-        if self.ingest_request_source_whitelist and row['request'].get('ingest_request_source') not in self.ingest_request_source_whitelist:
+        source = row['request'].get('ingest_request_source')
+        if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist:
             self.counts['skip-ingest_request_source'] += 1
             return False
+        if source.startswith('savepapernow'):
+            # never process async savepapernow requests
+            self.counts['skip-savepapernow'] += 1
+            return False
         if not row.get('file_meta'):
             self.counts['skip-file-meta'] += 1
             return False
@@ -123,16 +126,21 @@ class IngestFileResultImporter(EntityImporter):
             sha1=file_meta['sha1hex'],
             sha256=file_meta['sha256hex'],
             size=file_meta['size_bytes'],
-            mimetype=file_meta['mimetype'] or self.default_mimetype,
+            mimetype=file_meta['mimetype'],
             release_ids=[release_ident],
             urls=urls,
         )
         if fatcat and fatcat.get('edit_extra'):
             fe.edit_extra = fatcat['edit_extra']
+        else:
+            fe.edit_extra = dict()
         if request.get('ingest_request_source'):
-            if not fe.edit_extra:
-                fe.edit_extra = dict()
             fe.edit_extra['ingest_request_source'] = request['ingest_request_source']
+        if request.get('link_source') and request.get('link_source_id'):
+            fe.edit_extra['link_source'] = request['link_source']
+            fe.edit_extra['link_source_id'] = request['link_source_id']
+        if not fe.edit_extra:
+            fe.edit_extra = None
         return fe
 
     def try_update(self, fe):
@@ -152,6 +160,12 @@ class IngestFileResultImporter(EntityImporter):
             self.counts['exists'] += 1
             return False
 
+        # check for existing edits-in-progress with same file hash
+        for other in self._entity_queue:
+            if other.sha1 == fe.sha1:
+                self.counts['skip-in-queue'] += 1
+                return False
+
         if not self.do_updates:
             self.counts['skip-update-disabled'] += 1
             return False
@@ -167,3 +181,60 @@ class IngestFileResultImporter(EntityImporter):
                 extra=self.editgroup_extra),
             entity_list=batch))
 
+
+class SavePaperNowFileImporter(IngestFileResultImporter):
+    """
+    This worker ingests from the same feed as IngestFileResultImporter, but
+    only imports files from anonymous save-paper-now requests, and "submits"
+    them for further human review (as opposed to accepting by default).
+    """
+
+    def __init__(self, api, submit_mode=True, **kwargs):
+
+        eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled after a public 'Save Paper Now' request"
+        eg_extra = kwargs.pop('editgroup_extra', dict())
+        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow')
+        kwargs['submit_mode'] = submit_mode
+        kwargs['require_grobid'] = True
+        kwargs['do_updates'] = False
+        super().__init__(api,
+            editgroup_description=eg_desc,
+            editgroup_extra=eg_extra,
+            **kwargs)
+
+    def want(self, row):
+
+        source = row['request'].get('ingest_request_source')
+        if not source.startswith('savepapernow'):
+            self.counts['skip-not-savepapernow'] += 1
+            return False
+        if row.get('hit') != True:
+            self.counts['skip-hit'] += 1
+            return False
+        if not row.get('file_meta'):
+            self.counts['skip-file-meta'] += 1
+            return False
+        if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
+            self.counts['skip-grobid'] += 1
+            return False
+
+        return True
+
+    def insert_batch(self, batch):
+        """
+        Usually running in submit_mode, so we can't use auto_batch method
+        """
+        if self.submit_mode:
+            eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
+                description=self.editgroup_description,
+                extra=self.editgroup_extra))
+            for fe in batch:
+                self.api.create_file(eg.editgroup_id, fe)
+            self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
+        else:
+            self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
+                editgroup=fatcat_openapi_client.Editgroup(
+                    description=self.editgroup_description,
+                    extra=self.editgroup_extra),
+                entity_list=batch))
+
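A rough usage sketch for the new SavePaperNowFileImporter follows. The input file name, and the use of the authenticated_api and JsonLinePusher helpers here, are assumptions for illustration rather than the actual entrypoint wiring (which lives elsewhere, e.g. in fatcat_import.py):

    from fatcat_tools import authenticated_api              # assumed helper for an authenticated client
    from fatcat_tools.importers import SavePaperNowFileImporter, JsonLinePusher

    api = authenticated_api("https://api.fatcat.wiki/v0")   # hypothetical endpoint
    spn = SavePaperNowFileImporter(api, edit_batch_size=1)

    # each line is one sandcrawler ingest result (hypothetical file name);
    # run() pushes every record and flushes at the end, so with submit_mode
    # the resulting editgroups are submitted for review rather than accepted
    JsonLinePusher(spn, open("ingest_results.json")).run()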
diff --git a/python/fatcat_tools/importers/matched.py b/python/fatcat_tools/importers/matched.py
index dbb78ec9..180d7ba3 100644
--- a/python/fatcat_tools/importers/matched.py
+++ b/python/fatcat_tools/importers/matched.py
@@ -35,8 +35,7 @@ class MatchedImporter(EntityImporter):
 
     def __init__(self, api, **kwargs):
 
-        eg_desc = kwargs.pop('editgroup_description',
-            "Import of large-scale file-to-release match results. Source of metadata varies.")
+        eg_desc = kwargs.pop('editgroup_description', None) or "Import of large-scale file-to-release match results. Source of metadata varies."
         eg_extra = kwargs.pop('editgroup_extra', dict())
         eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.MatchedImporter')
         super().__init__(api,
diff --git a/python/fatcat_tools/transforms/ingest.py b/python/fatcat_tools/transforms/ingest.py
index c2ae6e0f..e08d56b8 100644
--- a/python/fatcat_tools/transforms/ingest.py
+++ b/python/fatcat_tools/transforms/ingest.py
@@ -1,7 +1,7 @@
 
 from .elasticsearch import release_to_elasticsearch
 
-def release_ingest_request(release, oa_only=False, ingest_request_source='fatcat'):
+def release_ingest_request(release, oa_only=False, ingest_request_source='fatcat', ingest_type=None):
     """
     Takes a full release entity object and returns an ingest request (as
     dict), or None if it seems like this release shouldn't be ingested.
@@ -12,7 +12,7 @@ def release_ingest_request(release, oa_only=False, ingest_request_source='fatcat
     The 'oa_only' boolean flag indicates that we should only return an ingest
     request if we have reason to believe this is an OA release (or, eg, in
     arxiv or pubmed central). Respecting this flag means we are likely to miss
-    a lot of "hybrid" and "bronze" content, but could reduce load
+    a lot of "hybrid" and "bronze" content, but could reduce crawl load
     significantly.
 
     The type of the ingest request may depend on release type and container
@@ -25,42 +25,53 @@ def release_ingest_request(release, oa_only=False, ingest_request_source='fatcat
 
     # generate a URL where we expect to find fulltext
     url = None
-    expect_mimetypes = []
+    link_source = None
+    link_source_id = None
     if release.ext_ids.arxiv:
         url = "https://arxiv.org/pdf/{}.pdf".format(release.ext_ids.arxiv)
-        expect_mimetypes = ['application/pdf']
-    elif release.ext_ids.pmcid:
-        #url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid)
-        url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format(release.ext_ids.pmcid)
-        expect_mimetypes = ['application/pdf']
+        link_source = "arxiv"
+        link_source_id = release.ext_ids.arxiv
     elif release.ext_ids.doi:
         url = "https://doi.org/{}".format(release.ext_ids.doi)
+        link_source = "doi"
+        link_source_id = release.ext_ids.doi
+    elif release.ext_ids.pmcid and release.ext_ids.pmid:
+        # TODO: how to tell if an author manuscript in PMC vs. published?
+        #url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid)
+        url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format(release.ext_ids.pmcid)
+        link_source = "pubmed"
+        link_source_id = release.ext_ids.pmid
 
     if not url:
         return None
 
-    ext_ids = dict()
-    for k in ('doi', 'pmid', 'pmcid', 'arxiv'):
-        v = getattr(release.ext_ids, k)
-        if v:
-            ext_ids[k] = v
+    ext_ids = release.ext_ids.to_dict()
+    ext_ids = dict([(k, v) for (k, v) in ext_ids.items() if v])
 
-    if oa_only and not ext_ids.get('arxiv') and not ext_ids.get('pmcid'):
+    if oa_only and link_source not in ('arxiv', 'pubmed'):
         es = release_to_elasticsearch(release)
         if not es['is_oa']:
             return None
 
+    # TODO: infer ingest type based on release_type or container metadata?
+    if not ingest_type:
+        ingest_type = 'pdf'
+
     ingest_request = {
-        'ingest_type': 'file',
+        'ingest_type': ingest_type,
         'ingest_request_source': ingest_request_source,
         'base_url': url,
+        'release_stage': release.release_stage,
         'fatcat': {
-            'release_stage': release.release_stage,
             'release_ident': release.ident,
             'work_ident': release.work_id,
         },
         'ext_ids': ext_ids,
-        'expect_mimetypes': expect_mimetypes or None,
     }
+
+    if link_source and link_source_id:
+        ingest_request['link_source'] = link_source
+        ingest_request['link_source_id'] = link_source_id
+
     return ingest_request
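For reference, the revised transform would emit an ingest request shaped roughly like the following for a DOI-only published release (all field values here are illustrative placeholders, not real identifiers):

    {
        "ingest_type": "pdf",
        "ingest_request_source": "fatcat-changelog",
        "base_url": "https://doi.org/10.1234/example.5678",
        "release_stage": "published",
        "fatcat": {
            "release_ident": "aaaaaaaaaaaaarceaaaaaaaaai",
            "work_ident": "aaaaaaaaaaaaavkvaaaaaaaaai",
        },
        "ext_ids": {"doi": "10.1234/example.5678"},
        "link_source": "doi",
        "link_source_id": "10.1234/example.5678",
    }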
Respecting this flag means we are likely to miss - a lot of "hybrid" and "bronze" content, but could reduce load + a lot of "hybrid" and "bronze" content, but could reduce crawl load significantly. The type of the ingest request may depend on release type and container @@ -25,42 +25,53 @@ def release_ingest_request(release, oa_only=False, ingest_request_source='fatcat # generate a URL where we expect to find fulltext url = None - expect_mimetypes = [] + link_source = None + link_source_id = None if release.ext_ids.arxiv: url = "https://arxiv.org/pdf/{}.pdf".format(release.ext_ids.arxiv) - expect_mimetypes = ['application/pdf'] - elif release.ext_ids.pmcid: - #url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid) - url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format(release.ext_ids.pmcid) - expect_mimetypes = ['application/pdf'] + link_source = "arxiv" + link_source_id = release.ext_ids.arxiv elif release.ext_ids.doi: url = "https://doi.org/{}".format(release.ext_ids.doi) + link_source = "doi" + link_source_id = release.ext_ids.doi + elif release.ext_ids.pmcid and release.ext_ids.pmid: + # TODO: how to tell if an author manuscript in PMC vs. published? + #url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid) + url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format(release.ext_ids.pmcid) + link_source = "pubmed" + link_source_id = release.ext_ids.pmid if not url: return None - ext_ids = dict() - for k in ('doi', 'pmid', 'pmcid', 'arxiv'): - v = getattr(release.ext_ids, k) - if v: - ext_ids[k] = v + ext_ids = release.ext_ids.to_dict() + ext_ids = dict([(k, v) for (k, v) in ext_ids.items() if v]) - if oa_only and not ext_ids.get('arxiv') and not ext_ids.get('pmcid'): + if oa_only and link_source not in ('arxiv', 'pubmed'): es = release_to_elasticsearch(release) if not es['is_oa']: return None + # TODO: infer ingest type based on release_type or container metadata? + if not ingest_type: + ingest_type = 'pdf' + ingest_request = { - 'ingest_type': 'file', + 'ingest_type': ingest_type, 'ingest_request_source': ingest_request_source, 'base_url': url, + 'release_stage': release.release_stage, 'fatcat': { - 'release_stage': release.release_stage, 'release_ident': release.ident, 'work_ident': release.work_id, }, 'ext_ids': ext_ids, - 'expect_mimetypes': expect_mimetypes or None, } + + if link_source and link_source_id: + ingest_request['link_source'] = link_source + ingest_request['link_source_id'] = link_source_id + return ingest_request diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index e1a72217..863ad40a 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -223,7 +223,7 @@ class EntityUpdatesWorker(FatcatWorker): # filter to "new" active releases with no matched files if release.ident in new_release_ids: ir = release_ingest_request(release, ingest_request_source='fatcat-changelog', oa_only=self.ingest_oa_only) - if ir and ir['ingest_type'] == 'file' and not release.files: + if ir and not release.files: producer.produce( self.ingest_file_request_topic, json.dumps(ir).encode('utf-8'), |