diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
commit | 31d1a6a713d177990609767d508209ced19ca396 (patch) | |
tree | a628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/transforms | |
parent | 9dc891b8098542bb089c8c47098b60a8beb76a53 (diff) | |
download | fatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz fatcat-31d1a6a713d177990609767d508209ced19ca396.zip |
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/transforms')
-rw-r--r-- | python/fatcat_tools/transforms/access.py | 44 | ||||
-rw-r--r-- | python/fatcat_tools/transforms/csl.py | 185 | ||||
-rw-r--r-- | python/fatcat_tools/transforms/elasticsearch.py | 668 | ||||
-rw-r--r-- | python/fatcat_tools/transforms/ingest.py | 64 |
4 files changed, 517 insertions, 444 deletions
diff --git a/python/fatcat_tools/transforms/access.py b/python/fatcat_tools/transforms/access.py index ae9880e7..34212a6a 100644 --- a/python/fatcat_tools/transforms/access.py +++ b/python/fatcat_tools/transforms/access.py @@ -1,4 +1,3 @@ - from enum import Enum from typing import List, Optional @@ -16,6 +15,7 @@ class AccessType(str, Enum): openlibrary = "openlibrary" wikipedia = "wikipedia" + class AccessOption(BaseModel): access_type: AccessType @@ -40,27 +40,31 @@ def release_access_options(release: ReleaseEntity) -> List[AccessOption]: option found """ options = [] - for f in (release.files or []): + for f in release.files or []: thumbnail_url = None - if f.mimetype == 'application/pdf' and f.sha1 and f.urls: + if f.mimetype == "application/pdf" and f.sha1 and f.urls: # NOTE: scholar.archive.org does an actual database check before # generating these URLs, but we skip that for speed thumbnail_url = f"https://blobs.fatcat.wiki/thumbnail/pdf/{f.sha1[0:2]}/{f.sha1[2:4]}/{f.sha1}.180px.jpg" - for u in (f.urls or []): - if '://web.archive.org/' in u.url: - return [AccessOption( - access_type="wayback", - access_url=u.url, - mimetype=f.mimetype, - size_bytes=f.size, - thumbnail_url=thumbnail_url, - )] - elif '://archive.org/' in u.url: - return [AccessOption( - access_type="ia_file", - access_url=u.url, - mimetype=f.mimetype, - size_bytes=f.size, - thumbnail_url=thumbnail_url, - )] + for u in f.urls or []: + if "://web.archive.org/" in u.url: + return [ + AccessOption( + access_type="wayback", + access_url=u.url, + mimetype=f.mimetype, + size_bytes=f.size, + thumbnail_url=thumbnail_url, + ) + ] + elif "://archive.org/" in u.url: + return [ + AccessOption( + access_type="ia_file", + access_url=u.url, + mimetype=f.mimetype, + size_bytes=f.size, + thumbnail_url=thumbnail_url, + ) + ] return options diff --git a/python/fatcat_tools/transforms/csl.py b/python/fatcat_tools/transforms/csl.py index f8b26bce..2b39068a 100644 --- a/python/fatcat_tools/transforms/csl.py +++ b/python/fatcat_tools/transforms/csl.py @@ -1,4 +1,3 @@ - import json from citeproc import ( @@ -13,10 +12,10 @@ from citeproc_styles import get_style_filepath def contribs_by_role(contribs, role): - ret = [c.copy() for c in contribs if c['role'] == role] - [c.pop('role') for c in ret] + ret = [c.copy() for c in contribs if c["role"] == role] + [c.pop("role") for c in ret] # TODO: some note to self here - [c.pop('literal') for c in ret if 'literal' in c] + [c.pop("literal") for c in ret if "literal" in c] if not ret: return None else: @@ -33,26 +32,30 @@ def release_to_csl(entity): Follows, but not enforced by: https://github.com/citation-style-language/schema/blob/master/csl-data.json """ contribs = [] - for contrib in (entity.contribs or []): + for contrib in entity.contribs or []: if contrib.creator: # Default to "local" (publication-specific) metadata; fall back to # creator-level - family = contrib.creator.surname or contrib.surname or (contrib.raw_name and contrib.raw_name.split()[-1]) + family = ( + contrib.creator.surname + or contrib.surname + or (contrib.raw_name and contrib.raw_name.split()[-1]) + ) if not family: # CSL requires some surname (family name) continue c = dict( family=family, given=contrib.creator.given_name or contrib.given_name, - #dropping-particle - #non-dropping-particle - #suffix - #comma-suffix - #static-ordering + # dropping-particle + # non-dropping-particle + # suffix + # comma-suffix + # static-ordering literal=contrib.creator.display_name or contrib.raw_name, - #parse-names, + # parse-names, # role must be defined; default to author - role=contrib.role or 'author', + role=contrib.role or "author", ) else: family = contrib.surname or (contrib.raw_name and contrib.raw_name.split()[-1]) @@ -64,7 +67,7 @@ def release_to_csl(entity): given=contrib.given_name, literal=contrib.raw_name, # role must be defined; default to author - role=contrib.role or 'author', + role=contrib.role or "author", ) for k in list(c.keys()): if not c[k]: @@ -78,93 +81,108 @@ def release_to_csl(entity): issued_date = None if entity.release_date: - issued_date = {"date-parts": [[ - entity.release_date.year, - entity.release_date.month, - entity.release_date.day, - ]]} + issued_date = { + "date-parts": [ + [ + entity.release_date.year, + entity.release_date.month, + entity.release_date.day, + ] + ] + } elif entity.release_year: issued_date = {"date-parts": [[entity.release_year]]} csl = dict( - #id, - #categories - type=entity.release_type or "article", # can't be blank + # id, + # categories + type=entity.release_type or "article", # can't be blank language=entity.language, - #journalAbbreviation - #shortTitle + # journalAbbreviation + # shortTitle ## see below for all contrib roles - #accessed - #container - #event-date + # accessed + # container + # event-date issued=issued_date, - #original-date - #submitted + # original-date + # submitted abstract=abstract, - #annote - #archive - #archive_location - #archive-place - #authority - #call-number - #chapter-number - #citation-number - #citation-label - #collection-number - #collection-title + # annote + # archive + # archive_location + # archive-place + # authority + # call-number + # chapter-number + # citation-number + # citation-label + # collection-number + # collection-title container_title=entity.container and entity.container.name, - #container-title-short - #dimensions + # container-title-short + # dimensions DOI=entity.ext_ids.doi, - #edition - #event - #event-place - #first-reference-note-number - #genre + # edition + # event + # event-place + # first-reference-note-number + # genre ISBN=entity.ext_ids.isbn13, ISSN=entity.container and entity.container.issnl, issue=entity.issue, - #jurisdiction - #keyword - #locator - #medium - #note - #number - #number-of-pages - #number-of-volumes - #original-publisher - #original-publisher-place - #original-title + # jurisdiction + # keyword + # locator + # medium + # note + # number + # number-of-pages + # number-of-volumes + # original-publisher + # original-publisher-place + # original-title # TODO: page=entity.pages, - page_first=entity.pages and entity.pages.split('-')[0], + page_first=entity.pages and entity.pages.split("-")[0], PMCID=entity.ext_ids.pmcid, PMID=entity.ext_ids.pmid, publisher=(entity.container and entity.container.publisher) or entity.publisher, - #publisher-place - #references - #reviewed-title - #scale - #section - #source - #status + # publisher-place + # references + # reviewed-title + # scale + # section + # source + # status title=entity.title, - #title-short - #URL - #version + # title-short + # URL + # version volume=entity.volume, - #year-suffix + # year-suffix ) - for role in ['author', 'collection-editor', 'composer', 'container-author', - 'director', 'editor', 'editorial-director', 'interviewer', - 'illustrator', 'original-author', 'recipient', 'reviewed-author', - 'translator']: + for role in [ + "author", + "collection-editor", + "composer", + "container-author", + "director", + "editor", + "editorial-director", + "interviewer", + "illustrator", + "original-author", + "recipient", + "reviewed-author", + "translator", + ]: cbr = contribs_by_role(contribs, role) if cbr: csl[role] = cbr # underline-to-dash - csl['container-title'] = csl.pop('container_title') - csl['page-first'] = csl.pop('page_first') - empty_keys = [k for k,v in csl.items() if not v] + csl["container-title"] = csl.pop("container_title") + csl["page-first"] = csl.pop("page_first") + empty_keys = [k for k, v in csl.items() if not v] for k in empty_keys: csl.pop(k) return csl @@ -184,10 +202,11 @@ def refs_to_csl(entity): title=ref.title, issued=issued_date, ) - csl['id'] = ref.key or ref.index, # zero- or one-indexed? + csl["id"] = (ref.key or ref.index,) # zero- or one-indexed? ret.append(csl) return ret + def citeproc_csl(csl_json, style, html=False): """ Renders a release entity to a styled citation. @@ -200,8 +219,8 @@ def citeproc_csl(csl_json, style, html=False): Returns a string; if the html flag is set, and the style isn't 'csl-json' or 'bibtex', it will be HTML. Otherwise plain text. """ - if not csl_json.get('id'): - csl_json['id'] = "unknown" + if not csl_json.get("id"): + csl_json["id"] = "unknown" if style == "csl-json": return json.dumps(csl_json) bib_src = CiteProcJSON([csl_json]) @@ -211,7 +230,7 @@ def citeproc_csl(csl_json, style, html=False): style_path = get_style_filepath(style) bib_style = CitationStylesStyle(style_path, validate=False) bib = CitationStylesBibliography(bib_style, bib_src, form) - bib.register(Citation([CitationItem(csl_json['id'])])) + bib.register(Citation([CitationItem(csl_json["id"])])) lines = bib.bibliography()[0] if style == "bibtex": out = "" @@ -222,6 +241,6 @@ def citeproc_csl(csl_json, style, html=False): out += "\n " + line else: out += line - return ''.join(out) + return "".join(out) else: - return ''.join(lines) + return "".join(lines) diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index 1826d4eb..e39e9ea4 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -1,4 +1,3 @@ - import datetime from typing import Any, Dict, Optional @@ -13,13 +12,14 @@ from fatcat_openapi_client import ( def check_kbart(year: int, archive: dict) -> Optional[bool]: - if not archive or not archive.get('year_spans'): + if not archive or not archive.get("year_spans"): return None - for span in archive['year_spans']: + for span in archive["year_spans"]: if year >= span[0] and year <= span[1]: return True return False + def test_check_kbart() -> None: assert check_kbart(1990, dict()) is None @@ -40,87 +40,89 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> Raises exception on error (never returns None) """ - if entity.state in ('redirect', 'deleted'): + if entity.state in ("redirect", "deleted"): return dict( - ident = entity.ident, - state = entity.state, + ident=entity.ident, + state=entity.state, ) - elif entity.state != 'active': + elif entity.state != "active": raise ValueError("Unhandled entity state: {}".format(entity.state)) # First, the easy ones (direct copy) release = entity t: Dict[str, Any] = dict( - doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", - ident = release.ident, - state = release.state, - revision = release.revision, - work_id = release.work_id, - title = release.title, - subtitle = release.subtitle, - original_title = release.original_title, - release_type = release.release_type, - release_stage = release.release_stage, - withdrawn_status = release.withdrawn_status, - language = release.language, - volume = release.volume, - issue = release.issue, - pages = release.pages, - number = release.number, - license = release.license_slug, - version = release.version, - doi = release.ext_ids.doi, - pmid = release.ext_ids.pmid, - pmcid = release.ext_ids.pmcid, - isbn13 = release.ext_ids.isbn13, - wikidata_qid = release.ext_ids.wikidata_qid, - core_id = release.ext_ids.core, - arxiv_id = release.ext_ids.arxiv, - jstor_id = release.ext_ids.jstor, - ark_id = release.ext_ids.ark, - mag_id = release.ext_ids.mag, - dblp_id = release.ext_ids.dblp, - doaj_id = release.ext_ids.doaj, - hdl = release.ext_ids.hdl, - tags = [], + doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z", + ident=release.ident, + state=release.state, + revision=release.revision, + work_id=release.work_id, + title=release.title, + subtitle=release.subtitle, + original_title=release.original_title, + release_type=release.release_type, + release_stage=release.release_stage, + withdrawn_status=release.withdrawn_status, + language=release.language, + volume=release.volume, + issue=release.issue, + pages=release.pages, + number=release.number, + license=release.license_slug, + version=release.version, + doi=release.ext_ids.doi, + pmid=release.ext_ids.pmid, + pmcid=release.ext_ids.pmcid, + isbn13=release.ext_ids.isbn13, + wikidata_qid=release.ext_ids.wikidata_qid, + core_id=release.ext_ids.core, + arxiv_id=release.ext_ids.arxiv, + jstor_id=release.ext_ids.jstor, + ark_id=release.ext_ids.ark, + mag_id=release.ext_ids.mag, + dblp_id=release.ext_ids.dblp, + doaj_id=release.ext_ids.doaj, + hdl=release.ext_ids.hdl, + tags=[], ) - t.update(dict( - is_oa = None, - is_longtail_oa = None, - is_preserved = None, - in_web = False, - in_dweb = False, - in_ia = False, - in_ia_sim = False, - in_kbart = None, - in_jstor = False, - in_doaj= bool(release.ext_ids.doaj), - in_shadows = False, - )) + t.update( + dict( + is_oa=None, + is_longtail_oa=None, + is_preserved=None, + in_web=False, + in_dweb=False, + in_ia=False, + in_ia_sim=False, + in_kbart=None, + in_jstor=False, + in_doaj=bool(release.ext_ids.doaj), + in_shadows=False, + ) + ) release_year = release.release_year if release.release_date: # .isoformat() results in, eg, '2010-10-22' (YYYY-MM-DD) - t['release_date'] = release.release_date.isoformat() + t["release_date"] = release.release_date.isoformat() if not release_year: release_year = release.release_date.year if release_year: - t['release_year'] = release_year + t["release_year"] = release_year - t['any_abstract'] = len(release.abstracts or []) > 0 - t['ref_count'] = len(release.refs or []) + t["any_abstract"] = len(release.abstracts or []) > 0 + t["ref_count"] = len(release.refs or []) ref_release_ids = [] - for r in (release.refs or []): + for r in release.refs or []: if r.target_release_id: ref_release_ids.append(r.target_release_id) - t['ref_release_ids'] = ref_release_ids - t['ref_linked_count'] = len(ref_release_ids) - t['contrib_count'] = len(release.contribs or []) + t["ref_release_ids"] = ref_release_ids + t["ref_linked_count"] = len(ref_release_ids) + t["contrib_count"] = len(release.contribs or []) contrib_names = [] contrib_affiliations = [] creator_ids = [] - for c in (release.contribs or []): + for c in release.contribs or []: if c.creator and c.creator.display_name: contrib_names.append(c.creator.display_name) elif c.raw_name: @@ -132,193 +134,218 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> creator_ids.append(c.creator_id) if c.raw_affiliation: contrib_affiliations.append(c.raw_affiliation) - t['contrib_names'] = contrib_names - t['creator_ids'] = creator_ids - t['affiliations'] = contrib_affiliations + t["contrib_names"] = contrib_names + t["creator_ids"] = creator_ids + t["affiliations"] = contrib_affiliations # TODO: mapping... probably by lookup? - t['affiliation_rors'] = None + t["affiliation_rors"] = None if release.container: t.update(_rte_container_helper(release.container, release_year)) # fall back to release-level container metadata if container not linked or # missing context - if not t.get('publisher'): - t['publisher'] = release.publisher - if not t.get('container_name') and release.extra: - t['container_name'] = release.extra.get('container_name') + if not t.get("publisher"): + t["publisher"] = release.publisher + if not t.get("container_name") and release.extra: + t["container_name"] = release.extra.get("container_name") - if release.ext_ids.jstor or (release.ext_ids.doi and release.ext_ids.doi.startswith('10.2307/')): - t['in_jstor'] = True + if release.ext_ids.jstor or ( + release.ext_ids.doi and release.ext_ids.doi.startswith("10.2307/") + ): + t["in_jstor"] = True # transform file/fileset/webcapture related fields t.update(_rte_content_helper(release)) if release.ext_ids.doaj: - t['is_oa'] = True + t["is_oa"] = True if release.license_slug: # TODO: more/better checks here, particularly strict *not* OA licenses if release.license_slug.startswith("CC-"): - t['is_oa'] = True + t["is_oa"] = True if release.license_slug.startswith("ARXIV-"): - t['is_oa'] = True + t["is_oa"] = True - t['is_work_alias'] = None + t["is_work_alias"] = None extra = release.extra or dict() if extra: - if extra.get('is_oa'): + if extra.get("is_oa"): # NOTE: not actually setting this anywhere... but could - t['is_oa'] = True - if extra.get('is_work_alias') is not None: - t['is_work_alias'] = bool(extra.get('is_work_alias')) - if extra.get('longtail_oa'): + t["is_oa"] = True + if extra.get("is_work_alias") is not None: + t["is_work_alias"] = bool(extra.get("is_work_alias")) + if extra.get("longtail_oa"): # sometimes set by GROBID/matcher - t['is_oa'] = True - t['is_longtail_oa'] = True - if not t.get('container_name'): - t['container_name'] = extra.get('container_name') - if extra.get('crossref'): - if extra['crossref'].get('archive'): + t["is_oa"] = True + t["is_longtail_oa"] = True + if not t.get("container_name"): + t["container_name"] = extra.get("container_name") + if extra.get("crossref"): + if extra["crossref"].get("archive"): # all crossref archives are KBART, I believe - t['in_kbart'] = True + t["in_kbart"] = True # backwards compatible subtitle fetching - if not t['subtitle'] and extra.get('subtitle'): - if type(extra['subtitle']) == list: - t['subtitle'] = extra['subtitle'][0] + if not t["subtitle"] and extra.get("subtitle"): + if type(extra["subtitle"]) == list: + t["subtitle"] = extra["subtitle"][0] else: - t['subtitle'] = extra['subtitle'] + t["subtitle"] = extra["subtitle"] - t['first_page'] = None + t["first_page"] = None if release.pages: - first = release.pages.split('-')[0] - first = first.replace('p', '') + first = release.pages.split("-")[0] + first = first.replace("p", "") if first.isdigit(): - t['first_page'] = first + t["first_page"] = first # TODO: non-numerical first pages - t['ia_microfilm_url'] = None - if t['in_ia_sim']: + t["ia_microfilm_url"] = None + if t["in_ia_sim"]: # TODO: determine URL somehow? I think this is in flux. Will probably # need extra metadata in the container extra field. # special case as a demo for now. - if release.container_id == "hl5g6d5msjcl7hlbyyvcsbhc2u" \ - and release.release_year in (2011, 2013) \ - and release.issue \ - and release.issue.isdigit() \ - and t['first_page']: - t['ia_microfilm_url'] = "https://archive.org/details/sim_bjog_{}-{:02d}/page/n{}".format( + if ( + release.container_id == "hl5g6d5msjcl7hlbyyvcsbhc2u" + and release.release_year in (2011, 2013) + and release.issue + and release.issue.isdigit() + and t["first_page"] + ): + t[ + "ia_microfilm_url" + ] = "https://archive.org/details/sim_bjog_{}-{:02d}/page/n{}".format( release.release_year, int(release.issue) - 1, - t['first_page'], + t["first_page"], ) - t['doi_registrar'] = None - if extra and t['doi']: - for k in ('crossref', 'datacite', 'jalc'): + t["doi_registrar"] = None + if extra and t["doi"]: + for k in ("crossref", "datacite", "jalc"): if k in extra: - t['doi_registrar'] = k - if 'doi_registrar' not in t: - t['doi_registrar'] = 'crossref' + t["doi_registrar"] = k + if "doi_registrar" not in t: + t["doi_registrar"] = "crossref" - if t['doi']: - t['doi_prefix'] = t['doi'].split('/')[0] + if t["doi"]: + t["doi_prefix"] = t["doi"].split("/")[0] - if t['is_longtail_oa']: - t['is_oa'] = True + if t["is_longtail_oa"]: + t["is_oa"] = True # optionally coerce all flags from Optional[bool] to bool if force_bool: - for k in ('is_oa', 'is_longtail_oa', 'in_kbart', 'in_ia_sim', - 'in_jstor', 'in_web', 'in_dweb', 'in_shadows', - 'is_work_alias'): + for k in ( + "is_oa", + "is_longtail_oa", + "in_kbart", + "in_ia_sim", + "in_jstor", + "in_web", + "in_dweb", + "in_shadows", + "is_work_alias", + ): t[k] = bool(t[k]) - t['in_ia'] = bool(t['in_ia']) - t['is_preserved'] = bool( - t['is_preserved'] - or t['in_ia'] - or t['in_kbart'] - or t['in_jstor'] - or t.get('pmcid') - or t.get('arxiv_id') + t["in_ia"] = bool(t["in_ia"]) + t["is_preserved"] = bool( + t["is_preserved"] + or t["in_ia"] + or t["in_kbart"] + or t["in_jstor"] + or t.get("pmcid") + or t.get("arxiv_id") ) - if t['in_ia']: - t['preservation'] = 'bright' - elif t['is_preserved']: - t['preservation'] = 'dark' - elif t['in_shadows']: - t['preservation'] = 'shadows_only' + if t["in_ia"]: + t["preservation"] = "bright" + elif t["is_preserved"]: + t["preservation"] = "dark" + elif t["in_shadows"]: + t["preservation"] = "shadows_only" else: - t['preservation'] = 'none' + t["preservation"] = "none" return t + def _rte_container_helper(container: ContainerEntity, release_year: Optional[int]) -> dict: """ Container metadata sub-section of release_to_elasticsearch() """ this_year = datetime.date.today().year t = dict() - t['publisher'] = container.publisher - t['container_name'] = container.name + t["publisher"] = container.publisher + t["container_name"] = container.name # this is container.ident, not release.container_id, because there may # be a redirect involved - t['container_id'] = container.ident - t['container_issnl'] = container.issnl + t["container_id"] = container.ident + t["container_issnl"] = container.issnl issns = [container.issnl, container.issne, container.issnp] issns = list(set([i for i in issns if i])) - t['container_issns'] = issns - t['container_type'] = container.container_type - t['container_publication_status'] = container.publication_status + t["container_issns"] = issns + t["container_type"] = container.container_type + t["container_publication_status"] = container.publication_status if container.extra: c_extra = container.extra - if c_extra.get('kbart') and release_year: - if check_kbart(release_year, c_extra['kbart'].get('jstor')): - t['in_jstor'] = True - if t.get('in_kbart') or t.get('in_jstor'): - t['in_kbart'] = True - for archive in ('portico', 'lockss', 'clockss', 'pkp_pln', - 'hathitrust', 'scholarsportal', 'cariniana'): - t['in_kbart'] = t.get('in_kbart') or check_kbart(release_year, c_extra['kbart'].get(archive)) + if c_extra.get("kbart") and release_year: + if check_kbart(release_year, c_extra["kbart"].get("jstor")): + t["in_jstor"] = True + if t.get("in_kbart") or t.get("in_jstor"): + t["in_kbart"] = True + for archive in ( + "portico", + "lockss", + "clockss", + "pkp_pln", + "hathitrust", + "scholarsportal", + "cariniana", + ): + t["in_kbart"] = t.get("in_kbart") or check_kbart( + release_year, c_extra["kbart"].get(archive) + ) # recent KBART coverage is often not updated for the # current year. So for current-year publications, consider # coverage from *last* year to also be included in the # Keeper - if not t.get('in_kbart') and release_year == this_year: - t['in_kbart'] = check_kbart(this_year - 1, c_extra['kbart'].get(archive)) - - if c_extra.get('ia'): - if c_extra['ia'].get('sim') and release_year: - t['in_ia_sim'] = check_kbart(release_year, c_extra['ia']['sim']) - if c_extra['ia'].get('longtail_oa'): - t['is_longtail_oa'] = True - if c_extra.get('sherpa_romeo'): - if c_extra['sherpa_romeo'].get('color') == 'white': - t['is_oa'] = False - if c_extra.get('default_license') and c_extra.get('default_license').startswith('CC-'): - t['is_oa'] = True - if c_extra.get('doaj'): - if c_extra['doaj'].get('as_of'): - t['is_oa'] = True - t['in_doaj'] = True - if c_extra.get('road'): - if c_extra['road'].get('as_of'): - t['is_oa'] = True - if c_extra.get('szczepanski'): - if c_extra['szczepanski'].get('as_of'): - t['is_oa'] = True - if c_extra.get('country'): - t['country_code'] = c_extra['country'] - t['country_code_upper'] = c_extra['country'].upper() - if c_extra.get('publisher_type'): - t['publisher_type'] = c_extra['publisher_type'] - if c_extra.get('discipline'): - t['discipline'] = c_extra['discipline'] + if not t.get("in_kbart") and release_year == this_year: + t["in_kbart"] = check_kbart(this_year - 1, c_extra["kbart"].get(archive)) + + if c_extra.get("ia"): + if c_extra["ia"].get("sim") and release_year: + t["in_ia_sim"] = check_kbart(release_year, c_extra["ia"]["sim"]) + if c_extra["ia"].get("longtail_oa"): + t["is_longtail_oa"] = True + if c_extra.get("sherpa_romeo"): + if c_extra["sherpa_romeo"].get("color") == "white": + t["is_oa"] = False + if c_extra.get("default_license") and c_extra.get("default_license").startswith("CC-"): + t["is_oa"] = True + if c_extra.get("doaj"): + if c_extra["doaj"].get("as_of"): + t["is_oa"] = True + t["in_doaj"] = True + if c_extra.get("road"): + if c_extra["road"].get("as_of"): + t["is_oa"] = True + if c_extra.get("szczepanski"): + if c_extra["szczepanski"].get("as_of"): + t["is_oa"] = True + if c_extra.get("country"): + t["country_code"] = c_extra["country"] + t["country_code_upper"] = c_extra["country"].upper() + if c_extra.get("publisher_type"): + t["publisher_type"] = c_extra["publisher_type"] + if c_extra.get("discipline"): + t["discipline"] = c_extra["discipline"] return t + def _rte_content_helper(release: ReleaseEntity) -> dict: """ File/FileSet/WebCapture sub-section of release_to_elasticsearch() @@ -329,9 +356,9 @@ def _rte_content_helper(release: ReleaseEntity) -> dict: - any other URL """ t = dict( - file_count = len(release.files or []), - fileset_count = len(release.filesets or []), - webcapture_count = len(release.webcaptures or []), + file_count=len(release.files or []), + fileset_count=len(release.filesets or []), + webcapture_count=len(release.webcaptures or []), ) any_pdf_url = None @@ -340,38 +367,42 @@ def _rte_content_helper(release: ReleaseEntity) -> dict: ia_pdf_url = None for f in release.files or []: - if f.extra and f.extra.get('shadows'): - t['in_shadows'] = True - is_pdf = 'pdf' in (f.mimetype or '') - for release_url in (f.urls or []): + if f.extra and f.extra.get("shadows"): + t["in_shadows"] = True + is_pdf = "pdf" in (f.mimetype or "") + for release_url in f.urls or []: # first generic flags t.update(_rte_url_helper(release_url)) # then PDF specific stuff (for generating "best URL" fields) - if not f.mimetype and 'pdf' in release_url.url.lower(): + if not f.mimetype and "pdf" in release_url.url.lower(): is_pdf = True if is_pdf: any_pdf_url = release_url.url - if release_url.rel in ('webarchive', 'repository', 'repo'): + if release_url.rel in ("webarchive", "repository", "repo"): good_pdf_url = release_url.url - if '//web.archive.org/' in release_url.url or '//archive.org/' in release_url.url: + if ( + "//web.archive.org/" in release_url.url + or "//archive.org/" in release_url.url + ): best_pdf_url = release_url.url ia_pdf_url = release_url.url # here is where we bake-in PDF url priority; IA-specific - t['best_pdf_url'] = best_pdf_url or good_pdf_url or any_pdf_url - t['ia_pdf_url'] = ia_pdf_url + t["best_pdf_url"] = best_pdf_url or good_pdf_url or any_pdf_url + t["ia_pdf_url"] = ia_pdf_url for fs in release.filesets or []: - for url_obj in (fs.urls or []): + for url_obj in fs.urls or []: t.update(_rte_url_helper(url_obj)) for wc in release.webcaptures or []: - for url_obj in (wc.archive_urls or []): + for url_obj in wc.archive_urls or []: t.update(_rte_url_helper(url_obj)) return t + def _rte_url_helper(url_obj) -> dict: """ Takes a location URL ('url' and 'rel' keys) and returns generic preservation status. @@ -382,17 +413,17 @@ def _rte_url_helper(url_obj) -> dict: these will be iteratively update() into the overal object. """ t = dict() - if url_obj.rel in ('webarchive', 'repository', 'archive', 'repo'): - t['is_preserved'] = True - if '//web.archive.org/' in url_obj.url or '//archive.org/' in url_obj.url: - t['in_ia'] = True - if url_obj.url.lower().startswith('http') or url_obj.url.lower().startswith('ftp'): - t['in_web'] = True - if url_obj.rel in ('dweb', 'p2p', 'ipfs', 'dat', 'torrent'): + if url_obj.rel in ("webarchive", "repository", "archive", "repo"): + t["is_preserved"] = True + if "//web.archive.org/" in url_obj.url or "//archive.org/" in url_obj.url: + t["in_ia"] = True + if url_obj.url.lower().startswith("http") or url_obj.url.lower().startswith("ftp"): + t["in_web"] = True + if url_obj.rel in ("dweb", "p2p", "ipfs", "dat", "torrent"): # not sure what rel will be for this stuff - t['in_dweb'] = True - if '//www.jstor.org/' in url_obj.url: - t['in_jstor'] = True + t["in_dweb"] = True + if "//www.jstor.org/" in url_obj.url: + t["in_jstor"] = True return t @@ -404,50 +435,59 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None): Raises exception on error (never returns None) """ - if entity.state in ('redirect', 'deleted'): + if entity.state in ("redirect", "deleted"): return dict( - ident = entity.ident, - state = entity.state, + ident=entity.ident, + state=entity.state, ) - elif entity.state != 'active': + elif entity.state != "active": raise ValueError("Unhandled entity state: {}".format(entity.state)) # First, the easy ones (direct copy) t = dict( - doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", - ident = entity.ident, - state = entity.state, - revision = entity.revision, - - name = entity.name, - publisher = entity.publisher, - container_type = entity.container_type, - publication_status= entity.publication_status, - issnl = entity.issnl, - issne = entity.issne, - issnp = entity.issnp, - wikidata_qid = entity.wikidata_qid, + doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z", + ident=entity.ident, + state=entity.state, + revision=entity.revision, + name=entity.name, + publisher=entity.publisher, + container_type=entity.container_type, + publication_status=entity.publication_status, + issnl=entity.issnl, + issne=entity.issne, + issnp=entity.issnp, + wikidata_qid=entity.wikidata_qid, ) if not entity.extra: entity.extra = dict() - for key in ('country', 'languages', 'mimetypes', 'original_name', - 'first_year', 'last_year', 'aliases', 'abbrev', 'region', - 'discipline', 'publisher_type'): + for key in ( + "country", + "languages", + "mimetypes", + "original_name", + "first_year", + "last_year", + "aliases", + "abbrev", + "region", + "discipline", + "publisher_type", + ): if entity.extra.get(key): t[key] = entity.extra[key] - if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'): - t['dblp_prefix'] = entity.extra['dblp']['prefix'] + if entity.extra.get("dblp") and entity.extra["dblp"].get("prefix"): + t["dblp_prefix"] = entity.extra["dblp"]["prefix"] - if 'country' in t: - t['country_code'] = t.pop('country') + if "country" in t: + t["country_code"] = t.pop("country") - t['issns'] = [entity.issnl, entity.issne, entity.issnp] - for key in ('issnp', 'issne'): + t["issns"] = [entity.issnl, entity.issne, entity.issnp] + for key in ("issnp", "issne"): if entity.extra.get(key): - t['issns'].append(entity.extra[key]) - t['issns'] = list(set([i for i in t['issns'] if i])) + t["issns"].append(entity.extra[key]) + t["issns"] = list(set([i for i in t["issns"] if i])) in_doaj = None in_road = None @@ -459,72 +499,72 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None): keepers = [] extra = entity.extra - if extra.get('doaj'): - if extra['doaj'].get('as_of'): + if extra.get("doaj"): + if extra["doaj"].get("as_of"): in_doaj = True - if extra.get('road'): - if extra['road'].get('as_of'): + if extra.get("road"): + if extra["road"].get("as_of"): in_road = True - if extra.get('szczepanski'): - if extra['szczepanski'].get('as_of'): + if extra.get("szczepanski"): + if extra["szczepanski"].get("as_of"): is_oa = True - if extra.get('default_license'): - if extra['default_license'].startswith('CC-'): + if extra.get("default_license"): + if extra["default_license"].startswith("CC-"): is_oa = True - t['sherpa_romeo_color'] = None - if extra.get('sherpa_romeo'): - t['sherpa_romeo_color'] = extra['sherpa_romeo'].get('color') - if extra['sherpa_romeo'].get('color') == 'white': + t["sherpa_romeo_color"] = None + if extra.get("sherpa_romeo"): + t["sherpa_romeo_color"] = extra["sherpa_romeo"].get("color") + if extra["sherpa_romeo"].get("color") == "white": is_oa = False - if extra.get('kbart'): + if extra.get("kbart"): any_kbart = True - if extra['kbart'].get('jstor'): + if extra["kbart"].get("jstor"): any_jstor = True - for k, v in extra['kbart'].items(): + for k, v in extra["kbart"].items(): if v and isinstance(v, dict): keepers.append(k) - if extra.get('ia'): - if extra['ia'].get('sim'): + if extra.get("ia"): + if extra["ia"].get("sim"): any_ia_sim = True - if extra['ia'].get('longtail_oa'): + if extra["ia"].get("longtail_oa"): is_longtail_oa = True - t['is_superceded'] = bool(extra.get('superceded')) + t["is_superceded"] = bool(extra.get("superceded")) - t['keepers'] = keepers - t['in_doaj'] = bool(in_doaj) - t['in_road'] = bool(in_road) - t['any_kbart'] = bool(any_kbart) + t["keepers"] = keepers + t["in_doaj"] = bool(in_doaj) + t["in_road"] = bool(in_road) + t["any_kbart"] = bool(any_kbart) if force_bool: - t['is_oa'] = bool(in_doaj or in_road or is_oa) - t['is_longtail_oa'] = bool(is_longtail_oa) - t['any_jstor'] = bool(any_jstor) - t['any_ia_sim'] = bool(any_ia_sim) + t["is_oa"] = bool(in_doaj or in_road or is_oa) + t["is_longtail_oa"] = bool(is_longtail_oa) + t["any_jstor"] = bool(any_jstor) + t["any_ia_sim"] = bool(any_ia_sim) else: - t['is_oa'] = in_doaj or in_road or is_oa - t['is_longtail_oa'] = is_longtail_oa - t['any_jstor'] = any_jstor - t['any_ia_sim'] = any_ia_sim + t["is_oa"] = in_doaj or in_road or is_oa + t["is_longtail_oa"] = is_longtail_oa + t["any_jstor"] = any_jstor + t["any_ia_sim"] = any_ia_sim # mix in stats, if provided if stats: - t['releases_total'] = stats['total'] - t['preservation_bright'] = stats['preservation']['bright'] - t['preservation_dark'] = stats['preservation']['dark'] - t['preservation_shadows_only'] = stats['preservation']['shadows_only'] - t['preservation_none'] = stats['preservation']['none'] + t["releases_total"] = stats["total"] + t["preservation_bright"] = stats["preservation"]["bright"] + t["preservation_dark"] = stats["preservation"]["dark"] + t["preservation_shadows_only"] = stats["preservation"]["shadows_only"] + t["preservation_none"] = stats["preservation"]["none"] return t def _type_of_edit(edit: EntityEdit) -> str: if edit.revision is None and edit.redirect_ident is None: - return 'delete' + return "delete" elif edit.redirect_ident: # redirect - return 'update' + return "update" elif edit.prev_revision is None and edit.redirect_ident is None and edit.revision: - return 'create' + return "create" else: - return 'update' + return "update" def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]: @@ -536,7 +576,7 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]: editgroup = entity.editgroup t = dict( - doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", + doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z", index=entity.index, editgroup_id=entity.editgroup_id, timestamp=entity.timestamp.isoformat(), @@ -547,8 +587,8 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]: ) extra = editgroup.extra or dict() - if extra.get('agent'): - t['agent'] = extra['agent'] + if extra.get("agent"): + t["agent"] = extra["agent"] containers = [_type_of_edit(e) for e in editgroup.edits.containers] creators = [_type_of_edit(e) for e in editgroup.edits.creators] @@ -558,27 +598,27 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]: releases = [_type_of_edit(e) for e in editgroup.edits.releases] works = [_type_of_edit(e) for e in editgroup.edits.works] - t['containers'] = len(containers) - t['new_containers'] = len([e for e in containers if e == 'create']) - t['creators'] = len(creators) - t['new_creators'] = len([e for e in creators if e == 'create']) - t['files'] = len(files) - t['new_files'] = len([e for e in files if e == 'create']) - t['filesets'] = len(filesets) - t['new_filesets'] = len([e for e in filesets if e == 'create']) - t['webcaptures'] = len(webcaptures) - t['new_webcaptures'] = len([e for e in webcaptures if e == 'create']) - t['releases'] = len(releases) - t['new_releases'] = len([e for e in releases if e == 'create']) - t['works'] = len(works) - t['new_works'] = len([e for e in works if e == 'create']) + t["containers"] = len(containers) + t["new_containers"] = len([e for e in containers if e == "create"]) + t["creators"] = len(creators) + t["new_creators"] = len([e for e in creators if e == "create"]) + t["files"] = len(files) + t["new_files"] = len([e for e in files if e == "create"]) + t["filesets"] = len(filesets) + t["new_filesets"] = len([e for e in filesets if e == "create"]) + t["webcaptures"] = len(webcaptures) + t["new_webcaptures"] = len([e for e in webcaptures if e == "create"]) + t["releases"] = len(releases) + t["new_releases"] = len([e for e in releases if e == "create"]) + t["works"] = len(works) + t["new_works"] = len([e for e in works if e == "create"]) all_edits = containers + creators + files + filesets + webcaptures + releases + works - t['created'] = len([e for e in all_edits if e == 'create']) - t['updated'] = len([e for e in all_edits if e == 'update']) - t['deleted'] = len([e for e in all_edits if e == 'delete']) - t['total'] = len(all_edits) + t["created"] = len([e for e in all_edits if e == "create"]) + t["updated"] = len([e for e in all_edits if e == "update"]) + t["deleted"] = len([e for e in all_edits if e == "delete"]) + t["total"] = len(all_edits) return t @@ -590,47 +630,47 @@ def file_to_elasticsearch(entity: FileEntity) -> Dict[str, Any]: Raises exception on error (never returns None) """ - if entity.state in ('redirect', 'deleted'): + if entity.state in ("redirect", "deleted"): return dict( - ident = entity.ident, - state = entity.state, + ident=entity.ident, + state=entity.state, ) - elif entity.state != 'active': + elif entity.state != "active": raise ValueError("Unhandled entity state: {}".format(entity.state)) # First, the easy ones (direct copy) t = dict( - doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", - ident = entity.ident, - state = entity.state, - revision = entity.revision, - release_ids = entity.release_ids, - release_count = len(entity.release_ids), - mimetype = entity.mimetype, - size_bytes = entity.size, - sha1 = entity.sha1, - sha256 = entity.sha256, - md5 = entity.md5, + doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z", + ident=entity.ident, + state=entity.state, + revision=entity.revision, + release_ids=entity.release_ids, + release_count=len(entity.release_ids), + mimetype=entity.mimetype, + size_bytes=entity.size, + sha1=entity.sha1, + sha256=entity.sha256, + md5=entity.md5, ) parsed_urls = [tldextract.extract(u.url) for u in entity.urls] - t['hosts'] = list(set(['.'.join([seg for seg in pu if seg]) for pu in parsed_urls])) - t['domains'] = list(set([pu.registered_domain for pu in parsed_urls])) - t['rels'] = list(set([u.rel for u in entity.urls])) + t["hosts"] = list(set([".".join([seg for seg in pu if seg]) for pu in parsed_urls])) + t["domains"] = list(set([pu.registered_domain for pu in parsed_urls])) + t["rels"] = list(set([u.rel for u in entity.urls])) - t['in_ia'] = bool('archive.org' in t['domains']) - t['in_ia_petabox'] = bool('archive.org' in t['hosts']) + t["in_ia"] = bool("archive.org" in t["domains"]) + t["in_ia_petabox"] = bool("archive.org" in t["hosts"]) any_url = None good_url = None best_url = None - for release_url in (entity.urls or []): + for release_url in entity.urls or []: any_url = release_url.url - if release_url.rel in ('webarchive', 'repository'): + if release_url.rel in ("webarchive", "repository"): good_url = release_url.url - if '//web.archive.org/' in release_url.url or '//archive.org/' in release_url.url: + if "//web.archive.org/" in release_url.url or "//archive.org/" in release_url.url: best_url = release_url.url # here is where we bake-in priority; IA-specific - t['best_url'] = best_url or good_url or any_url + t["best_url"] = best_url or good_url or any_url return t diff --git a/python/fatcat_tools/transforms/ingest.py b/python/fatcat_tools/transforms/ingest.py index 9101a4ec..30b5b190 100644 --- a/python/fatcat_tools/transforms/ingest.py +++ b/python/fatcat_tools/transforms/ingest.py @@ -1,4 +1,3 @@ - INGEST_TYPE_CONTAINER_MAP = { # Optica "twtpsm6ytje3nhuqfu3pa7ca7u": "html", @@ -14,7 +13,8 @@ INGEST_TYPE_CONTAINER_MAP = { "lovwr7ladjagzkhmoaszg7efqu": "html", } -def release_ingest_request(release, ingest_request_source='fatcat', ingest_type=None): + +def release_ingest_request(release, ingest_request_source="fatcat", ingest_type=None): """ Takes a full release entity object and returns an ingest request (as dict), or None if it seems like this release shouldn't be ingested. @@ -27,27 +27,35 @@ def release_ingest_request(release, ingest_request_source='fatcat', ingest_type= calling code should check the returned type field. """ - if release.state != 'active': + if release.state != "active": return None if (not ingest_type) and release.container_id: ingest_type = INGEST_TYPE_CONTAINER_MAP.get(release.container_id) if not ingest_type: - if release.release_type == 'stub': + if release.release_type == "stub": return None - elif release.release_type in ['component', 'graphic']: - ingest_type = 'component' - elif release.release_type == 'dataset': - ingest_type = 'dataset' - elif release.release_type == 'software': - ingest_type = 'software' - elif release.release_type == 'post-weblog': - ingest_type = 'html' - elif release.release_type in ['article-journal', 'article', 'chapter', 'paper-conference', 'book', 'report', 'thesis']: - ingest_type = 'pdf' + elif release.release_type in ["component", "graphic"]: + ingest_type = "component" + elif release.release_type == "dataset": + ingest_type = "dataset" + elif release.release_type == "software": + ingest_type = "software" + elif release.release_type == "post-weblog": + ingest_type = "html" + elif release.release_type in [ + "article-journal", + "article", + "chapter", + "paper-conference", + "book", + "report", + "thesis", + ]: + ingest_type = "pdf" else: - ingest_type = 'pdf' + ingest_type = "pdf" # generate a URL where we expect to find fulltext url = None @@ -59,8 +67,10 @@ def release_ingest_request(release, ingest_request_source='fatcat', ingest_type= link_source_id = release.ext_ids.arxiv elif release.ext_ids.pmcid and ingest_type == "pdf": # TODO: how to tell if an author manuscript in PMC vs. published? - #url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid) - url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format(release.ext_ids.pmcid) + # url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid) + url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format( + release.ext_ids.pmcid + ) link_source = "pmc" link_source_id = release.ext_ids.pmcid elif release.ext_ids.doi: @@ -75,19 +85,19 @@ def release_ingest_request(release, ingest_request_source='fatcat', ingest_type= ext_ids = dict([(k, v) for (k, v) in ext_ids.items() if v]) ingest_request = { - 'ingest_type': ingest_type, - 'ingest_request_source': ingest_request_source, - 'base_url': url, - 'release_stage': release.release_stage, - 'fatcat': { - 'release_ident': release.ident, - 'work_ident': release.work_id, + "ingest_type": ingest_type, + "ingest_request_source": ingest_request_source, + "base_url": url, + "release_stage": release.release_stage, + "fatcat": { + "release_ident": release.ident, + "work_ident": release.work_id, }, - 'ext_ids': ext_ids, + "ext_ids": ext_ids, } if link_source and link_source_id: - ingest_request['link_source'] = link_source - ingest_request['link_source_id'] = link_source_id + ingest_request["link_source"] = link_source + ingest_request["link_source_id"] = link_source_id return ingest_request |