Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--   python/fatcat_tools/importers/__init__.py      |   3
-rw-r--r--   python/fatcat_tools/importers/common.py         | 158
-rw-r--r--   python/fatcat_tools/importers/crossref.py       |   2
-rw-r--r--   python/fatcat_tools/importers/datacite.py       |  10
-rw-r--r--   python/fatcat_tools/importers/doaj_article.py   | 358
-rw-r--r--   python/fatcat_tools/importers/ingest.py         | 329
6 files changed, 648 insertions, 212 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index b82eb11a..d2928d09 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -27,6 +27,7 @@ from .orcid import OrcidImporter  from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE  from .wayback_static import auto_wayback_static  from .cdl_dash_dat import auto_cdl_dash_dat -from .ingest import IngestFileResultImporter, SavePaperNowFileImporter +from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter  from .shadow import ShadowLibraryImporter  from .file_meta import FileMetaImporter +from .doaj_article import DoajArticleImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 14415683..3c810391 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -3,12 +3,9 @@ import re  import sys  import csv  import json -import ftfy -import base64  import sqlite3  import datetime  import subprocess -import unicodedata  from collections import Counter  from confluent_kafka import Consumer, KafkaException  import xml.etree.ElementTree as ET @@ -18,162 +15,13 @@ from bs4 import BeautifulSoup  import fatcat_openapi_client  from fatcat_openapi_client.rest import ApiException +# TODO: refactor so remove need for this (re-imports for backwards compatibility) +from fatcat_tools.normal import (clean_str as clean, is_cjk, b32_hex, LANG_MAP_MARC) # noqa: F401  DATE_FMT = "%Y-%m-%d"  SANE_MAX_RELEASES = 200  SANE_MAX_URLS = 100 -# These are very close, but maybe not exactly 1-to-1 with 639-2? Some mix of -# 2/T and 2/B? -# PubMed/MEDLINE and JSTOR use these MARC codes -# https://www.loc.gov/marc/languages/language_name.html -LANG_MAP_MARC = { -    'afr': 'af', -    'alb': 'sq', -    'amh': 'am', -    'ara': 'ar', -    'arm': 'hy', -    'aze': 'az', -    'ben': 'bn', -    'bos': 'bs', -    'bul': 'bg', -    'cat': 'ca', -    'chi': 'zh', -    'cze': 'cs', -    'dan': 'da', -    'dut': 'nl', -    'eng': 'en', -    'epo': 'eo', -    'est': 'et', -    'fin': 'fi', -    'fre': 'fr', -    'geo': 'ka', -    'ger': 'de', -    'gla': 'gd', -    'gre': 'el', -    'heb': 'he', -    'hin': 'hi', -    'hrv': 'hr', -    'hun': 'hu', -    'ice': 'is', -    'ind': 'id', -    'ita': 'it', -    'jpn': 'ja', -    'kin': 'rw', -    'kor': 'ko', -    'lat': 'la', -    'lav': 'lv', -    'lit': 'lt', -    'mac': 'mk', -    'mal': 'ml', -    'mao': 'mi', -    'may': 'ms', -    'nor': 'no', -    'per': 'fa', -    'per': 'fa', -    'pol': 'pl', -    'por': 'pt', -    'pus': 'ps', -    'rum': 'ro', -    'rus': 'ru', -    'san': 'sa', -    'slo': 'sk', -    'slv': 'sl', -    'spa': 'es', -    'srp': 'sr', -    'swe': 'sv', -    'tha': 'th', -    'tur': 'tr', -    'ukr': 'uk', -    'urd': 'ur', -    'vie': 'vi', -    'wel': 'cy', - -# additions -    'gle': 'ga', # "Irish" (Gaelic) -    'jav': 'jv', # Javanese -    'welsh': 'cy', # Welsh -    'oci': 'oc', # Occitan - -# Don't have ISO 639-1 codes -    'grc': 'el', # Ancient Greek; map to modern greek -    'map': None, # Austronesian (collection) -    'syr': None, # Syriac, Modern -    'gem': None, # Old Saxon -    'non': None, # Old Norse -    'emg': None, # Eastern Meohang -    'neg': None, # Negidal -    'mul': None, # Multiple languages -    'und': None, # Undetermined -} - - -def clean(thing, force_xml=False): -    """ -    This function is appropriate to be called on any random, 
non-markup string, -    such as author names, titles, etc. - -    It will try to clean up common unicode mangles, HTML characters, etc. - -    This will detect XML/HTML and "do the right thing" (aka, not remove -    entities like '&' if there are tags in the string), unless you pass the -    'force_xml' parameter, which might be appropriate for, eg, names and -    titles, which generally should be projected down to plain text. - -    Also strips extra whitespace. -    """ -    if not thing: -        return None -    fix_entities = 'auto' -    if force_xml: -        fix_entities = True -    fixed = ftfy.fix_text(thing, fix_entities=fix_entities).strip() -    if not fixed or len(fixed) <= 1: -        # wasn't zero-length before, but is now; return None -        return None -    return fixed - -def test_clean(): - -    assert clean(None) == None -    assert clean('') == None -    assert clean('1') == None -    assert clean('123') == '123' -    assert clean('a&b') == 'a&b' -    assert clean('<b>a&b</b>') == '<b>a&b</b>' -    assert clean('<b>a&b</b>', force_xml=True) == '<b>a&b</b>' - -def b32_hex(s): -    s = s.strip().split()[0].lower() -    if s.startswith("sha1:"): -        s = s[5:] -    if len(s) != 32: -        return s -    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') - -def is_cjk(s): -    if not s: -        return False -    for c in s: -        if c.isalpha(): -            lang_prefix = unicodedata.name(c).split()[0] -            return lang_prefix in ('CJK', 'HIRAGANA', 'KATAKANA', 'HANGUL') -    return False - -def test_is_cjk(): -    assert is_cjk(None) is False -    assert is_cjk('') is False -    assert is_cjk('blah') is False -    assert is_cjk('岡, 鹿, 梨, 阜, 埼') is True -    assert is_cjk('[岡, 鹿, 梨, 阜, 埼]') is True -    assert is_cjk('菊') is True -    assert is_cjk('岡, 鹿, 梨, 阜, 埼 with eng after') is True -    assert is_cjk('水道') is True -    assert is_cjk('オウ, イク') is True # kanji -    assert is_cjk('ひヒ') is True -    assert is_cjk('き゚ゅ') is True -    assert is_cjk('ㄴ, ㄹ, ㅁ, ㅂ, ㅅ') is True -  DOMAIN_REL_MAP = {      "archive.org": "archive",      # LOCKSS, Portico, DuraSpace, etc would also be "archive" @@ -444,6 +292,7 @@ class EntityImporter:          raise NotImplementedError      def is_orcid(self, orcid): +        # TODO: replace with clean_orcid() from fatcat_tools.normal          return self._orcid_regex.match(orcid) is not None      def lookup_orcid(self, orcid): @@ -464,6 +313,7 @@ class EntityImporter:          return creator_id      def is_doi(self, doi): +        # TODO: replace with clean_doi() from fatcat_tools.normal          return doi.startswith("10.") and doi.count("/") >= 1      def lookup_doi(self, doi): diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py index 71f08952..e77fa65e 100644 --- a/python/fatcat_tools/importers/crossref.py +++ b/python/fatcat_tools/importers/crossref.py @@ -487,8 +487,6 @@ class CrossrefImporter(EntityImporter):          except fatcat_openapi_client.rest.ApiException as err:              if err.status != 404:                  raise err -            # doesn't exist, need to update -            return True          # eventually we'll want to support "updates", but for now just skip if          # entity already exists diff --git a/python/fatcat_tools/importers/datacite.py b/python/fatcat_tools/importers/datacite.py index 86740e80..70f8db86 100644 --- a/python/fatcat_tools/importers/datacite.py +++ b/python/fatcat_tools/importers/datacite.py @@ -151,7 
+151,7 @@ UNKNOWN_MARKERS = set(DATACITE_UNKNOWN_MARKERS).union(set((      'Unknown',  ))) -# UNKNOWN_MARKERS_LOWER are lowercase version of UNKNOWN blacklist. +# UNKNOWN_MARKERS_LOWER are lowercase version of UNKNOWN blocklist.  UNKNOWN_MARKERS_LOWER = set((v.lower() for v in UNKNOWN_MARKERS))  # Any "min" number of "tokens" will signal "spam", https://fatcat.wiki/release/rzcpjwukobd4pj36ipla22cnoi @@ -346,7 +346,7 @@ class DataciteImporter(EntityImporter):              print('[{}] skipping record w/o title: {}'.format(doi, obj), file=sys.stderr)              return False -        # check for blacklisted "spam", e.g. "FULL MOVIE" +        # check for blocklisted "spam", e.g. "FULL MOVIE"          for rule in DATACITE_TITLE_SPAM_WORDGROUPS:              seen = set()              for token in rule.get("tokens", []): @@ -781,8 +781,6 @@ class DataciteImporter(EntityImporter):          except fatcat_openapi_client.rest.ApiException as err:              if err.status != 404:                  raise err -            # doesn't exist, need to update -            return True          # eventually we'll want to support "updates", but for now just skip if          # entity already exists @@ -819,7 +817,7 @@ class DataciteImporter(EntityImporter):          contribs = []          # Names, that should be ignored right away. -        name_blacklist = set(('Occdownload Gbif.Org',)) +        name_blocklist = set(('Occdownload Gbif.Org',))          i = 0          for c in creators: @@ -861,7 +859,7 @@ class DataciteImporter(EntityImporter):                      continue                  if not name:                      name = "{} {}".format(given_name or '', surname or '').strip() -                if name in name_blacklist: +                if name in name_blocklist:                      continue                  if name.lower() in UNKNOWN_MARKERS_LOWER:                      continue diff --git a/python/fatcat_tools/importers/doaj_article.py b/python/fatcat_tools/importers/doaj_article.py new file mode 100644 index 00000000..03752484 --- /dev/null +++ b/python/fatcat_tools/importers/doaj_article.py @@ -0,0 +1,358 @@ +""" +Importer for DOAJ article-level metadata, schema v1. + +DOAJ API schema and docs: https://doaj.org/api/v1/docs +""" + +import warnings +import datetime +from typing import List, Optional + +import fatcat_openapi_client +from fatcat_tools.normal import (clean_doi, clean_str, parse_month, +    clean_orcid, detect_text_lang, parse_lang_name, parse_country_name, +    clean_pmid, clean_pmcid) +from fatcat_tools.importers.common import EntityImporter + +# Cutoff length for abstracts. 
+MAX_ABSTRACT_LENGTH = 2048 + + +class DoajArticleImporter(EntityImporter): + +    def __init__(self, +                 api, +                 issn_map_file, +                 **kwargs): + +        eg_desc = kwargs.get( +            'editgroup_description', +            "Automated import of DOAJ article metadata, harvested from REST API or bulk dumps" +        ) +        eg_extra = kwargs.get('editgroup_extra', dict()) +        eg_extra['agent'] = eg_extra.get('agent', +                                         'fatcat_tools.DoajArticleImporter') +        # ensure default is to not do updates with this worker (override super() default) +        kwargs['do_updates'] = kwargs.get("do_updates", False) +        super().__init__(api, +                         issn_map_file=issn_map_file, +                         editgroup_description=eg_desc, +                         editgroup_extra=eg_extra, +                         **kwargs) + +        self.this_year = datetime.datetime.now().year +        self.read_issn_map_file(issn_map_file) + +    def want(self, obj): +        return True + +    def parse_record(self, obj): +        """ +        bibjson { +            abstract (string, optional), +            author (Array[bibjson.author], optional), +            identifier (Array[bibjson.identifier]), +            journal (bibjson.journal, optional), +            keywords (Array[string], optional), +            link (Array[bibjson.link], optional), +            month (string, optional), +            subject (Array[bibjson.subject], optional), +            title (string), +            year (string, optional) +        } +        bibjson.journal { +            country (string, optional), +            end_page (string, optional), +            language (Array[string], optional), +            license (Array[bibjson.journal.license], optional), +            number (string, optional), +            publisher (string, optional), +            start_page (string, optional), +            title (string, optional), +            volume (string, optional) +        } +        """ + +        if not obj or not isinstance(obj, dict) or not 'bibjson' in obj: +            self.counts['skip-empty'] += 1 +            return None + +        bibjson = obj['bibjson'] + +        title = clean_str(bibjson.get('title'), force_xml=True) +        if not title: +            self.counts['skip-title'] += 1 +            return False + +        container_name = clean_str(bibjson['journal']['title']) +        container_id = None +        # NOTE: 'issns' not documented in API schema +        for issn in bibjson['journal']['issns']: +            issnl = self.issn2issnl(issn) +            if issnl: +                container_id = self.lookup_issnl(self.issn2issnl(issn)) +            if container_id: +                # don't store container_name when we have an exact match +                container_name = None +                break + +        volume = clean_str(bibjson['journal'].get('volume')) +        # NOTE: this schema seems to use "number" as "issue number" +        issue = clean_str(bibjson['journal'].get('number')) +        publisher = clean_str(bibjson['journal'].get('publisher')) + +        try: +            release_year = int(bibjson.get('year')) +        except (TypeError, ValueError): +            release_year = None +        release_month = parse_month(clean_str(bibjson.get('month'))) + +        # block bogus far-future years/dates +        if release_year is not None and (release_year > (self.this_year + 5) or release_year < 
1000): +            release_month = None +            release_year = None + +        license_slug = self.doaj_license_slug(bibjson['journal'].get('license')) +        country = parse_country_name(bibjson['journal'].get('country')) +        language = None +        for raw in bibjson['journal'].get('language') or []: +            language = parse_lang_name(raw) +            if language: +                break + +        # pages +        # NOTE: error in API docs? seems like start_page not under 'journal' object +        start_page = clean_str(bibjson['journal'].get('start_page')) or clean_str(bibjson.get('start_page')) +        end_page = clean_str(bibjson['journal'].get('end_page')) or clean_str(bibjson.get('end_page')) +        pages: Optional[str] = None +        if start_page and end_page: +            pages = f"{start_page}-{end_page}" +        elif start_page: +            pages = start_page + +        doaj_article_id = obj['id'].lower() +        ext_ids = self.doaj_ext_ids(bibjson['identifier'], doaj_article_id) +        abstracts = self.doaj_abstracts(bibjson) +        contribs = self.doaj_contribs(bibjson.get('author') or []) + +        # DOAJ-specific extra +        doaj_extra = dict() +        if bibjson.get('subject'): +            doaj_extra['subject'] = bibjson.get('subject') +        if bibjson.get('keywords'): +            doaj_extra['keywords'] = [k for k in [clean_str(s) for s in bibjson.get('keywords')] if k] + +        # generic extra +        extra = dict() +        if country: +            extra['country'] = country +        if not container_id and container_name: +            extra['container_name'] = container_name +        if release_year and release_month: +            # TODO: schema migration +            extra['release_month'] = release_month + +        if doaj_extra: +            extra['doaj'] = doaj_extra +        if not extra: +            extra = None + +        re = fatcat_openapi_client.ReleaseEntity( +            work_id=None, +            container_id=container_id, +            release_type='article-journal', +            release_stage='published', +            title=title, +            release_year=release_year, +            #release_date, +            publisher=publisher, +            ext_ids=ext_ids, +            contribs=contribs, +            volume=volume, +            issue=issue, +            pages=pages, +            language=language, +            abstracts=abstracts, +            extra=extra, +            license_slug=license_slug, +        ) +        re = self.biblio_hacks(re) +        return re + +    @staticmethod +    def biblio_hacks(re): +        """ +        This function handles known special cases. For example, +        publisher-specific or platform-specific workarounds. 
+        """ +        return re + +    def try_update(self, re): + +        # lookup existing release by DOAJ article id +        existing = None +        try: +            existing = self.api.lookup_release(doaj=re.ext_ids.doaj) +        except fatcat_openapi_client.rest.ApiException as err: +            if err.status != 404: +                raise err + +        # then try other ext_id lookups +        if not existing: +            for extid_type in ('doi', 'pmid', 'pmcid'): +                extid_val = getattr(re.ext_ids, extid_type) +                if not extid_val: +                    continue +                #print(f"  lookup release type: {extid_type} val: {extid_val}") +                try: +                    existing = self.api.lookup_release(**{extid_type: extid_val}) +                except fatcat_openapi_client.rest.ApiException as err: +                    if err.status != 404: +                        raise err +                if existing: +                    if existing.ext_ids.doaj: +                        warn_str = f"unexpected DOAJ ext_id match after lookup failed doaj={re.ext_ids.doaj} ident={existing.ident}" +                        warnings.warn(warn_str) +                        self.counts["skip-doaj-id-mismatch"] += 1 +                        return False +                    break + +        # TODO: in the future could do fuzzy match here, eg using elasticsearch + +        # create entity +        if not existing: +            return True + +        # other logic could go here about skipping updates +        if not self.do_updates or existing.ext_ids.doaj: +            self.counts['exists'] += 1 +            return False + +        # fields to copy over for update +        existing.ext_ids.doaj = existing.ext_ids.doaj or re.ext_ids.doaj +        existing.release_type = existing.release_type or re.release_type +        existing.release_stage = existing.release_stage or re.release_stage +        existing.container_id = existing.container_id or re.container_id +        existing.abstracts = existing.abstracts or re.abstracts +        existing.extra['doaj'] = re.extra['doaj'] +        existing.volume = existing.volume or re.volume +        existing.issue = existing.issue or re.issue +        existing.pages = existing.pages or re.pages +        existing.language = existing.language or re.language + +        try: +            self.api.update_release(self.get_editgroup_id(), existing.ident, existing) +            self.counts['update'] += 1 +        except fatcat_openapi_client.rest.ApiException as err: +            # there is a code path where we try to update the same release +            # twice in a row; if that happens, just skip +            # NOTE: API behavior might change in the future? 
+            if "release_edit_editgroup_id_ident_id_key" in err.body: +                self.counts['skip-update-conflict'] += 1 +                return False +            else: +                raise err + +        return False + +    def insert_batch(self, batch): +        self.api.create_release_auto_batch(fatcat_openapi_client.ReleaseAutoBatch( +            editgroup=fatcat_openapi_client.Editgroup( +                description=self.editgroup_description, +                extra=self.editgroup_extra), +            entity_list=batch)) + +    def doaj_abstracts(self, bibjson: dict) -> List[fatcat_openapi_client.ReleaseAbstract]: +        text = clean_str(bibjson.get('abstract')) +        if not text or len(text) < 10: +            return [] +        if len(text) > MAX_ABSTRACT_LENGTH: +            text = text[:MAX_ABSTRACT_LENGTH] + " [...]" + +        lang = detect_text_lang(text) + +        abstract = fatcat_openapi_client.ReleaseAbstract( +            mimetype="text/plain", +            content=text, +            lang=lang, +        ) + +        return [abstract,] + +    def doaj_contribs(self, authors: List[dict]) -> List[fatcat_openapi_client.ReleaseContrib]: +        """ +        bibjson.author { +            affiliation (string, optional), +            name (string), +            orcid_id (string, optional) +        } +        """ +        contribs = [] +        index = 0 +        for author in authors: +            if not author.get('name'): +                continue +            creator_id = None +            orcid = clean_orcid(author.get('orcid_id')) +            if orcid: +                creator_id = self.lookup_orcid(orcid) +            contribs.append(fatcat_openapi_client.ReleaseContrib( +                raw_name=author.get('name'), +                role='author', +                index=index, +                creator_id=creator_id, +                raw_affiliation=clean_str(author.get('affiliation')), +            )) +            index += 1 +        return contribs + +    def doaj_ext_ids(self, identifiers: List[dict], doaj_article_id: str) -> fatcat_openapi_client.ReleaseExtIds: +        """ +        bibjson.identifier { +            id (string), +            type (string) +        } +        """ + +        assert doaj_article_id.isalnum() and len(doaj_article_id) == 32 + +        doi: Optional[str] = None +        pmid: Optional[str] = None +        pmcid: Optional[str] = None +        for id_obj in identifiers: +            if not id_obj.get('id'): +                continue +            if id_obj['type'].lower() == 'doi': +                doi = clean_doi(id_obj['id']) +            elif id_obj['type'].lower() == 'pmid': +                pmid = clean_pmid(id_obj['id']) +            elif id_obj['type'].lower() == 'pmcid': +                pmcid = clean_pmcid(id_obj['id']) + +        return fatcat_openapi_client.ReleaseExtIds( +            doaj=doaj_article_id, +            doi=doi, +            pmid=pmid, +            pmcid=pmcid, +        ) + +    def doaj_license_slug(self, license_list: List[dict]) -> Optional[str]: +        """ +        bibjson.journal.license { +            open_access (boolean, optional), +            title (string, optional), +            type (string, optional), +            url (string, optional), +            version (string, optional) +        } +        """ +        if not license_list: +            return None +        for license in license_list: +            if not license.get('open_access'): +                continue +            slug = 
license.get('type') +            if slug.startswith('CC '): +                slug = slug.replace('CC ', 'cc-').lower() +                return slug +        return None diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index 4b1d3702..1e04e712 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -1,4 +1,6 @@ +import datetime +  import fatcat_openapi_client  from .common import EntityImporter, make_rel_url @@ -20,10 +22,10 @@ class IngestFileResultImporter(EntityImporter):          assert self.default_link_rel          self.require_grobid = require_grobid          if self.require_grobid: -            print("Requiring GROBID status == 200") +            print("Requiring GROBID status == 200 (for PDFs)")          else:              print("NOT checking GROBID success") -        self.ingest_request_source_whitelist = [ +        self.ingest_request_source_allowlist = [              'fatcat-changelog',              'fatcat-ingest-container',              'fatcat-ingest', @@ -35,23 +37,41 @@ class IngestFileResultImporter(EntityImporter):              's2-corpus',              's2',          ] -        if kwargs.get('skip_source_whitelist', False): -            self.ingest_request_source_whitelist = [] +        if kwargs.get('skip_source_allowlist', False): +            self.ingest_request_source_allowlist = [] -    def want(self, row): +    def want_file(self, row) -> bool: +        """ +        File-specific part of want(). Generic across general ingest and save-paper-now.          """ -        Logic here probably needs work (TODO): -        - Direct ingests via DOI from fatcat-changelog should probably go -          through regardless of GROBID status -        - We should filter/block things like single-page PDFs here -        - public/anonymous submissions could require successful biblio-glutton -          match, or some other sanity check on the fatcat side (eg, fuzzy title -          match) -        - handle the case of release_stage not being 'published'; if pre-print, -          potentially create a new release. +        if not row.get('file_meta'): +            self.counts['skip-file-meta'] += 1 +            return False -        The current logic is intentionally conservative as a first step. +        # type-specific filters +        if row['request'].get('ingest_type') == 'pdf': +            if self.require_grobid and row.get('grobid', {}).get('status_code') != 200: +                self.counts['skip-grobid'] += 1 +                return False +            if row['file_meta'].get('mimetype') not in ("application/pdf",): +                self.counts['skip-mimetype'] += 1 +                return False +        elif row['request'].get('ingest_type') == 'xml': +            if row['file_meta'].get('mimetype') not in ("application/xml", +                    "application/jats+xml", "application/tei+xml", "text/xml"): +                self.counts['skip-mimetype'] += 1 +                return False +        else: +            self.counts['skip-ingest-type'] += 1 +            return False + +        return True + +    def want_ingest(self, row) -> bool: +        """ +        Sandcrawler ingest-specific part of want(). Generic across file and +        webcapture ingest.          
"""          if row.get('hit') != True:              self.counts['skip-hit'] += 1 @@ -60,33 +80,48 @@ class IngestFileResultImporter(EntityImporter):          if not source:              self.counts['skip-ingest_request_source'] += 1              return False -        if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist: +        if self.ingest_request_source_allowlist and source not in self.ingest_request_source_allowlist:              self.counts['skip-ingest_request_source'] += 1              return False -        if source.startswith('arabesque'): -            if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'): -                self.counts['skip-arabesque-source'] += 1 -                return False + +        if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'): +            self.counts['skip-link-source'] += 1 +            return False +          if source.startswith('savepapernow'):              # never process async savepapernow requests              self.counts['skip-savepapernow'] += 1              return False -        if not row.get('file_meta'): -            self.counts['skip-file-meta'] += 1 + +        return True + +    def want(self, row): +        """ +        Overall logic here probably needs work (TODO): + +        - Direct ingests via DOI from fatcat-changelog should probably go +          through regardless of GROBID status +        - We should filter/block things like single-page PDFs here +        - public/anonymous submissions could require successful biblio-glutton +          match, or some other sanity check on the fatcat side (eg, fuzzy title +          match) +        - handle the case of release_stage not being 'published'; if pre-print, +          potentially create a new release. + +        The current logic is intentionally conservative as a first step. +        """ +        if not self.want_file(row):              return False -        if self.require_grobid and row.get('grobid', {}).get('status_code') != 200: -            self.counts['skip-grobid'] += 1 +        if not self.want_ingest(row):              return False          return True -    def parse_record(self, row): +    def parse_ingest_release_ident(self, row):          request = row['request']          fatcat = request.get('fatcat') -        file_meta = row['file_meta'] -        # identify release by fatcat ident, or extid lookup, or biblio-glutton match          release_ident = None          if fatcat and fatcat.get('release_ident'):              release_ident = fatcat.get('release_ident') @@ -112,23 +147,21 @@ class IngestFileResultImporter(EntityImporter):                          return None                  release_ident = release.ident                  break +          if self.use_glutton_match and not release_ident and row.get('grobid'):              # try biblio-glutton extracted hit              if row['grobid'].get('fatcat_release'):                  release_ident = row['grobid']['fatcat_release'].split('_')[-1]                  self.counts['glutton-match'] += 1 -        if not release_ident: -            self.counts['skip-release-not-found'] += 1 -            return None +        return release_ident +    def parse_terminal(self, row):          terminal = row.get('terminal')          if not terminal:              # support old cdx-only ingest results              cdx = row.get('cdx')              if not cdx: -                # TODO: support archive.org hits? 
-                self.counts['skip-no-terminal'] += 1                  return None              else:                  terminal = { @@ -142,7 +175,15 @@ class IngestFileResultImporter(EntityImporter):              terminal['terminal_url'] = terminal['url']          if not 'terminal_dt' in terminal:              terminal['terminal_dt'] = terminal['dt'] + +        # convert CDX-style digits to ISO-style timestamp          assert len(terminal['terminal_dt']) == 14 +        terminal['terminal_timestamp'] = datetime.datetime.strptime(terminal['terminal_dt'], "%Y%m%d%H%M%S").isoformat() + "Z" +        return terminal + +    def parse_urls(self, row, terminal): + +        request = row['request']          default_rel = self.default_link_rel          if request.get('link_source') == 'doi': @@ -159,6 +200,55 @@ class IngestFileResultImporter(EntityImporter):          urls = [url, ("webarchive", wayback)]          urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls] +        return urls + +    def parse_edit_extra(self, row): + +        request = row['request'] +        edit_extra = dict() + +        if request.get('edit_extra'): +            edit_extra = request['edit_extra'] + +        if request.get('ingest_request_source'): +            edit_extra['ingest_request_source'] = request['ingest_request_source'] +        if request.get('link_source') and request.get('link_source_id'): +            edit_extra['link_source'] = request['link_source'] +            edit_extra['link_source_id'] = request['link_source_id'] + +        return edit_extra + +    def parse_record(self, row): + +        request = row['request'] +        file_meta = row['file_meta'] + +        # double check that want() filtered request correctly (eg, old requests) +        if request.get('ingest_type') not in ('pdf', 'xml'): +            self.counts['skip-ingest-type'] += 1 +            return None +        assert (request['ingest_type'], file_meta['mimetype']) in [ +            ("pdf", "application/pdf"), +            ("xml", "application/xml"), +            ("xml", "application/jats+xml"), +            ("xml", "application/tei+xml"), +            ("xml", "text/xml"), +        ] + +        # identify release by fatcat ident, or extid lookup, or biblio-glutton match +        release_ident = self.parse_ingest_release_ident(row) + +        if not release_ident: +            self.counts['skip-release-not-found'] += 1 +            return None + +        terminal = self.parse_terminal(row) +        if not terminal: +            # TODO: support archive.org hits? 
+            self.counts['skip-no-terminal'] += 1 +            return None + +        urls = self.parse_urls(row, terminal)          fe = fatcat_openapi_client.FileEntity(              md5=file_meta['md5hex'], @@ -169,17 +259,10 @@ class IngestFileResultImporter(EntityImporter):              release_ids=[release_ident],              urls=urls,          ) -        if request.get('edit_extra'): -            fe.edit_extra = request['edit_extra'] -        else: -            fe.edit_extra = dict() -        if request.get('ingest_request_source'): -            fe.edit_extra['ingest_request_source'] = request['ingest_request_source'] -        if request.get('link_source') and request.get('link_source_id'): -            fe.edit_extra['link_source'] = request['link_source'] -            fe.edit_extra['link_source_id'] = request['link_source_id'] -        if not fe.edit_extra: -            fe.edit_extra = None + +        edit_extra = self.parse_edit_extra(row) +        if edit_extra: +            fe.edit_extra = edit_extra          return fe      def try_update(self, fe): @@ -244,6 +327,9 @@ class SavePaperNowFileImporter(IngestFileResultImporter):      def want(self, row): +        if not self.want_file(row): +            return False +          source = row['request'].get('ingest_request_source')          if not source:              self.counts['skip-ingest_request_source'] += 1 @@ -254,12 +340,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter):          if row.get('hit') != True:              self.counts['skip-hit'] += 1              return False -        if not row.get('file_meta'): -            self.counts['skip-file-meta'] += 1 -            return False -        if self.require_grobid and row.get('grobid', {}).get('status_code') != 200: -            self.counts['skip-grobid'] += 1 -            return False          return True @@ -280,3 +360,154 @@ class SavePaperNowFileImporter(IngestFileResultImporter):                      description=self.editgroup_description,                      extra=self.editgroup_extra),                  entity_list=batch)) + + +class IngestWebResultImporter(IngestFileResultImporter): +    """ +    Variant of IngestFileResultImporter for processing HTML ingest requests +    into webcapture objects. 
+    """ + +    def __init__(self, api, **kwargs): + +        eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled from web using sandcrawler ingest tool" +        eg_extra = kwargs.pop('editgroup_extra', dict()) +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebResultImporter') +        kwargs['do_updates'] = False +        super().__init__(api, +            editgroup_description=eg_desc, +            editgroup_extra=eg_extra, +            **kwargs) + +    def want(self, row): + +        if not self.want_ingest(row): +            return False + +        if not row.get('file_meta'): +            self.counts['skip-file-meta'] += 1 +            return False + +        # webcapture-specific filters +        if row['request'].get('ingest_type') != 'html': +            self.counts['skip-ingest-type'] += 1 +            return False +        if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"): +            self.counts['skip-mimetype'] += 1 +            return False + +        return True + +    def parse_record(self, row): + +        request = row['request'] +        file_meta = row['file_meta'] + +        # double check that want() filtered request correctly (eg, old requests) +        if request.get('ingest_type') != "html": +            self.counts['skip-ingest-type'] += 1 +            return None +        if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): +            self.counts['skip-mimetype'] += 1 +            return None + +        # identify release by fatcat ident, or extid lookup +        release_ident = self.parse_ingest_release_ident(row) + +        if not release_ident: +            self.counts['skip-release-not-found'] += 1 +            return None + +        terminal = self.parse_terminal(row) +        if not terminal: +            # TODO: support archive.org hits? 
+            self.counts['skip-no-terminal'] += 1 +            return None + +        urls = self.parse_urls(row, terminal) +        archive_urls = [u for u in urls if u.rel == 'webarchive'] + +        if terminal['terminal_status_code'] != 200: +            self.counts['skip-terminal-status-code'] += 1 +            return None + +        terminal_cdx = row['cdx'] +        if 'revisit_cdx' in row: +            terminal_cdx = row['revisit_cdx'] +        assert terminal_cdx['surt'] +        assert terminal_cdx['url'] == terminal['terminal_url'] + +        wc_cdx = [] +        # primary resource first +        wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine( +            surt=terminal_cdx['surt'], +            timestamp=terminal['terminal_timestamp'], +            url=terminal['terminal_url'], +            mimetype=file_meta['mimetype'], +            status_code=terminal['terminal_status_code'], +            sha1=file_meta['sha1hex'], +            sha256=file_meta['sha256hex'], +            size=file_meta['size_bytes'], +        )) + +        for resource in row.get('html_resources', []): +            timestamp = resource['timestamp'] +            if not "+" in timestamp and not "Z" in timestamp: +                timestamp += "Z" +            wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine( +                surt=resource['surt'], +                timestamp=timestamp, +                url=resource['url'], +                mimetype=resource.get('mimetype'), +                size=resource.get('size'), +                sha1=resource.get('sha1hex'), +                sha256=resource.get('sha256hex'), +            )) + +        wc = fatcat_openapi_client.WebcaptureEntity( +            cdx=wc_cdx, +            archive_urls=archive_urls, +            original_url=terminal['terminal_url'], +            timestamp=terminal['terminal_timestamp'], +            release_ids=[release_ident], +        ) + +        edit_extra = self.parse_edit_extra(row) + +        if edit_extra: +            wc.edit_extra = edit_extra +        return wc + +    def try_update(self, wc): + +        # check for existing edits-in-progress with same URL +        for other in self._entity_queue: +            if other.original_url == wc.original_url: +                self.counts['skip-in-queue'] += 1 +                return False + +        # lookup sha1, or create new entity (TODO: API doesn't support this yet) +        #existing = None + +        # TODO: currently only allow one release per webcapture +        release = self.api.get_release(wc.release_ids[0], expand="webcaptures") +        if release.webcaptures: +            # check if this is an existing match, or just a similar hit +            for other in release.webcaptures: +                if wc.original_url == other.original_url: +                    # TODO: compare very similar timestamps of same time (different formats) +                    self.counts['exists'] += 1 +                    return False +            self.counts['skip-release-has-webcapture'] += 1 +            return False + +        # Ok, if we got here then no existing web capture for (first) release, +        # so go ahead and insert! +        return True + +    def insert_batch(self, batch): +        self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch( +            editgroup=fatcat_openapi_client.Editgroup( +                description=self.editgroup_description, +                extra=self.editgroup_extra), +            entity_list=batch))  | 
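The common.py hunk above replaces the local clean()/is_cjk()/b32_hex()/LANG_MAP_MARC helpers with re-imports from fatcat_tools.normal, so existing importer code keeps working unchanged. A minimal sketch of what that backwards compatibility looks like from a caller's point of view, assuming a checkout of this codebase is importable and that the moved helpers behave like the removed copies (the behavioral asserts reuse cases from the deleted test_clean()/test_is_cjk()):

```python
# assumes the fatcat python codebase (and its dependencies) are on PYTHONPATH
from fatcat_tools.normal import clean_str, is_cjk
from fatcat_tools.importers.common import clean, is_cjk as is_cjk_compat

# the old names are now plain aliases for the fatcat_tools.normal helpers
assert clean is clean_str
assert is_cjk_compat is is_cjk

# behavior carried over from the removed common.py copies
assert clean('') is None
assert clean('123') == '123'
assert is_cjk('水道') is True
```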
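Two small pieces of the new DOAJ parse_record() are easy to check in isolation: the pages string built from start_page/end_page, and the guard that drops implausible release years (the importer also clears release_month in that case). A standalone sketch with invented values:

```python
import datetime
from typing import Optional

THIS_YEAR = datetime.datetime.now().year


def build_pages(start_page: Optional[str], end_page: Optional[str]) -> Optional[str]:
    """Return "start-end" when both pages are present, else just the start page."""
    if start_page and end_page:
        return f"{start_page}-{end_page}"
    return start_page or None


def sane_release_year(release_year: Optional[int]) -> Optional[int]:
    """Drop far-future or pre-1000 years, mirroring the importer's check."""
    if release_year is not None and (release_year > THIS_YEAR + 5 or release_year < 1000):
        return None
    return release_year


assert build_pages("12", "34") == "12-34"
assert build_pages("12", None) == "12"
assert sane_release_year(3020) is None
assert sane_release_year(2019) == 2019
```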
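doaj_ext_ids() walks the bibjson identifier list and keeps any DOI, PMID, or PMCID it recognizes alongside the 32-character DOAJ article id. A standalone sketch of that walk using plain dicts; the identifier values below are invented, and the real code additionally normalizes each value with clean_doi()/clean_pmid()/clean_pmcid() from fatcat_tools.normal:

```python
from typing import Dict, List, Optional


def pick_ext_ids(identifiers: List[dict]) -> Dict[str, Optional[str]]:
    """Keep the last DOI/PMID/PMCID seen, mirroring the loop in doaj_ext_ids()."""
    ids: Dict[str, Optional[str]] = {"doi": None, "pmid": None, "pmcid": None}
    for id_obj in identifiers:
        if not id_obj.get("id"):
            continue
        id_type = (id_obj.get("type") or "").lower()
        if id_type in ids:
            # real importer: value passes through clean_doi()/clean_pmid()/clean_pmcid()
            ids[id_type] = id_obj["id"]
    return ids


print(pick_ext_ids([
    {"type": "DOI", "id": "10.1234/example-doi"},  # invented DOI, for illustration only
    {"type": "eissn", "id": "1234-5678"},          # ignored: not doi/pmid/pmcid
]))
# -> {'doi': '10.1234/example-doi', 'pmid': None, 'pmcid': None}
```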
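doaj_license_slug() only maps license entries that are flagged open_access and whose type starts with "CC ", lowercasing them into slug form. The same mapping as a standalone function; the sample license dicts are made up:

```python
from typing import List, Optional


def license_slug(license_list: Optional[List[dict]]) -> Optional[str]:
    """e.g. {'open_access': True, 'type': 'CC BY-NC'} -> 'cc-by-nc'"""
    for lic in license_list or []:
        if not lic.get("open_access"):
            continue
        slug = lic.get("type") or ""
        if slug.startswith("CC "):
            return slug.replace("CC ", "cc-").lower()
    return None


assert license_slug([{"open_access": True, "type": "CC BY-NC"}]) == "cc-by-nc"
assert license_slug([{"open_access": False, "type": "CC BY"}]) is None
assert license_slug(None) is None
```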
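On the ingest side, parse_terminal() now derives an ISO 8601 terminal_timestamp from the 14-digit CDX-style terminal_dt, which the webcapture CDX lines consume later. The conversion on its own, with a made-up capture datetime:

```python
import datetime

terminal_dt = "20201123142259"  # hypothetical 14-digit CDX-style datetime
assert len(terminal_dt) == 14
terminal_timestamp = datetime.datetime.strptime(terminal_dt, "%Y%m%d%H%M%S").isoformat() + "Z"
print(terminal_timestamp)  # -> 2020-11-23T14:22:59Z
```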