summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/transforms
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/transforms')
-rw-r--r--python/fatcat_tools/transforms/access.py44
-rw-r--r--python/fatcat_tools/transforms/csl.py185
-rw-r--r--python/fatcat_tools/transforms/elasticsearch.py668
-rw-r--r--python/fatcat_tools/transforms/ingest.py64
4 files changed, 517 insertions, 444 deletions
diff --git a/python/fatcat_tools/transforms/access.py b/python/fatcat_tools/transforms/access.py
index ae9880e7..34212a6a 100644
--- a/python/fatcat_tools/transforms/access.py
+++ b/python/fatcat_tools/transforms/access.py
@@ -1,4 +1,3 @@
-
from enum import Enum
from typing import List, Optional
@@ -16,6 +15,7 @@ class AccessType(str, Enum):
openlibrary = "openlibrary"
wikipedia = "wikipedia"
+
class AccessOption(BaseModel):
access_type: AccessType
@@ -40,27 +40,31 @@ def release_access_options(release: ReleaseEntity) -> List[AccessOption]:
option found
"""
options = []
- for f in (release.files or []):
+ for f in release.files or []:
thumbnail_url = None
- if f.mimetype == 'application/pdf' and f.sha1 and f.urls:
+ if f.mimetype == "application/pdf" and f.sha1 and f.urls:
# NOTE: scholar.archive.org does an actual database check before
# generating these URLs, but we skip that for speed
thumbnail_url = f"https://blobs.fatcat.wiki/thumbnail/pdf/{f.sha1[0:2]}/{f.sha1[2:4]}/{f.sha1}.180px.jpg"
- for u in (f.urls or []):
- if '://web.archive.org/' in u.url:
- return [AccessOption(
- access_type="wayback",
- access_url=u.url,
- mimetype=f.mimetype,
- size_bytes=f.size,
- thumbnail_url=thumbnail_url,
- )]
- elif '://archive.org/' in u.url:
- return [AccessOption(
- access_type="ia_file",
- access_url=u.url,
- mimetype=f.mimetype,
- size_bytes=f.size,
- thumbnail_url=thumbnail_url,
- )]
+ for u in f.urls or []:
+ if "://web.archive.org/" in u.url:
+ return [
+ AccessOption(
+ access_type="wayback",
+ access_url=u.url,
+ mimetype=f.mimetype,
+ size_bytes=f.size,
+ thumbnail_url=thumbnail_url,
+ )
+ ]
+ elif "://archive.org/" in u.url:
+ return [
+ AccessOption(
+ access_type="ia_file",
+ access_url=u.url,
+ mimetype=f.mimetype,
+ size_bytes=f.size,
+ thumbnail_url=thumbnail_url,
+ )
+ ]
return options
diff --git a/python/fatcat_tools/transforms/csl.py b/python/fatcat_tools/transforms/csl.py
index f8b26bce..2b39068a 100644
--- a/python/fatcat_tools/transforms/csl.py
+++ b/python/fatcat_tools/transforms/csl.py
@@ -1,4 +1,3 @@
-
import json
from citeproc import (
@@ -13,10 +12,10 @@ from citeproc_styles import get_style_filepath
def contribs_by_role(contribs, role):
- ret = [c.copy() for c in contribs if c['role'] == role]
- [c.pop('role') for c in ret]
+ ret = [c.copy() for c in contribs if c["role"] == role]
+ [c.pop("role") for c in ret]
# TODO: some note to self here
- [c.pop('literal') for c in ret if 'literal' in c]
+ [c.pop("literal") for c in ret if "literal" in c]
if not ret:
return None
else:
@@ -33,26 +32,30 @@ def release_to_csl(entity):
Follows, but not enforced by: https://github.com/citation-style-language/schema/blob/master/csl-data.json
"""
contribs = []
- for contrib in (entity.contribs or []):
+ for contrib in entity.contribs or []:
if contrib.creator:
# Default to "local" (publication-specific) metadata; fall back to
# creator-level
- family = contrib.creator.surname or contrib.surname or (contrib.raw_name and contrib.raw_name.split()[-1])
+ family = (
+ contrib.creator.surname
+ or contrib.surname
+ or (contrib.raw_name and contrib.raw_name.split()[-1])
+ )
if not family:
# CSL requires some surname (family name)
continue
c = dict(
family=family,
given=contrib.creator.given_name or contrib.given_name,
- #dropping-particle
- #non-dropping-particle
- #suffix
- #comma-suffix
- #static-ordering
+ # dropping-particle
+ # non-dropping-particle
+ # suffix
+ # comma-suffix
+ # static-ordering
literal=contrib.creator.display_name or contrib.raw_name,
- #parse-names,
+ # parse-names,
# role must be defined; default to author
- role=contrib.role or 'author',
+ role=contrib.role or "author",
)
else:
family = contrib.surname or (contrib.raw_name and contrib.raw_name.split()[-1])
@@ -64,7 +67,7 @@ def release_to_csl(entity):
given=contrib.given_name,
literal=contrib.raw_name,
# role must be defined; default to author
- role=contrib.role or 'author',
+ role=contrib.role or "author",
)
for k in list(c.keys()):
if not c[k]:
@@ -78,93 +81,108 @@ def release_to_csl(entity):
issued_date = None
if entity.release_date:
- issued_date = {"date-parts": [[
- entity.release_date.year,
- entity.release_date.month,
- entity.release_date.day,
- ]]}
+ issued_date = {
+ "date-parts": [
+ [
+ entity.release_date.year,
+ entity.release_date.month,
+ entity.release_date.day,
+ ]
+ ]
+ }
elif entity.release_year:
issued_date = {"date-parts": [[entity.release_year]]}
csl = dict(
- #id,
- #categories
- type=entity.release_type or "article", # can't be blank
+ # id,
+ # categories
+ type=entity.release_type or "article", # can't be blank
language=entity.language,
- #journalAbbreviation
- #shortTitle
+ # journalAbbreviation
+ # shortTitle
## see below for all contrib roles
- #accessed
- #container
- #event-date
+ # accessed
+ # container
+ # event-date
issued=issued_date,
- #original-date
- #submitted
+ # original-date
+ # submitted
abstract=abstract,
- #annote
- #archive
- #archive_location
- #archive-place
- #authority
- #call-number
- #chapter-number
- #citation-number
- #citation-label
- #collection-number
- #collection-title
+ # annote
+ # archive
+ # archive_location
+ # archive-place
+ # authority
+ # call-number
+ # chapter-number
+ # citation-number
+ # citation-label
+ # collection-number
+ # collection-title
container_title=entity.container and entity.container.name,
- #container-title-short
- #dimensions
+ # container-title-short
+ # dimensions
DOI=entity.ext_ids.doi,
- #edition
- #event
- #event-place
- #first-reference-note-number
- #genre
+ # edition
+ # event
+ # event-place
+ # first-reference-note-number
+ # genre
ISBN=entity.ext_ids.isbn13,
ISSN=entity.container and entity.container.issnl,
issue=entity.issue,
- #jurisdiction
- #keyword
- #locator
- #medium
- #note
- #number
- #number-of-pages
- #number-of-volumes
- #original-publisher
- #original-publisher-place
- #original-title
+ # jurisdiction
+ # keyword
+ # locator
+ # medium
+ # note
+ # number
+ # number-of-pages
+ # number-of-volumes
+ # original-publisher
+ # original-publisher-place
+ # original-title
# TODO: page=entity.pages,
- page_first=entity.pages and entity.pages.split('-')[0],
+ page_first=entity.pages and entity.pages.split("-")[0],
PMCID=entity.ext_ids.pmcid,
PMID=entity.ext_ids.pmid,
publisher=(entity.container and entity.container.publisher) or entity.publisher,
- #publisher-place
- #references
- #reviewed-title
- #scale
- #section
- #source
- #status
+ # publisher-place
+ # references
+ # reviewed-title
+ # scale
+ # section
+ # source
+ # status
title=entity.title,
- #title-short
- #URL
- #version
+ # title-short
+ # URL
+ # version
volume=entity.volume,
- #year-suffix
+ # year-suffix
)
- for role in ['author', 'collection-editor', 'composer', 'container-author',
- 'director', 'editor', 'editorial-director', 'interviewer',
- 'illustrator', 'original-author', 'recipient', 'reviewed-author',
- 'translator']:
+ for role in [
+ "author",
+ "collection-editor",
+ "composer",
+ "container-author",
+ "director",
+ "editor",
+ "editorial-director",
+ "interviewer",
+ "illustrator",
+ "original-author",
+ "recipient",
+ "reviewed-author",
+ "translator",
+ ]:
cbr = contribs_by_role(contribs, role)
if cbr:
csl[role] = cbr
# underline-to-dash
- csl['container-title'] = csl.pop('container_title')
- csl['page-first'] = csl.pop('page_first')
- empty_keys = [k for k,v in csl.items() if not v]
+ csl["container-title"] = csl.pop("container_title")
+ csl["page-first"] = csl.pop("page_first")
+ empty_keys = [k for k, v in csl.items() if not v]
for k in empty_keys:
csl.pop(k)
return csl
@@ -184,10 +202,11 @@ def refs_to_csl(entity):
title=ref.title,
issued=issued_date,
)
- csl['id'] = ref.key or ref.index, # zero- or one-indexed?
+ csl["id"] = (ref.key or ref.index,) # zero- or one-indexed?
ret.append(csl)
return ret
+
def citeproc_csl(csl_json, style, html=False):
"""
Renders a release entity to a styled citation.
@@ -200,8 +219,8 @@ def citeproc_csl(csl_json, style, html=False):
Returns a string; if the html flag is set, and the style isn't 'csl-json'
or 'bibtex', it will be HTML. Otherwise plain text.
"""
- if not csl_json.get('id'):
- csl_json['id'] = "unknown"
+ if not csl_json.get("id"):
+ csl_json["id"] = "unknown"
if style == "csl-json":
return json.dumps(csl_json)
bib_src = CiteProcJSON([csl_json])
@@ -211,7 +230,7 @@ def citeproc_csl(csl_json, style, html=False):
style_path = get_style_filepath(style)
bib_style = CitationStylesStyle(style_path, validate=False)
bib = CitationStylesBibliography(bib_style, bib_src, form)
- bib.register(Citation([CitationItem(csl_json['id'])]))
+ bib.register(Citation([CitationItem(csl_json["id"])]))
lines = bib.bibliography()[0]
if style == "bibtex":
out = ""
@@ -222,6 +241,6 @@ def citeproc_csl(csl_json, style, html=False):
out += "\n " + line
else:
out += line
- return ''.join(out)
+ return "".join(out)
else:
- return ''.join(lines)
+ return "".join(lines)
diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py
index 1826d4eb..e39e9ea4 100644
--- a/python/fatcat_tools/transforms/elasticsearch.py
+++ b/python/fatcat_tools/transforms/elasticsearch.py
@@ -1,4 +1,3 @@
-
import datetime
from typing import Any, Dict, Optional
@@ -13,13 +12,14 @@ from fatcat_openapi_client import (
def check_kbart(year: int, archive: dict) -> Optional[bool]:
- if not archive or not archive.get('year_spans'):
+ if not archive or not archive.get("year_spans"):
return None
- for span in archive['year_spans']:
+ for span in archive["year_spans"]:
if year >= span[0] and year <= span[1]:
return True
return False
+
def test_check_kbart() -> None:
assert check_kbart(1990, dict()) is None
@@ -40,87 +40,89 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) ->
Raises exception on error (never returns None)
"""
- if entity.state in ('redirect', 'deleted'):
+ if entity.state in ("redirect", "deleted"):
return dict(
- ident = entity.ident,
- state = entity.state,
+ ident=entity.ident,
+ state=entity.state,
)
- elif entity.state != 'active':
+ elif entity.state != "active":
raise ValueError("Unhandled entity state: {}".format(entity.state))
# First, the easy ones (direct copy)
release = entity
t: Dict[str, Any] = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
- ident = release.ident,
- state = release.state,
- revision = release.revision,
- work_id = release.work_id,
- title = release.title,
- subtitle = release.subtitle,
- original_title = release.original_title,
- release_type = release.release_type,
- release_stage = release.release_stage,
- withdrawn_status = release.withdrawn_status,
- language = release.language,
- volume = release.volume,
- issue = release.issue,
- pages = release.pages,
- number = release.number,
- license = release.license_slug,
- version = release.version,
- doi = release.ext_ids.doi,
- pmid = release.ext_ids.pmid,
- pmcid = release.ext_ids.pmcid,
- isbn13 = release.ext_ids.isbn13,
- wikidata_qid = release.ext_ids.wikidata_qid,
- core_id = release.ext_ids.core,
- arxiv_id = release.ext_ids.arxiv,
- jstor_id = release.ext_ids.jstor,
- ark_id = release.ext_ids.ark,
- mag_id = release.ext_ids.mag,
- dblp_id = release.ext_ids.dblp,
- doaj_id = release.ext_ids.doaj,
- hdl = release.ext_ids.hdl,
- tags = [],
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
+ ident=release.ident,
+ state=release.state,
+ revision=release.revision,
+ work_id=release.work_id,
+ title=release.title,
+ subtitle=release.subtitle,
+ original_title=release.original_title,
+ release_type=release.release_type,
+ release_stage=release.release_stage,
+ withdrawn_status=release.withdrawn_status,
+ language=release.language,
+ volume=release.volume,
+ issue=release.issue,
+ pages=release.pages,
+ number=release.number,
+ license=release.license_slug,
+ version=release.version,
+ doi=release.ext_ids.doi,
+ pmid=release.ext_ids.pmid,
+ pmcid=release.ext_ids.pmcid,
+ isbn13=release.ext_ids.isbn13,
+ wikidata_qid=release.ext_ids.wikidata_qid,
+ core_id=release.ext_ids.core,
+ arxiv_id=release.ext_ids.arxiv,
+ jstor_id=release.ext_ids.jstor,
+ ark_id=release.ext_ids.ark,
+ mag_id=release.ext_ids.mag,
+ dblp_id=release.ext_ids.dblp,
+ doaj_id=release.ext_ids.doaj,
+ hdl=release.ext_ids.hdl,
+ tags=[],
)
- t.update(dict(
- is_oa = None,
- is_longtail_oa = None,
- is_preserved = None,
- in_web = False,
- in_dweb = False,
- in_ia = False,
- in_ia_sim = False,
- in_kbart = None,
- in_jstor = False,
- in_doaj= bool(release.ext_ids.doaj),
- in_shadows = False,
- ))
+ t.update(
+ dict(
+ is_oa=None,
+ is_longtail_oa=None,
+ is_preserved=None,
+ in_web=False,
+ in_dweb=False,
+ in_ia=False,
+ in_ia_sim=False,
+ in_kbart=None,
+ in_jstor=False,
+ in_doaj=bool(release.ext_ids.doaj),
+ in_shadows=False,
+ )
+ )
release_year = release.release_year
if release.release_date:
# .isoformat() results in, eg, '2010-10-22' (YYYY-MM-DD)
- t['release_date'] = release.release_date.isoformat()
+ t["release_date"] = release.release_date.isoformat()
if not release_year:
release_year = release.release_date.year
if release_year:
- t['release_year'] = release_year
+ t["release_year"] = release_year
- t['any_abstract'] = len(release.abstracts or []) > 0
- t['ref_count'] = len(release.refs or [])
+ t["any_abstract"] = len(release.abstracts or []) > 0
+ t["ref_count"] = len(release.refs or [])
ref_release_ids = []
- for r in (release.refs or []):
+ for r in release.refs or []:
if r.target_release_id:
ref_release_ids.append(r.target_release_id)
- t['ref_release_ids'] = ref_release_ids
- t['ref_linked_count'] = len(ref_release_ids)
- t['contrib_count'] = len(release.contribs or [])
+ t["ref_release_ids"] = ref_release_ids
+ t["ref_linked_count"] = len(ref_release_ids)
+ t["contrib_count"] = len(release.contribs or [])
contrib_names = []
contrib_affiliations = []
creator_ids = []
- for c in (release.contribs or []):
+ for c in release.contribs or []:
if c.creator and c.creator.display_name:
contrib_names.append(c.creator.display_name)
elif c.raw_name:
@@ -132,193 +134,218 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) ->
creator_ids.append(c.creator_id)
if c.raw_affiliation:
contrib_affiliations.append(c.raw_affiliation)
- t['contrib_names'] = contrib_names
- t['creator_ids'] = creator_ids
- t['affiliations'] = contrib_affiliations
+ t["contrib_names"] = contrib_names
+ t["creator_ids"] = creator_ids
+ t["affiliations"] = contrib_affiliations
# TODO: mapping... probably by lookup?
- t['affiliation_rors'] = None
+ t["affiliation_rors"] = None
if release.container:
t.update(_rte_container_helper(release.container, release_year))
# fall back to release-level container metadata if container not linked or
# missing context
- if not t.get('publisher'):
- t['publisher'] = release.publisher
- if not t.get('container_name') and release.extra:
- t['container_name'] = release.extra.get('container_name')
+ if not t.get("publisher"):
+ t["publisher"] = release.publisher
+ if not t.get("container_name") and release.extra:
+ t["container_name"] = release.extra.get("container_name")
- if release.ext_ids.jstor or (release.ext_ids.doi and release.ext_ids.doi.startswith('10.2307/')):
- t['in_jstor'] = True
+ if release.ext_ids.jstor or (
+ release.ext_ids.doi and release.ext_ids.doi.startswith("10.2307/")
+ ):
+ t["in_jstor"] = True
# transform file/fileset/webcapture related fields
t.update(_rte_content_helper(release))
if release.ext_ids.doaj:
- t['is_oa'] = True
+ t["is_oa"] = True
if release.license_slug:
# TODO: more/better checks here, particularly strict *not* OA licenses
if release.license_slug.startswith("CC-"):
- t['is_oa'] = True
+ t["is_oa"] = True
if release.license_slug.startswith("ARXIV-"):
- t['is_oa'] = True
+ t["is_oa"] = True
- t['is_work_alias'] = None
+ t["is_work_alias"] = None
extra = release.extra or dict()
if extra:
- if extra.get('is_oa'):
+ if extra.get("is_oa"):
# NOTE: not actually setting this anywhere... but could
- t['is_oa'] = True
- if extra.get('is_work_alias') is not None:
- t['is_work_alias'] = bool(extra.get('is_work_alias'))
- if extra.get('longtail_oa'):
+ t["is_oa"] = True
+ if extra.get("is_work_alias") is not None:
+ t["is_work_alias"] = bool(extra.get("is_work_alias"))
+ if extra.get("longtail_oa"):
# sometimes set by GROBID/matcher
- t['is_oa'] = True
- t['is_longtail_oa'] = True
- if not t.get('container_name'):
- t['container_name'] = extra.get('container_name')
- if extra.get('crossref'):
- if extra['crossref'].get('archive'):
+ t["is_oa"] = True
+ t["is_longtail_oa"] = True
+ if not t.get("container_name"):
+ t["container_name"] = extra.get("container_name")
+ if extra.get("crossref"):
+ if extra["crossref"].get("archive"):
# all crossref archives are KBART, I believe
- t['in_kbart'] = True
+ t["in_kbart"] = True
# backwards compatible subtitle fetching
- if not t['subtitle'] and extra.get('subtitle'):
- if type(extra['subtitle']) == list:
- t['subtitle'] = extra['subtitle'][0]
+ if not t["subtitle"] and extra.get("subtitle"):
+ if type(extra["subtitle"]) == list:
+ t["subtitle"] = extra["subtitle"][0]
else:
- t['subtitle'] = extra['subtitle']
+ t["subtitle"] = extra["subtitle"]
- t['first_page'] = None
+ t["first_page"] = None
if release.pages:
- first = release.pages.split('-')[0]
- first = first.replace('p', '')
+ first = release.pages.split("-")[0]
+ first = first.replace("p", "")
if first.isdigit():
- t['first_page'] = first
+ t["first_page"] = first
# TODO: non-numerical first pages
- t['ia_microfilm_url'] = None
- if t['in_ia_sim']:
+ t["ia_microfilm_url"] = None
+ if t["in_ia_sim"]:
# TODO: determine URL somehow? I think this is in flux. Will probably
# need extra metadata in the container extra field.
# special case as a demo for now.
- if release.container_id == "hl5g6d5msjcl7hlbyyvcsbhc2u" \
- and release.release_year in (2011, 2013) \
- and release.issue \
- and release.issue.isdigit() \
- and t['first_page']:
- t['ia_microfilm_url'] = "https://archive.org/details/sim_bjog_{}-{:02d}/page/n{}".format(
+ if (
+ release.container_id == "hl5g6d5msjcl7hlbyyvcsbhc2u"
+ and release.release_year in (2011, 2013)
+ and release.issue
+ and release.issue.isdigit()
+ and t["first_page"]
+ ):
+ t[
+ "ia_microfilm_url"
+ ] = "https://archive.org/details/sim_bjog_{}-{:02d}/page/n{}".format(
release.release_year,
int(release.issue) - 1,
- t['first_page'],
+ t["first_page"],
)
- t['doi_registrar'] = None
- if extra and t['doi']:
- for k in ('crossref', 'datacite', 'jalc'):
+ t["doi_registrar"] = None
+ if extra and t["doi"]:
+ for k in ("crossref", "datacite", "jalc"):
if k in extra:
- t['doi_registrar'] = k
- if 'doi_registrar' not in t:
- t['doi_registrar'] = 'crossref'
+ t["doi_registrar"] = k
+ if "doi_registrar" not in t:
+ t["doi_registrar"] = "crossref"
- if t['doi']:
- t['doi_prefix'] = t['doi'].split('/')[0]
+ if t["doi"]:
+ t["doi_prefix"] = t["doi"].split("/")[0]
- if t['is_longtail_oa']:
- t['is_oa'] = True
+ if t["is_longtail_oa"]:
+ t["is_oa"] = True
# optionally coerce all flags from Optional[bool] to bool
if force_bool:
- for k in ('is_oa', 'is_longtail_oa', 'in_kbart', 'in_ia_sim',
- 'in_jstor', 'in_web', 'in_dweb', 'in_shadows',
- 'is_work_alias'):
+ for k in (
+ "is_oa",
+ "is_longtail_oa",
+ "in_kbart",
+ "in_ia_sim",
+ "in_jstor",
+ "in_web",
+ "in_dweb",
+ "in_shadows",
+ "is_work_alias",
+ ):
t[k] = bool(t[k])
- t['in_ia'] = bool(t['in_ia'])
- t['is_preserved'] = bool(
- t['is_preserved']
- or t['in_ia']
- or t['in_kbart']
- or t['in_jstor']
- or t.get('pmcid')
- or t.get('arxiv_id')
+ t["in_ia"] = bool(t["in_ia"])
+ t["is_preserved"] = bool(
+ t["is_preserved"]
+ or t["in_ia"]
+ or t["in_kbart"]
+ or t["in_jstor"]
+ or t.get("pmcid")
+ or t.get("arxiv_id")
)
- if t['in_ia']:
- t['preservation'] = 'bright'
- elif t['is_preserved']:
- t['preservation'] = 'dark'
- elif t['in_shadows']:
- t['preservation'] = 'shadows_only'
+ if t["in_ia"]:
+ t["preservation"] = "bright"
+ elif t["is_preserved"]:
+ t["preservation"] = "dark"
+ elif t["in_shadows"]:
+ t["preservation"] = "shadows_only"
else:
- t['preservation'] = 'none'
+ t["preservation"] = "none"
return t
+
def _rte_container_helper(container: ContainerEntity, release_year: Optional[int]) -> dict:
"""
Container metadata sub-section of release_to_elasticsearch()
"""
this_year = datetime.date.today().year
t = dict()
- t['publisher'] = container.publisher
- t['container_name'] = container.name
+ t["publisher"] = container.publisher
+ t["container_name"] = container.name
# this is container.ident, not release.container_id, because there may
# be a redirect involved
- t['container_id'] = container.ident
- t['container_issnl'] = container.issnl
+ t["container_id"] = container.ident
+ t["container_issnl"] = container.issnl
issns = [container.issnl, container.issne, container.issnp]
issns = list(set([i for i in issns if i]))
- t['container_issns'] = issns
- t['container_type'] = container.container_type
- t['container_publication_status'] = container.publication_status
+ t["container_issns"] = issns
+ t["container_type"] = container.container_type
+ t["container_publication_status"] = container.publication_status
if container.extra:
c_extra = container.extra
- if c_extra.get('kbart') and release_year:
- if check_kbart(release_year, c_extra['kbart'].get('jstor')):
- t['in_jstor'] = True
- if t.get('in_kbart') or t.get('in_jstor'):
- t['in_kbart'] = True
- for archive in ('portico', 'lockss', 'clockss', 'pkp_pln',
- 'hathitrust', 'scholarsportal', 'cariniana'):
- t['in_kbart'] = t.get('in_kbart') or check_kbart(release_year, c_extra['kbart'].get(archive))
+ if c_extra.get("kbart") and release_year:
+ if check_kbart(release_year, c_extra["kbart"].get("jstor")):
+ t["in_jstor"] = True
+ if t.get("in_kbart") or t.get("in_jstor"):
+ t["in_kbart"] = True
+ for archive in (
+ "portico",
+ "lockss",
+ "clockss",
+ "pkp_pln",
+ "hathitrust",
+ "scholarsportal",
+ "cariniana",
+ ):
+ t["in_kbart"] = t.get("in_kbart") or check_kbart(
+ release_year, c_extra["kbart"].get(archive)
+ )
# recent KBART coverage is often not updated for the
# current year. So for current-year publications, consider
# coverage from *last* year to also be included in the
# Keeper
- if not t.get('in_kbart') and release_year == this_year:
- t['in_kbart'] = check_kbart(this_year - 1, c_extra['kbart'].get(archive))
-
- if c_extra.get('ia'):
- if c_extra['ia'].get('sim') and release_year:
- t['in_ia_sim'] = check_kbart(release_year, c_extra['ia']['sim'])
- if c_extra['ia'].get('longtail_oa'):
- t['is_longtail_oa'] = True
- if c_extra.get('sherpa_romeo'):
- if c_extra['sherpa_romeo'].get('color') == 'white':
- t['is_oa'] = False
- if c_extra.get('default_license') and c_extra.get('default_license').startswith('CC-'):
- t['is_oa'] = True
- if c_extra.get('doaj'):
- if c_extra['doaj'].get('as_of'):
- t['is_oa'] = True
- t['in_doaj'] = True
- if c_extra.get('road'):
- if c_extra['road'].get('as_of'):
- t['is_oa'] = True
- if c_extra.get('szczepanski'):
- if c_extra['szczepanski'].get('as_of'):
- t['is_oa'] = True
- if c_extra.get('country'):
- t['country_code'] = c_extra['country']
- t['country_code_upper'] = c_extra['country'].upper()
- if c_extra.get('publisher_type'):
- t['publisher_type'] = c_extra['publisher_type']
- if c_extra.get('discipline'):
- t['discipline'] = c_extra['discipline']
+ if not t.get("in_kbart") and release_year == this_year:
+ t["in_kbart"] = check_kbart(this_year - 1, c_extra["kbart"].get(archive))
+
+ if c_extra.get("ia"):
+ if c_extra["ia"].get("sim") and release_year:
+ t["in_ia_sim"] = check_kbart(release_year, c_extra["ia"]["sim"])
+ if c_extra["ia"].get("longtail_oa"):
+ t["is_longtail_oa"] = True
+ if c_extra.get("sherpa_romeo"):
+ if c_extra["sherpa_romeo"].get("color") == "white":
+ t["is_oa"] = False
+ if c_extra.get("default_license") and c_extra.get("default_license").startswith("CC-"):
+ t["is_oa"] = True
+ if c_extra.get("doaj"):
+ if c_extra["doaj"].get("as_of"):
+ t["is_oa"] = True
+ t["in_doaj"] = True
+ if c_extra.get("road"):
+ if c_extra["road"].get("as_of"):
+ t["is_oa"] = True
+ if c_extra.get("szczepanski"):
+ if c_extra["szczepanski"].get("as_of"):
+ t["is_oa"] = True
+ if c_extra.get("country"):
+ t["country_code"] = c_extra["country"]
+ t["country_code_upper"] = c_extra["country"].upper()
+ if c_extra.get("publisher_type"):
+ t["publisher_type"] = c_extra["publisher_type"]
+ if c_extra.get("discipline"):
+ t["discipline"] = c_extra["discipline"]
return t
+
def _rte_content_helper(release: ReleaseEntity) -> dict:
"""
File/FileSet/WebCapture sub-section of release_to_elasticsearch()
@@ -329,9 +356,9 @@ def _rte_content_helper(release: ReleaseEntity) -> dict:
- any other URL
"""
t = dict(
- file_count = len(release.files or []),
- fileset_count = len(release.filesets or []),
- webcapture_count = len(release.webcaptures or []),
+ file_count=len(release.files or []),
+ fileset_count=len(release.filesets or []),
+ webcapture_count=len(release.webcaptures or []),
)
any_pdf_url = None
@@ -340,38 +367,42 @@ def _rte_content_helper(release: ReleaseEntity) -> dict:
ia_pdf_url = None
for f in release.files or []:
- if f.extra and f.extra.get('shadows'):
- t['in_shadows'] = True
- is_pdf = 'pdf' in (f.mimetype or '')
- for release_url in (f.urls or []):
+ if f.extra and f.extra.get("shadows"):
+ t["in_shadows"] = True
+ is_pdf = "pdf" in (f.mimetype or "")
+ for release_url in f.urls or []:
# first generic flags
t.update(_rte_url_helper(release_url))
# then PDF specific stuff (for generating "best URL" fields)
- if not f.mimetype and 'pdf' in release_url.url.lower():
+ if not f.mimetype and "pdf" in release_url.url.lower():
is_pdf = True
if is_pdf:
any_pdf_url = release_url.url
- if release_url.rel in ('webarchive', 'repository', 'repo'):
+ if release_url.rel in ("webarchive", "repository", "repo"):
good_pdf_url = release_url.url
- if '//web.archive.org/' in release_url.url or '//archive.org/' in release_url.url:
+ if (
+ "//web.archive.org/" in release_url.url
+ or "//archive.org/" in release_url.url
+ ):
best_pdf_url = release_url.url
ia_pdf_url = release_url.url
# here is where we bake-in PDF url priority; IA-specific
- t['best_pdf_url'] = best_pdf_url or good_pdf_url or any_pdf_url
- t['ia_pdf_url'] = ia_pdf_url
+ t["best_pdf_url"] = best_pdf_url or good_pdf_url or any_pdf_url
+ t["ia_pdf_url"] = ia_pdf_url
for fs in release.filesets or []:
- for url_obj in (fs.urls or []):
+ for url_obj in fs.urls or []:
t.update(_rte_url_helper(url_obj))
for wc in release.webcaptures or []:
- for url_obj in (wc.archive_urls or []):
+ for url_obj in wc.archive_urls or []:
t.update(_rte_url_helper(url_obj))
return t
+
def _rte_url_helper(url_obj) -> dict:
"""
Takes a location URL ('url' and 'rel' keys) and returns generic preservation status.
@@ -382,17 +413,17 @@ def _rte_url_helper(url_obj) -> dict:
these will be iteratively update() into the overal object.
"""
t = dict()
- if url_obj.rel in ('webarchive', 'repository', 'archive', 'repo'):
- t['is_preserved'] = True
- if '//web.archive.org/' in url_obj.url or '//archive.org/' in url_obj.url:
- t['in_ia'] = True
- if url_obj.url.lower().startswith('http') or url_obj.url.lower().startswith('ftp'):
- t['in_web'] = True
- if url_obj.rel in ('dweb', 'p2p', 'ipfs', 'dat', 'torrent'):
+ if url_obj.rel in ("webarchive", "repository", "archive", "repo"):
+ t["is_preserved"] = True
+ if "//web.archive.org/" in url_obj.url or "//archive.org/" in url_obj.url:
+ t["in_ia"] = True
+ if url_obj.url.lower().startswith("http") or url_obj.url.lower().startswith("ftp"):
+ t["in_web"] = True
+ if url_obj.rel in ("dweb", "p2p", "ipfs", "dat", "torrent"):
# not sure what rel will be for this stuff
- t['in_dweb'] = True
- if '//www.jstor.org/' in url_obj.url:
- t['in_jstor'] = True
+ t["in_dweb"] = True
+ if "//www.jstor.org/" in url_obj.url:
+ t["in_jstor"] = True
return t
@@ -404,50 +435,59 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None):
Raises exception on error (never returns None)
"""
- if entity.state in ('redirect', 'deleted'):
+ if entity.state in ("redirect", "deleted"):
return dict(
- ident = entity.ident,
- state = entity.state,
+ ident=entity.ident,
+ state=entity.state,
)
- elif entity.state != 'active':
+ elif entity.state != "active":
raise ValueError("Unhandled entity state: {}".format(entity.state))
# First, the easy ones (direct copy)
t = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
- ident = entity.ident,
- state = entity.state,
- revision = entity.revision,
-
- name = entity.name,
- publisher = entity.publisher,
- container_type = entity.container_type,
- publication_status= entity.publication_status,
- issnl = entity.issnl,
- issne = entity.issne,
- issnp = entity.issnp,
- wikidata_qid = entity.wikidata_qid,
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
+ ident=entity.ident,
+ state=entity.state,
+ revision=entity.revision,
+ name=entity.name,
+ publisher=entity.publisher,
+ container_type=entity.container_type,
+ publication_status=entity.publication_status,
+ issnl=entity.issnl,
+ issne=entity.issne,
+ issnp=entity.issnp,
+ wikidata_qid=entity.wikidata_qid,
)
if not entity.extra:
entity.extra = dict()
- for key in ('country', 'languages', 'mimetypes', 'original_name',
- 'first_year', 'last_year', 'aliases', 'abbrev', 'region',
- 'discipline', 'publisher_type'):
+ for key in (
+ "country",
+ "languages",
+ "mimetypes",
+ "original_name",
+ "first_year",
+ "last_year",
+ "aliases",
+ "abbrev",
+ "region",
+ "discipline",
+ "publisher_type",
+ ):
if entity.extra.get(key):
t[key] = entity.extra[key]
- if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'):
- t['dblp_prefix'] = entity.extra['dblp']['prefix']
+ if entity.extra.get("dblp") and entity.extra["dblp"].get("prefix"):
+ t["dblp_prefix"] = entity.extra["dblp"]["prefix"]
- if 'country' in t:
- t['country_code'] = t.pop('country')
+ if "country" in t:
+ t["country_code"] = t.pop("country")
- t['issns'] = [entity.issnl, entity.issne, entity.issnp]
- for key in ('issnp', 'issne'):
+ t["issns"] = [entity.issnl, entity.issne, entity.issnp]
+ for key in ("issnp", "issne"):
if entity.extra.get(key):
- t['issns'].append(entity.extra[key])
- t['issns'] = list(set([i for i in t['issns'] if i]))
+ t["issns"].append(entity.extra[key])
+ t["issns"] = list(set([i for i in t["issns"] if i]))
in_doaj = None
in_road = None
@@ -459,72 +499,72 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None):
keepers = []
extra = entity.extra
- if extra.get('doaj'):
- if extra['doaj'].get('as_of'):
+ if extra.get("doaj"):
+ if extra["doaj"].get("as_of"):
in_doaj = True
- if extra.get('road'):
- if extra['road'].get('as_of'):
+ if extra.get("road"):
+ if extra["road"].get("as_of"):
in_road = True
- if extra.get('szczepanski'):
- if extra['szczepanski'].get('as_of'):
+ if extra.get("szczepanski"):
+ if extra["szczepanski"].get("as_of"):
is_oa = True
- if extra.get('default_license'):
- if extra['default_license'].startswith('CC-'):
+ if extra.get("default_license"):
+ if extra["default_license"].startswith("CC-"):
is_oa = True
- t['sherpa_romeo_color'] = None
- if extra.get('sherpa_romeo'):
- t['sherpa_romeo_color'] = extra['sherpa_romeo'].get('color')
- if extra['sherpa_romeo'].get('color') == 'white':
+ t["sherpa_romeo_color"] = None
+ if extra.get("sherpa_romeo"):
+ t["sherpa_romeo_color"] = extra["sherpa_romeo"].get("color")
+ if extra["sherpa_romeo"].get("color") == "white":
is_oa = False
- if extra.get('kbart'):
+ if extra.get("kbart"):
any_kbart = True
- if extra['kbart'].get('jstor'):
+ if extra["kbart"].get("jstor"):
any_jstor = True
- for k, v in extra['kbart'].items():
+ for k, v in extra["kbart"].items():
if v and isinstance(v, dict):
keepers.append(k)
- if extra.get('ia'):
- if extra['ia'].get('sim'):
+ if extra.get("ia"):
+ if extra["ia"].get("sim"):
any_ia_sim = True
- if extra['ia'].get('longtail_oa'):
+ if extra["ia"].get("longtail_oa"):
is_longtail_oa = True
- t['is_superceded'] = bool(extra.get('superceded'))
+ t["is_superceded"] = bool(extra.get("superceded"))
- t['keepers'] = keepers
- t['in_doaj'] = bool(in_doaj)
- t['in_road'] = bool(in_road)
- t['any_kbart'] = bool(any_kbart)
+ t["keepers"] = keepers
+ t["in_doaj"] = bool(in_doaj)
+ t["in_road"] = bool(in_road)
+ t["any_kbart"] = bool(any_kbart)
if force_bool:
- t['is_oa'] = bool(in_doaj or in_road or is_oa)
- t['is_longtail_oa'] = bool(is_longtail_oa)
- t['any_jstor'] = bool(any_jstor)
- t['any_ia_sim'] = bool(any_ia_sim)
+ t["is_oa"] = bool(in_doaj or in_road or is_oa)
+ t["is_longtail_oa"] = bool(is_longtail_oa)
+ t["any_jstor"] = bool(any_jstor)
+ t["any_ia_sim"] = bool(any_ia_sim)
else:
- t['is_oa'] = in_doaj or in_road or is_oa
- t['is_longtail_oa'] = is_longtail_oa
- t['any_jstor'] = any_jstor
- t['any_ia_sim'] = any_ia_sim
+ t["is_oa"] = in_doaj or in_road or is_oa
+ t["is_longtail_oa"] = is_longtail_oa
+ t["any_jstor"] = any_jstor
+ t["any_ia_sim"] = any_ia_sim
# mix in stats, if provided
if stats:
- t['releases_total'] = stats['total']
- t['preservation_bright'] = stats['preservation']['bright']
- t['preservation_dark'] = stats['preservation']['dark']
- t['preservation_shadows_only'] = stats['preservation']['shadows_only']
- t['preservation_none'] = stats['preservation']['none']
+ t["releases_total"] = stats["total"]
+ t["preservation_bright"] = stats["preservation"]["bright"]
+ t["preservation_dark"] = stats["preservation"]["dark"]
+ t["preservation_shadows_only"] = stats["preservation"]["shadows_only"]
+ t["preservation_none"] = stats["preservation"]["none"]
return t
def _type_of_edit(edit: EntityEdit) -> str:
if edit.revision is None and edit.redirect_ident is None:
- return 'delete'
+ return "delete"
elif edit.redirect_ident:
# redirect
- return 'update'
+ return "update"
elif edit.prev_revision is None and edit.redirect_ident is None and edit.revision:
- return 'create'
+ return "create"
else:
- return 'update'
+ return "update"
def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
@@ -536,7 +576,7 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
editgroup = entity.editgroup
t = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
index=entity.index,
editgroup_id=entity.editgroup_id,
timestamp=entity.timestamp.isoformat(),
@@ -547,8 +587,8 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
)
extra = editgroup.extra or dict()
- if extra.get('agent'):
- t['agent'] = extra['agent']
+ if extra.get("agent"):
+ t["agent"] = extra["agent"]
containers = [_type_of_edit(e) for e in editgroup.edits.containers]
creators = [_type_of_edit(e) for e in editgroup.edits.creators]
@@ -558,27 +598,27 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
releases = [_type_of_edit(e) for e in editgroup.edits.releases]
works = [_type_of_edit(e) for e in editgroup.edits.works]
- t['containers'] = len(containers)
- t['new_containers'] = len([e for e in containers if e == 'create'])
- t['creators'] = len(creators)
- t['new_creators'] = len([e for e in creators if e == 'create'])
- t['files'] = len(files)
- t['new_files'] = len([e for e in files if e == 'create'])
- t['filesets'] = len(filesets)
- t['new_filesets'] = len([e for e in filesets if e == 'create'])
- t['webcaptures'] = len(webcaptures)
- t['new_webcaptures'] = len([e for e in webcaptures if e == 'create'])
- t['releases'] = len(releases)
- t['new_releases'] = len([e for e in releases if e == 'create'])
- t['works'] = len(works)
- t['new_works'] = len([e for e in works if e == 'create'])
+ t["containers"] = len(containers)
+ t["new_containers"] = len([e for e in containers if e == "create"])
+ t["creators"] = len(creators)
+ t["new_creators"] = len([e for e in creators if e == "create"])
+ t["files"] = len(files)
+ t["new_files"] = len([e for e in files if e == "create"])
+ t["filesets"] = len(filesets)
+ t["new_filesets"] = len([e for e in filesets if e == "create"])
+ t["webcaptures"] = len(webcaptures)
+ t["new_webcaptures"] = len([e for e in webcaptures if e == "create"])
+ t["releases"] = len(releases)
+ t["new_releases"] = len([e for e in releases if e == "create"])
+ t["works"] = len(works)
+ t["new_works"] = len([e for e in works if e == "create"])
all_edits = containers + creators + files + filesets + webcaptures + releases + works
- t['created'] = len([e for e in all_edits if e == 'create'])
- t['updated'] = len([e for e in all_edits if e == 'update'])
- t['deleted'] = len([e for e in all_edits if e == 'delete'])
- t['total'] = len(all_edits)
+ t["created"] = len([e for e in all_edits if e == "create"])
+ t["updated"] = len([e for e in all_edits if e == "update"])
+ t["deleted"] = len([e for e in all_edits if e == "delete"])
+ t["total"] = len(all_edits)
return t
@@ -590,47 +630,47 @@ def file_to_elasticsearch(entity: FileEntity) -> Dict[str, Any]:
Raises exception on error (never returns None)
"""
- if entity.state in ('redirect', 'deleted'):
+ if entity.state in ("redirect", "deleted"):
return dict(
- ident = entity.ident,
- state = entity.state,
+ ident=entity.ident,
+ state=entity.state,
)
- elif entity.state != 'active':
+ elif entity.state != "active":
raise ValueError("Unhandled entity state: {}".format(entity.state))
# First, the easy ones (direct copy)
t = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
- ident = entity.ident,
- state = entity.state,
- revision = entity.revision,
- release_ids = entity.release_ids,
- release_count = len(entity.release_ids),
- mimetype = entity.mimetype,
- size_bytes = entity.size,
- sha1 = entity.sha1,
- sha256 = entity.sha256,
- md5 = entity.md5,
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
+ ident=entity.ident,
+ state=entity.state,
+ revision=entity.revision,
+ release_ids=entity.release_ids,
+ release_count=len(entity.release_ids),
+ mimetype=entity.mimetype,
+ size_bytes=entity.size,
+ sha1=entity.sha1,
+ sha256=entity.sha256,
+ md5=entity.md5,
)
parsed_urls = [tldextract.extract(u.url) for u in entity.urls]
- t['hosts'] = list(set(['.'.join([seg for seg in pu if seg]) for pu in parsed_urls]))
- t['domains'] = list(set([pu.registered_domain for pu in parsed_urls]))
- t['rels'] = list(set([u.rel for u in entity.urls]))
+ t["hosts"] = list(set([".".join([seg for seg in pu if seg]) for pu in parsed_urls]))
+ t["domains"] = list(set([pu.registered_domain for pu in parsed_urls]))
+ t["rels"] = list(set([u.rel for u in entity.urls]))
- t['in_ia'] = bool('archive.org' in t['domains'])
- t['in_ia_petabox'] = bool('archive.org' in t['hosts'])
+ t["in_ia"] = bool("archive.org" in t["domains"])
+ t["in_ia_petabox"] = bool("archive.org" in t["hosts"])
any_url = None
good_url = None
best_url = None
- for release_url in (entity.urls or []):
+ for release_url in entity.urls or []:
any_url = release_url.url
- if release_url.rel in ('webarchive', 'repository'):
+ if release_url.rel in ("webarchive", "repository"):
good_url = release_url.url
- if '//web.archive.org/' in release_url.url or '//archive.org/' in release_url.url:
+ if "//web.archive.org/" in release_url.url or "//archive.org/" in release_url.url:
best_url = release_url.url
# here is where we bake-in priority; IA-specific
- t['best_url'] = best_url or good_url or any_url
+ t["best_url"] = best_url or good_url or any_url
return t
diff --git a/python/fatcat_tools/transforms/ingest.py b/python/fatcat_tools/transforms/ingest.py
index 9101a4ec..30b5b190 100644
--- a/python/fatcat_tools/transforms/ingest.py
+++ b/python/fatcat_tools/transforms/ingest.py
@@ -1,4 +1,3 @@
-
INGEST_TYPE_CONTAINER_MAP = {
# Optica
"twtpsm6ytje3nhuqfu3pa7ca7u": "html",
@@ -14,7 +13,8 @@ INGEST_TYPE_CONTAINER_MAP = {
"lovwr7ladjagzkhmoaszg7efqu": "html",
}
-def release_ingest_request(release, ingest_request_source='fatcat', ingest_type=None):
+
+def release_ingest_request(release, ingest_request_source="fatcat", ingest_type=None):
"""
Takes a full release entity object and returns an ingest request (as dict),
or None if it seems like this release shouldn't be ingested.
@@ -27,27 +27,35 @@ def release_ingest_request(release, ingest_request_source='fatcat', ingest_type=
calling code should check the returned type field.
"""
- if release.state != 'active':
+ if release.state != "active":
return None
if (not ingest_type) and release.container_id:
ingest_type = INGEST_TYPE_CONTAINER_MAP.get(release.container_id)
if not ingest_type:
- if release.release_type == 'stub':
+ if release.release_type == "stub":
return None
- elif release.release_type in ['component', 'graphic']:
- ingest_type = 'component'
- elif release.release_type == 'dataset':
- ingest_type = 'dataset'
- elif release.release_type == 'software':
- ingest_type = 'software'
- elif release.release_type == 'post-weblog':
- ingest_type = 'html'
- elif release.release_type in ['article-journal', 'article', 'chapter', 'paper-conference', 'book', 'report', 'thesis']:
- ingest_type = 'pdf'
+ elif release.release_type in ["component", "graphic"]:
+ ingest_type = "component"
+ elif release.release_type == "dataset":
+ ingest_type = "dataset"
+ elif release.release_type == "software":
+ ingest_type = "software"
+ elif release.release_type == "post-weblog":
+ ingest_type = "html"
+ elif release.release_type in [
+ "article-journal",
+ "article",
+ "chapter",
+ "paper-conference",
+ "book",
+ "report",
+ "thesis",
+ ]:
+ ingest_type = "pdf"
else:
- ingest_type = 'pdf'
+ ingest_type = "pdf"
# generate a URL where we expect to find fulltext
url = None
@@ -59,8 +67,10 @@ def release_ingest_request(release, ingest_request_source='fatcat', ingest_type=
link_source_id = release.ext_ids.arxiv
elif release.ext_ids.pmcid and ingest_type == "pdf":
# TODO: how to tell if an author manuscript in PMC vs. published?
- #url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid)
- url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format(release.ext_ids.pmcid)
+ # url = "https://www.ncbi.nlm.nih.gov/pmc/articles/{}/pdf/".format(release.ext_ids.pmcid)
+ url = "http://europepmc.org/backend/ptpmcrender.fcgi?accid={}&blobtype=pdf".format(
+ release.ext_ids.pmcid
+ )
link_source = "pmc"
link_source_id = release.ext_ids.pmcid
elif release.ext_ids.doi:
@@ -75,19 +85,19 @@ def release_ingest_request(release, ingest_request_source='fatcat', ingest_type=
ext_ids = dict([(k, v) for (k, v) in ext_ids.items() if v])
ingest_request = {
- 'ingest_type': ingest_type,
- 'ingest_request_source': ingest_request_source,
- 'base_url': url,
- 'release_stage': release.release_stage,
- 'fatcat': {
- 'release_ident': release.ident,
- 'work_ident': release.work_id,
+ "ingest_type": ingest_type,
+ "ingest_request_source": ingest_request_source,
+ "base_url": url,
+ "release_stage": release.release_stage,
+ "fatcat": {
+ "release_ident": release.ident,
+ "work_ident": release.work_id,
},
- 'ext_ids': ext_ids,
+ "ext_ids": ext_ids,
}
if link_source and link_source_id:
- ingest_request['link_source'] = link_source
- ingest_request['link_source_id'] = link_source_id
+ ingest_request["link_source"] = link_source
+ ingest_request["link_source_id"] = link_source_id
return ingest_request