Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--   python/fatcat_tools/importers/__init__.py |   2
-rw-r--r--   python/fatcat_tools/importers/ingest.py   | 324
2 files changed, 259 insertions, 67 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index b82eb11a..c08e04c2 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -27,6 +27,6 @@ from .orcid import OrcidImporter
 from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
 from .wayback_static import auto_wayback_static
 from .cdl_dash_dat import auto_cdl_dash_dat
-from .ingest import IngestFileResultImporter, SavePaperNowFileImporter
+from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter
 from .shadow import ShadowLibraryImporter
 from .file_meta import FileMetaImporter
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index b6851eec..2042d331 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -38,39 +38,11 @@ class IngestFileResultImporter(EntityImporter):
         if kwargs.get('skip_source_allowlist', False):
             self.ingest_request_source_allowlist = []

-    def want(self, row):
+    def want_file(self, row) -> bool:
         """
-        Logic here probably needs work (TODO):
-
-        - Direct ingests via DOI from fatcat-changelog should probably go
-          through regardless of GROBID status
-        - We should filter/block things like single-page PDFs here
-        - public/anonymous submissions could require successful biblio-glutton
-          match, or some other sanity check on the fatcat side (eg, fuzzy title
-          match)
-        - handle the case of release_stage not being 'published'; if pre-print,
-          potentially create a new release.
-
-        The current logic is intentionally conservative as a first step.
+        File-specific part of want(). Generic across general ingest and save-paper-now.
         """
-        if row.get('hit') != True:
-            self.counts['skip-hit'] += 1
-            return False
-        source = row['request'].get('ingest_request_source')
-        if not source:
-            self.counts['skip-ingest_request_source'] += 1
-            return False
-        if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist:
-            self.counts['skip-ingest_request_source'] += 1
-            return False
-        if source.startswith('arabesque'):
-            if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'):
-                self.counts['skip-arabesque-source'] += 1
-                return False
-        if source.startswith('savepapernow'):
-            # never process async savepapernow requests
-            self.counts['skip-savepapernow'] += 1
-            return False
+
         if not row.get('file_meta'):
             self.counts['skip-file-meta'] += 1
             return False
@@ -94,25 +66,60 @@ class IngestFileResultImporter(EntityImporter):

         return True

-    def parse_record(self, row):
+    def want_ingest(self, row) -> bool:
+        """
+        Sandcrawler ingest-specific part of want(). Generic across file and
+        webcapture ingest.
+        """
+        if row.get('hit') != True:
+            self.counts['skip-hit'] += 1
+            return False
+        source = row['request'].get('ingest_request_source')
+        if not source:
+            self.counts['skip-ingest_request_source'] += 1
+            return False
+        if self.ingest_request_source_allowlist and source not in self.ingest_request_source_allowlist:
+            self.counts['skip-ingest_request_source'] += 1
+            return False
+
+        if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'):
+            self.counts['skip-link-source'] += 1
+            return False
+
+        if source.startswith('savepapernow'):
+            # never process async savepapernow requests
+            self.counts['skip-savepapernow'] += 1
+            return False
+
+        return True
+
+    def want(self, row):
+        """
+        Overall logic here probably needs work (TODO):
+
+        - Direct ingests via DOI from fatcat-changelog should probably go
+          through regardless of GROBID status
+        - We should filter/block things like single-page PDFs here
+        - public/anonymous submissions could require successful biblio-glutton
+          match, or some other sanity check on the fatcat side (eg, fuzzy title
+          match)
+        - handle the case of release_stage not being 'published'; if pre-print,
+          potentially create a new release.
+
+        The current logic is intentionally conservative as a first step.
+        """
+        if not self.want_file(row):
+            return False
+        if not self.want_ingest(row):
+            return False
+
+        return True
+
+    def parse_ingest_release_ident(self, row):

         request = row['request']
         fatcat = request.get('fatcat')

-        file_meta = row['file_meta']
-
-        # double check that want() filtered request correctly (eg, old requests)
-        if request.get('ingest_type') not in ('pdf', 'xml'):
-            self.counts['skip-ingest-type'] += 1
-            return None
-        assert (request['ingest_type'], file_meta['mimetype']) in [
-            ("pdf", "application/pdf"),
-            ("xml", "application/xml"),
-            ("xml", "application/jats+xml"),
-            ("xml", "application/tei+xml"),
-            ("xml", "text/xml"),
-        ]
-        # identify release by fatcat ident, or extid lookup, or biblio-glutton match
         release_ident = None
         if fatcat and fatcat.get('release_ident'):
             release_ident = fatcat.get('release_ident')
@@ -138,16 +145,16 @@ class IngestFileResultImporter(EntityImporter):
                         return None
                 release_ident = release.ident
                 break
+
         if self.use_glutton_match and not release_ident and row.get('grobid'):
             # try biblio-glutton extracted hit
             if row['grobid'].get('fatcat_release'):
                 release_ident = row['grobid']['fatcat_release'].split('_')[-1]
                 self.counts['glutton-match'] += 1

-        if not release_ident:
-            self.counts['skip-release-not-found'] += 1
-            return None
+        return release_ident

+    def parse_terminal(self, row):
         terminal = row.get('terminal')
         if not terminal:
             # support old cdx-only ingest results
@@ -170,6 +177,10 @@
             terminal['terminal_dt'] = terminal['dt']
         assert len(terminal['terminal_dt']) == 14

+    def parse_urls(self, row, terminal):
+
+        request = row['request']
+
         default_rel = self.default_link_rel
         if request.get('link_source') == 'doi':
             default_rel = 'publisher'
@@ -185,6 +196,51 @@

         urls = [url, ("webarchive", wayback)]
         urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls]
+        return urls
+
+    def parse_edit_extra(self, row):
+
+        request = row['request']
+        edit_extra = dict()
+
+        if request.get('edit_extra'):
+            edit_extra = request['edit_extra']
+
+        if request.get('ingest_request_source'):
+            edit_extra['ingest_request_source'] = request['ingest_request_source']
+        if request.get('link_source') and request.get('link_source_id'):
+            edit_extra['link_source'] = request['link_source']
+            edit_extra['link_source_id'] = request['link_source_id']
+
+        return edit_extra
+
+    def parse_record(self, row):
+
+        request = row['request']
+        fatcat = request.get('fatcat')
+        file_meta = row['file_meta']
+
+        # double check that want() filtered request correctly (eg, old requests)
+        if request.get('ingest_type') not in ('pdf', 'xml'):
+            self.counts['skip-ingest-type'] += 1
+            return None
+        assert (request['ingest_type'], file_meta['mimetype']) in [
+            ("pdf", "application/pdf"),
+            ("xml", "application/xml"),
+            ("xml", "application/jats+xml"),
+            ("xml", "application/tei+xml"),
+            ("xml", "text/xml"),
+        ]
+
+        # identify release by fatcat ident, or extid lookup, or biblio-glutton match
+        release_ident = self.parse_ingest_release_ident(row)
+
+        if not release_ident:
+            self.counts['skip-release-not-found'] += 1
+            return None
+
+        terminal = self.parse_terminal(row)
+        urls = self.parse_urls(row, terminal)

         fe = fatcat_openapi_client.FileEntity(
             md5=file_meta['md5hex'],
@@ -195,17 +251,10 @@
             release_ids=[release_ident],
             urls=urls,
         )
-        if request.get('edit_extra'):
-            fe.edit_extra = request['edit_extra']
-        else:
-            fe.edit_extra = dict()
-        if request.get('ingest_request_source'):
-            fe.edit_extra['ingest_request_source'] = request['ingest_request_source']
-        if request.get('link_source') and request.get('link_source_id'):
-            fe.edit_extra['link_source'] = request['link_source']
-            fe.edit_extra['link_source_id'] = request['link_source_id']
-        if not fe.edit_extra:
-            fe.edit_extra = None
+
+        edit_extra = self.parse_edit_extra(row)
+        if edit_extra:
+            fe.edit_extra = edit_extra
         return fe

     def try_update(self, fe):
@@ -270,6 +319,9 @@

     def want(self, row):

+        if not self.want_file(row):
+            return False
+
         source = row['request'].get('ingest_request_source')
         if not source:
             self.counts['skip-ingest_request_source'] += 1
@@ -280,12 +332,6 @@
         if row.get('hit') != True:
             self.counts['skip-hit'] += 1
             return False
-        if not row.get('file_meta'):
-            self.counts['skip-file-meta'] += 1
-            return False
-        if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
-            self.counts['skip-grobid'] += 1
-            return False

         return True

@@ -306,3 +352,149 @@
                     description=self.editgroup_description,
                     extra=self.editgroup_extra),
                 entity_list=batch))
+
+class IngestWebResultImporter(IngestFileResultImporter):
+    """
+    Variant of IngestFileResultImporter for processing HTML ingest requests
+    into webcapture objects.
+    """
+
+    def __init__(self, api, **kwargs):
+
+        eg_desc = kwargs.pop('editgroup_description', None) or "WebCaptures crawled from web using sandcrawler ingest tool"
+        eg_extra = kwargs.pop('editgroup_extra', dict())
+        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebResultImporter')
+        kwargs['do_updates'] = False
+        super().__init__(api,
+            editgroup_description=eg_desc,
+            editgroup_extra=eg_extra,
+            **kwargs)
+
+    def want(self, row):
+
+        if not self.want_ingest(row):
+            return False
+
+        if not row.get('file_meta'):
+            self.counts['skip-file-meta'] += 1
+            return False
+
+        # webcapture-specific filters
+        if row['request'].get('ingest_type') != 'html':
+            self.counts['skip-ingest-type'] += 1
+            return False
+        if row['file_meta'].get('mimetype') not in ("text/html", "application/html"):
+            self.counts['skip-mimetype'] += 1
+            return False
+
+        return True
+
+
+    def parse_record(self, row):
+        """
+        TODO: more of this parsing could be DRY with the file version
+        """
+
+        request = row['request']
+        file_meta = row['file_meta']
+
+        # double check that want() filtered request correctly (eg, old requests)
+        if request.get('ingest_type') != "html":
+            self.counts['skip-ingest-type'] += 1
+            return None
+        if file_meta['mimetype'] not in ("text/html", "application/html"):
+            self.counts['skip-mimetype'] += 1
+            return None
+
+        # identify release by fatcat ident, or extid lookup
+        release_ident = self.parse_ingest_release_ident(row)
+
+        if not release_ident:
+            self.counts['skip-release-not-found'] += 1
+            return None
+
+        terminal = self.parse_terminal(row)
+        urls = self.parse_urls(row, terminal)
+        archive_urls = [u for u in urls if u['rel'] == 'webarchive']
+
+        if terminal['terminal_status_code'] != 200:
+            self.counts['skip-terminal-status-code'] += 1
+            return None
+
+        terminal_cdx = row['cdx']
+        if 'revisit_cdx' in row:
+            terminal_cdx = row['revisit_cdx']
+        assert terminal_cdx['surt']
+        assert terminal_cdx['url'] == terminal['terminal_url']
+
+        wc_cdx = []
+        # primary resource first
+        wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine(
+            surt=terminal['terminal_surt'], # XXX: from CDX?
+            timestamp=terminal['terminal_dt'], # as an ISO datetime
+            url=terminal['terminal_url'],
+            mimetype=file_meta['mimetype'],
+            status_code=terminal['terminal_status_code'],
+            sha1=file_meta['sha1hex'],
+            sha256=file_meta['sha256hex'],
+            size=file_meta['size_bytes'],
+        ))
+
+        for resource in row.get('html_resources', []):
+            wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine(
+                surt=resource['surt'],
+                timestamp=resource['timestamp'],
+                url=resource['url'],
+                mimetype=resource.get('mimetype'),
+                size=resource.get('size_bytes'),
+                sha1=resource.get('sha1hex'),
+                sha256=resource.get('sha256hex'),
+            ))
+
+        wc = fatcat_openapi_client.WebCaptureEntity(
+            cdx=wc_cdx,
+            archive_urls=archive_urls,
+            original_url=terminal['terminal_url'],
+            timestamp=terminal['terminal_dt'],
+            release_ids=[release_ident],
+            urls=urls,
+        )
+
+        edit_extra = self.parse_edit_extra(row)
+
+        if edit_extra:
+            wc.edit_extra = edit_extra
+        return wc
+
+
+    def try_update(self, wc):
+
+        # check for existing edits-in-progress with same file hash
+        for other in self._entity_queue:
+            if other.sha1 == wc.sha1:
+                self.counts['skip-in-queue'] += 1
+                return False
+
+        # lookup sha1, or create new entity
+        existing = None
+        # XXX: lookup *release* instead; skip if any existing web capture entities
+        # XXX: only one release per webcapture
+        try:
+            existing = self.api.lookup_file(sha1=wc.sha1)
+        except fatcat_openapi_client.rest.ApiException as err:
+            if err.status != 404:
+                raise err
+
+        if not existing:
+            return True
+        else:
+            # TODO: for now, never update
+            self.counts['skip-update-disabled'] += 1
+            return False
+
+    def insert_batch(self, batch):
+        self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebCaptureAutoBatch(
+            editgroup=fatcat_openapi_client.Editgroup(
+                description=self.editgroup_description,
+                extra=self.editgroup_extra),
+            entity_list=batch))
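
For context on what these importers consume: want_file(), want_ingest(), and the parse_*() helpers above all operate on sandcrawler ingest-result rows (one JSON object per ingest attempt). A minimal illustrative row is sketched below; the field names are the ones referenced in the diff, but every concrete value (identifiers, hashes, URLs) is a made-up placeholder.

    # Illustrative sandcrawler ingest-result row, matching the fields the
    # importers above read; all values are placeholders, not real data.
    example_row = {
        "hit": True,
        "request": {
            "ingest_type": "html",
            "ingest_request_source": "fatcat-changelog",
            "link_source": "doi",
            "link_source_id": "10.123/placeholder",
            "fatcat": {"release_ident": "aaaaaaaaaaaaaaaaaaaaaaaaaa"},  # placeholder ident
        },
        "file_meta": {
            "mimetype": "text/html",
            "md5hex": "0" * 32,      # placeholder hashes
            "sha1hex": "0" * 40,
            "sha256hex": "0" * 64,
            "size_bytes": 12345,
        },
        "terminal": {
            "terminal_url": "https://example.com/article",
            "terminal_dt": "20201103123456",  # 14-digit wayback-style datetime
            "terminal_status_code": 200,
            "terminal_surt": "com,example)/article",
        },
        "cdx": {
            "surt": "com,example)/article",
            "url": "https://example.com/article",
        },
        "grobid": {"status_code": 200},
        "html_resources": [],
    }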
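And a rough sketch of how the new IngestWebResultImporter could be driven, following the pattern of the other importers in this module. This wiring is not part of the commit; it assumes the JsonLinePusher helper and the authenticated_api() client setup from elsewhere in fatcat_tools behave as they do for the existing file-ingest importer.

    import sys

    from fatcat_tools import authenticated_api
    from fatcat_tools.importers import IngestWebResultImporter, JsonLinePusher

    # Hypothetical wiring (not from this commit): read sandcrawler ingest-result
    # JSON lines from stdin and batch them into webcapture entities.
    api = authenticated_api("https://api.fatcat.wiki/v0")  # token via FATCAT_API_AUTH_TOKEN
    importer = IngestWebResultImporter(api, edit_batch_size=50)
    JsonLinePusher(importer, sys.stdin).run()
    print(importer.counts)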
