about | summary | refs | log | tree | commit | diff | stats
path: root/python/fatcat_tools/transforms/elasticsearch.py
diff options
context:
space:
mode:
author Bryan Newbold <bnewbold@robocracy.org> 2021-11-02 18:14:59 -0700
committer Bryan Newbold <bnewbold@robocracy.org> 2021-11-02 18:14:59 -0700
commit 31d1a6a713d177990609767d508209ced19ca396 (patch)
tree a628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/transforms/elasticsearch.py
parent 9dc891b8098542bb089c8c47098b60a8beb76a53 (diff)
download fatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz
fatcat-31d1a6a713d177990609767d508209ced19ca396.zip
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/transforms/elasticsearch.py')
-rw-r--r-- python/fatcat_tools/transforms/elasticsearch.py 668
1 files changed, 354 insertions, 314 deletions
diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py
index 1826d4eb..e39e9ea4 100644
--- a/python/fatcat_tools/transforms/elasticsearch.py
+++ b/python/fatcat_tools/transforms/elasticsearch.py
@@ -1,4 +1,3 @@
-
import datetime
from typing import Any, Dict, Optional
@@ -13,13 +12,14 @@ from fatcat_openapi_client import (
def check_kbart(year: int, archive: dict) -> Optional[bool]:
- if not archive or not archive.get('year_spans'):
+ if not archive or not archive.get("year_spans"):
return None
- for span in archive['year_spans']:
+ for span in archive["year_spans"]:
if year >= span[0] and year <= span[1]:
return True
return False
+
def test_check_kbart() -> None:
assert check_kbart(1990, dict()) is None
@@ -40,87 +40,89 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) ->
Raises exception on error (never returns None)
"""
- if entity.state in ('redirect', 'deleted'):
+ if entity.state in ("redirect", "deleted"):
return dict(
- ident = entity.ident,
- state = entity.state,
+ ident=entity.ident,
+ state=entity.state,
)
- elif entity.state != 'active':
+ elif entity.state != "active":
raise ValueError("Unhandled entity state: {}".format(entity.state))
# First, the easy ones (direct copy)
release = entity
t: Dict[str, Any] = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
- ident = release.ident,
- state = release.state,
- revision = release.revision,
- work_id = release.work_id,
- title = release.title,
- subtitle = release.subtitle,
- original_title = release.original_title,
- release_type = release.release_type,
- release_stage = release.release_stage,
- withdrawn_status = release.withdrawn_status,
- language = release.language,
- volume = release.volume,
- issue = release.issue,
- pages = release.pages,
- number = release.number,
- license = release.license_slug,
- version = release.version,
- doi = release.ext_ids.doi,
- pmid = release.ext_ids.pmid,
- pmcid = release.ext_ids.pmcid,
- isbn13 = release.ext_ids.isbn13,
- wikidata_qid = release.ext_ids.wikidata_qid,
- core_id = release.ext_ids.core,
- arxiv_id = release.ext_ids.arxiv,
- jstor_id = release.ext_ids.jstor,
- ark_id = release.ext_ids.ark,
- mag_id = release.ext_ids.mag,
- dblp_id = release.ext_ids.dblp,
- doaj_id = release.ext_ids.doaj,
- hdl = release.ext_ids.hdl,
- tags = [],
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
+ ident=release.ident,
+ state=release.state,
+ revision=release.revision,
+ work_id=release.work_id,
+ title=release.title,
+ subtitle=release.subtitle,
+ original_title=release.original_title,
+ release_type=release.release_type,
+ release_stage=release.release_stage,
+ withdrawn_status=release.withdrawn_status,
+ language=release.language,
+ volume=release.volume,
+ issue=release.issue,
+ pages=release.pages,
+ number=release.number,
+ license=release.license_slug,
+ version=release.version,
+ doi=release.ext_ids.doi,
+ pmid=release.ext_ids.pmid,
+ pmcid=release.ext_ids.pmcid,
+ isbn13=release.ext_ids.isbn13,
+ wikidata_qid=release.ext_ids.wikidata_qid,
+ core_id=release.ext_ids.core,
+ arxiv_id=release.ext_ids.arxiv,
+ jstor_id=release.ext_ids.jstor,
+ ark_id=release.ext_ids.ark,
+ mag_id=release.ext_ids.mag,
+ dblp_id=release.ext_ids.dblp,
+ doaj_id=release.ext_ids.doaj,
+ hdl=release.ext_ids.hdl,
+ tags=[],
)
- t.update(dict(
- is_oa = None,
- is_longtail_oa = None,
- is_preserved = None,
- in_web = False,
- in_dweb = False,
- in_ia = False,
- in_ia_sim = False,
- in_kbart = None,
- in_jstor = False,
- in_doaj= bool(release.ext_ids.doaj),
- in_shadows = False,
- ))
+ t.update(
+ dict(
+ is_oa=None,
+ is_longtail_oa=None,
+ is_preserved=None,
+ in_web=False,
+ in_dweb=False,
+ in_ia=False,
+ in_ia_sim=False,
+ in_kbart=None,
+ in_jstor=False,
+ in_doaj=bool(release.ext_ids.doaj),
+ in_shadows=False,
+ )
+ )
release_year = release.release_year
if release.release_date:
# .isoformat() results in, eg, '2010-10-22' (YYYY-MM-DD)
- t['release_date'] = release.release_date.isoformat()
+ t["release_date"] = release.release_date.isoformat()
if not release_year:
release_year = release.release_date.year
if release_year:
- t['release_year'] = release_year
+ t["release_year"] = release_year
- t['any_abstract'] = len(release.abstracts or []) > 0
- t['ref_count'] = len(release.refs or [])
+ t["any_abstract"] = len(release.abstracts or []) > 0
+ t["ref_count"] = len(release.refs or [])
ref_release_ids = []
- for r in (release.refs or []):
+ for r in release.refs or []:
if r.target_release_id:
ref_release_ids.append(r.target_release_id)
- t['ref_release_ids'] = ref_release_ids
- t['ref_linked_count'] = len(ref_release_ids)
- t['contrib_count'] = len(release.contribs or [])
+ t["ref_release_ids"] = ref_release_ids
+ t["ref_linked_count"] = len(ref_release_ids)
+ t["contrib_count"] = len(release.contribs or [])
contrib_names = []
contrib_affiliations = []
creator_ids = []
- for c in (release.contribs or []):
+ for c in release.contribs or []:
if c.creator and c.creator.display_name:
contrib_names.append(c.creator.display_name)
elif c.raw_name:
@@ -132,193 +134,218 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) ->
creator_ids.append(c.creator_id)
if c.raw_affiliation:
contrib_affiliations.append(c.raw_affiliation)
- t['contrib_names'] = contrib_names
- t['creator_ids'] = creator_ids
- t['affiliations'] = contrib_affiliations
+ t["contrib_names"] = contrib_names
+ t["creator_ids"] = creator_ids
+ t["affiliations"] = contrib_affiliations
# TODO: mapping... probably by lookup?
- t['affiliation_rors'] = None
+ t["affiliation_rors"] = None
if release.container:
t.update(_rte_container_helper(release.container, release_year))
# fall back to release-level container metadata if container not linked or
# missing context
- if not t.get('publisher'):
- t['publisher'] = release.publisher
- if not t.get('container_name') and release.extra:
- t['container_name'] = release.extra.get('container_name')
+ if not t.get("publisher"):
+ t["publisher"] = release.publisher
+ if not t.get("container_name") and release.extra:
+ t["container_name"] = release.extra.get("container_name")
- if release.ext_ids.jstor or (release.ext_ids.doi and release.ext_ids.doi.startswith('10.2307/')):
- t['in_jstor'] = True
+ if release.ext_ids.jstor or (
+ release.ext_ids.doi and release.ext_ids.doi.startswith("10.2307/")
+ ):
+ t["in_jstor"] = True
# transform file/fileset/webcapture related fields
t.update(_rte_content_helper(release))
if release.ext_ids.doaj:
- t['is_oa'] = True
+ t["is_oa"] = True
if release.license_slug:
# TODO: more/better checks here, particularly strict *not* OA licenses
if release.license_slug.startswith("CC-"):
- t['is_oa'] = True
+ t["is_oa"] = True
if release.license_slug.startswith("ARXIV-"):
- t['is_oa'] = True
+ t["is_oa"] = True
- t['is_work_alias'] = None
+ t["is_work_alias"] = None
extra = release.extra or dict()
if extra:
- if extra.get('is_oa'):
+ if extra.get("is_oa"):
# NOTE: not actually setting this anywhere... but could
- t['is_oa'] = True
- if extra.get('is_work_alias') is not None:
- t['is_work_alias'] = bool(extra.get('is_work_alias'))
- if extra.get('longtail_oa'):
+ t["is_oa"] = True
+ if extra.get("is_work_alias") is not None:
+ t["is_work_alias"] = bool(extra.get("is_work_alias"))
+ if extra.get("longtail_oa"):
# sometimes set by GROBID/matcher
- t['is_oa'] = True
- t['is_longtail_oa'] = True
- if not t.get('container_name'):
- t['container_name'] = extra.get('container_name')
- if extra.get('crossref'):
- if extra['crossref'].get('archive'):
+ t["is_oa"] = True
+ t["is_longtail_oa"] = True
+ if not t.get("container_name"):
+ t["container_name"] = extra.get("container_name")
+ if extra.get("crossref"):
+ if extra["crossref"].get("archive"):
# all crossref archives are KBART, I believe
- t['in_kbart'] = True
+ t["in_kbart"] = True
# backwards compatible subtitle fetching
- if not t['subtitle'] and extra.get('subtitle'):
- if type(extra['subtitle']) == list:
- t['subtitle'] = extra['subtitle'][0]
+ if not t["subtitle"] and extra.get("subtitle"):
+ if type(extra["subtitle"]) == list:
+ t["subtitle"] = extra["subtitle"][0]
else:
- t['subtitle'] = extra['subtitle']
+ t["subtitle"] = extra["subtitle"]
- t['first_page'] = None
+ t["first_page"] = None
if release.pages:
- first = release.pages.split('-')[0]
- first = first.replace('p', '')
+ first = release.pages.split("-")[0]
+ first = first.replace("p", "")
if first.isdigit():
- t['first_page'] = first
+ t["first_page"] = first
# TODO: non-numerical first pages
- t['ia_microfilm_url'] = None
- if t['in_ia_sim']:
+ t["ia_microfilm_url"] = None
+ if t["in_ia_sim"]:
# TODO: determine URL somehow? I think this is in flux. Will probably
# need extra metadata in the container extra field.
# special case as a demo for now.
- if release.container_id == "hl5g6d5msjcl7hlbyyvcsbhc2u" \
- and release.release_year in (2011, 2013) \
- and release.issue \
- and release.issue.isdigit() \
- and t['first_page']:
- t['ia_microfilm_url'] = "https://archive.org/details/sim_bjog_{}-{:02d}/page/n{}".format(
+ if (
+ release.container_id == "hl5g6d5msjcl7hlbyyvcsbhc2u"
+ and release.release_year in (2011, 2013)
+ and release.issue
+ and release.issue.isdigit()
+ and t["first_page"]
+ ):
+ t[
+ "ia_microfilm_url"
+ ] = "https://archive.org/details/sim_bjog_{}-{:02d}/page/n{}".format(
release.release_year,
int(release.issue) - 1,
- t['first_page'],
+ t["first_page"],
)
- t['doi_registrar'] = None
- if extra and t['doi']:
- for k in ('crossref', 'datacite', 'jalc'):
+ t["doi_registrar"] = None
+ if extra and t["doi"]:
+ for k in ("crossref", "datacite", "jalc"):
if k in extra:
- t['doi_registrar'] = k
- if 'doi_registrar' not in t:
- t['doi_registrar'] = 'crossref'
+ t["doi_registrar"] = k
+ if "doi_registrar" not in t:
+ t["doi_registrar"] = "crossref"
- if t['doi']:
- t['doi_prefix'] = t['doi'].split('/')[0]
+ if t["doi"]:
+ t["doi_prefix"] = t["doi"].split("/")[0]
- if t['is_longtail_oa']:
- t['is_oa'] = True
+ if t["is_longtail_oa"]:
+ t["is_oa"] = True
# optionally coerce all flags from Optional[bool] to bool
if force_bool:
- for k in ('is_oa', 'is_longtail_oa', 'in_kbart', 'in_ia_sim',
- 'in_jstor', 'in_web', 'in_dweb', 'in_shadows',
- 'is_work_alias'):
+ for k in (
+ "is_oa",
+ "is_longtail_oa",
+ "in_kbart",
+ "in_ia_sim",
+ "in_jstor",
+ "in_web",
+ "in_dweb",
+ "in_shadows",
+ "is_work_alias",
+ ):
t[k] = bool(t[k])
- t['in_ia'] = bool(t['in_ia'])
- t['is_preserved'] = bool(
- t['is_preserved']
- or t['in_ia']
- or t['in_kbart']
- or t['in_jstor']
- or t.get('pmcid')
- or t.get('arxiv_id')
+ t["in_ia"] = bool(t["in_ia"])
+ t["is_preserved"] = bool(
+ t["is_preserved"]
+ or t["in_ia"]
+ or t["in_kbart"]
+ or t["in_jstor"]
+ or t.get("pmcid")
+ or t.get("arxiv_id")
)
- if t['in_ia']:
- t['preservation'] = 'bright'
- elif t['is_preserved']:
- t['preservation'] = 'dark'
- elif t['in_shadows']:
- t['preservation'] = 'shadows_only'
+ if t["in_ia"]:
+ t["preservation"] = "bright"
+ elif t["is_preserved"]:
+ t["preservation"] = "dark"
+ elif t["in_shadows"]:
+ t["preservation"] = "shadows_only"
else:
- t['preservation'] = 'none'
+ t["preservation"] = "none"
return t
+
def _rte_container_helper(container: ContainerEntity, release_year: Optional[int]) -> dict:
"""
Container metadata sub-section of release_to_elasticsearch()
"""
this_year = datetime.date.today().year
t = dict()
- t['publisher'] = container.publisher
- t['container_name'] = container.name
+ t["publisher"] = container.publisher
+ t["container_name"] = container.name
# this is container.ident, not release.container_id, because there may
# be a redirect involved
- t['container_id'] = container.ident
- t['container_issnl'] = container.issnl
+ t["container_id"] = container.ident
+ t["container_issnl"] = container.issnl
issns = [container.issnl, container.issne, container.issnp]
issns = list(set([i for i in issns if i]))
- t['container_issns'] = issns
- t['container_type'] = container.container_type
- t['container_publication_status'] = container.publication_status
+ t["container_issns"] = issns
+ t["container_type"] = container.container_type
+ t["container_publication_status"] = container.publication_status
if container.extra:
c_extra = container.extra
- if c_extra.get('kbart') and release_year:
- if check_kbart(release_year, c_extra['kbart'].get('jstor')):
- t['in_jstor'] = True
- if t.get('in_kbart') or t.get('in_jstor'):
- t['in_kbart'] = True
- for archive in ('portico', 'lockss', 'clockss', 'pkp_pln',
- 'hathitrust', 'scholarsportal', 'cariniana'):
- t['in_kbart'] = t.get('in_kbart') or check_kbart(release_year, c_extra['kbart'].get(archive))
+ if c_extra.get("kbart") and release_year:
+ if check_kbart(release_year, c_extra["kbart"].get("jstor")):
+ t["in_jstor"] = True
+ if t.get("in_kbart") or t.get("in_jstor"):
+ t["in_kbart"] = True
+ for archive in (
+ "portico",
+ "lockss",
+ "clockss",
+ "pkp_pln",
+ "hathitrust",
+ "scholarsportal",
+ "cariniana",
+ ):
+ t["in_kbart"] = t.get("in_kbart") or check_kbart(
+ release_year, c_extra["kbart"].get(archive)
+ )
# recent KBART coverage is often not updated for the
# current year. So for current-year publications, consider
# coverage from *last* year to also be included in the
# Keeper
- if not t.get('in_kbart') and release_year == this_year:
- t['in_kbart'] = check_kbart(this_year - 1, c_extra['kbart'].get(archive))
-
- if c_extra.get('ia'):
- if c_extra['ia'].get('sim') and release_year:
- t['in_ia_sim'] = check_kbart(release_year, c_extra['ia']['sim'])
- if c_extra['ia'].get('longtail_oa'):
- t['is_longtail_oa'] = True
- if c_extra.get('sherpa_romeo'):
- if c_extra['sherpa_romeo'].get('color') == 'white':
- t['is_oa'] = False
- if c_extra.get('default_license') and c_extra.get('default_license').startswith('CC-'):
- t['is_oa'] = True
- if c_extra.get('doaj'):
- if c_extra['doaj'].get('as_of'):
- t['is_oa'] = True
- t['in_doaj'] = True
- if c_extra.get('road'):
- if c_extra['road'].get('as_of'):
- t['is_oa'] = True
- if c_extra.get('szczepanski'):
- if c_extra['szczepanski'].get('as_of'):
- t['is_oa'] = True
- if c_extra.get('country'):
- t['country_code'] = c_extra['country']
- t['country_code_upper'] = c_extra['country'].upper()
- if c_extra.get('publisher_type'):
- t['publisher_type'] = c_extra['publisher_type']
- if c_extra.get('discipline'):
- t['discipline'] = c_extra['discipline']
+ if not t.get("in_kbart") and release_year == this_year:
+ t["in_kbart"] = check_kbart(this_year - 1, c_extra["kbart"].get(archive))
+
+ if c_extra.get("ia"):
+ if c_extra["ia"].get("sim") and release_year:
+ t["in_ia_sim"] = check_kbart(release_year, c_extra["ia"]["sim"])
+ if c_extra["ia"].get("longtail_oa"):
+ t["is_longtail_oa"] = True
+ if c_extra.get("sherpa_romeo"):
+ if c_extra["sherpa_romeo"].get("color") == "white":
+ t["is_oa"] = False
+ if c_extra.get("default_license") and c_extra.get("default_license").startswith("CC-"):
+ t["is_oa"] = True
+ if c_extra.get("doaj"):
+ if c_extra["doaj"].get("as_of"):
+ t["is_oa"] = True
+ t["in_doaj"] = True
+ if c_extra.get("road"):
+ if c_extra["road"].get("as_of"):
+ t["is_oa"] = True
+ if c_extra.get("szczepanski"):
+ if c_extra["szczepanski"].get("as_of"):
+ t["is_oa"] = True
+ if c_extra.get("country"):
+ t["country_code"] = c_extra["country"]
+ t["country_code_upper"] = c_extra["country"].upper()
+ if c_extra.get("publisher_type"):
+ t["publisher_type"] = c_extra["publisher_type"]
+ if c_extra.get("discipline"):
+ t["discipline"] = c_extra["discipline"]
return t
+
def _rte_content_helper(release: ReleaseEntity) -> dict:
"""
File/FileSet/WebCapture sub-section of release_to_elasticsearch()
@@ -329,9 +356,9 @@ def _rte_content_helper(release: ReleaseEntity) -> dict:
- any other URL
"""
t = dict(
- file_count = len(release.files or []),
- fileset_count = len(release.filesets or []),
- webcapture_count = len(release.webcaptures or []),
+ file_count=len(release.files or []),
+ fileset_count=len(release.filesets or []),
+ webcapture_count=len(release.webcaptures or []),
)
any_pdf_url = None
@@ -340,38 +367,42 @@ def _rte_content_helper(release: ReleaseEntity) -> dict:
ia_pdf_url = None
for f in release.files or []:
- if f.extra and f.extra.get('shadows'):
- t['in_shadows'] = True
- is_pdf = 'pdf' in (f.mimetype or '')
- for release_url in (f.urls or []):
+ if f.extra and f.extra.get("shadows"):
+ t["in_shadows"] = True
+ is_pdf = "pdf" in (f.mimetype or "")
+ for release_url in f.urls or []:
# first generic flags
t.update(_rte_url_helper(release_url))
# then PDF specific stuff (for generating "best URL" fields)
- if not f.mimetype and 'pdf' in release_url.url.lower():
+ if not f.mimetype and "pdf" in release_url.url.lower():
is_pdf = True
if is_pdf:
any_pdf_url = release_url.url
- if release_url.rel in ('webarchive', 'repository', 'repo'):
+ if release_url.rel in ("webarchive", "repository", "repo"):
good_pdf_url = release_url.url
- if '//web.archive.org/' in release_url.url or '//archive.org/' in release_url.url:
+ if (
+ "//web.archive.org/" in release_url.url
+ or "//archive.org/" in release_url.url
+ ):
best_pdf_url = release_url.url
ia_pdf_url = release_url.url
# here is where we bake-in PDF url priority; IA-specific
- t['best_pdf_url'] = best_pdf_url or good_pdf_url or any_pdf_url
- t['ia_pdf_url'] = ia_pdf_url
+ t["best_pdf_url"] = best_pdf_url or good_pdf_url or any_pdf_url
+ t["ia_pdf_url"] = ia_pdf_url
for fs in release.filesets or []:
- for url_obj in (fs.urls or []):
+ for url_obj in fs.urls or []:
t.update(_rte_url_helper(url_obj))
for wc in release.webcaptures or []:
- for url_obj in (wc.archive_urls or []):
+ for url_obj in wc.archive_urls or []:
t.update(_rte_url_helper(url_obj))
return t
+
def _rte_url_helper(url_obj) -> dict:
"""
Takes a location URL ('url' and 'rel' keys) and returns generic preservation status.
@@ -382,17 +413,17 @@ def _rte_url_helper(url_obj) -> dict:
these will be iteratively update() into the overal object.
"""
t = dict()
- if url_obj.rel in ('webarchive', 'repository', 'archive', 'repo'):
- t['is_preserved'] = True
- if '//web.archive.org/' in url_obj.url or '//archive.org/' in url_obj.url:
- t['in_ia'] = True
- if url_obj.url.lower().startswith('http') or url_obj.url.lower().startswith('ftp'):
- t['in_web'] = True
- if url_obj.rel in ('dweb', 'p2p', 'ipfs', 'dat', 'torrent'):
+ if url_obj.rel in ("webarchive", "repository", "archive", "repo"):
+ t["is_preserved"] = True
+ if "//web.archive.org/" in url_obj.url or "//archive.org/" in url_obj.url:
+ t["in_ia"] = True
+ if url_obj.url.lower().startswith("http") or url_obj.url.lower().startswith("ftp"):
+ t["in_web"] = True
+ if url_obj.rel in ("dweb", "p2p", "ipfs", "dat", "torrent"):
# not sure what rel will be for this stuff
- t['in_dweb'] = True
- if '//www.jstor.org/' in url_obj.url:
- t['in_jstor'] = True
+ t["in_dweb"] = True
+ if "//www.jstor.org/" in url_obj.url:
+ t["in_jstor"] = True
return t
@@ -404,50 +435,59 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None):
Raises exception on error (never returns None)
"""
- if entity.state in ('redirect', 'deleted'):
+ if entity.state in ("redirect", "deleted"):
return dict(
- ident = entity.ident,
- state = entity.state,
+ ident=entity.ident,
+ state=entity.state,
)
- elif entity.state != 'active':
+ elif entity.state != "active":
raise ValueError("Unhandled entity state: {}".format(entity.state))
# First, the easy ones (direct copy)
t = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
- ident = entity.ident,
- state = entity.state,
- revision = entity.revision,
-
- name = entity.name,
- publisher = entity.publisher,
- container_type = entity.container_type,
- publication_status= entity.publication_status,
- issnl = entity.issnl,
- issne = entity.issne,
- issnp = entity.issnp,
- wikidata_qid = entity.wikidata_qid,
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
+ ident=entity.ident,
+ state=entity.state,
+ revision=entity.revision,
+ name=entity.name,
+ publisher=entity.publisher,
+ container_type=entity.container_type,
+ publication_status=entity.publication_status,
+ issnl=entity.issnl,
+ issne=entity.issne,
+ issnp=entity.issnp,
+ wikidata_qid=entity.wikidata_qid,
)
if not entity.extra:
entity.extra = dict()
- for key in ('country', 'languages', 'mimetypes', 'original_name',
- 'first_year', 'last_year', 'aliases', 'abbrev', 'region',
- 'discipline', 'publisher_type'):
+ for key in (
+ "country",
+ "languages",
+ "mimetypes",
+ "original_name",
+ "first_year",
+ "last_year",
+ "aliases",
+ "abbrev",
+ "region",
+ "discipline",
+ "publisher_type",
+ ):
if entity.extra.get(key):
t[key] = entity.extra[key]
- if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'):
- t['dblp_prefix'] = entity.extra['dblp']['prefix']
+ if entity.extra.get("dblp") and entity.extra["dblp"].get("prefix"):
+ t["dblp_prefix"] = entity.extra["dblp"]["prefix"]
- if 'country' in t:
- t['country_code'] = t.pop('country')
+ if "country" in t:
+ t["country_code"] = t.pop("country")
- t['issns'] = [entity.issnl, entity.issne, entity.issnp]
- for key in ('issnp', 'issne'):
+ t["issns"] = [entity.issnl, entity.issne, entity.issnp]
+ for key in ("issnp", "issne"):
if entity.extra.get(key):
- t['issns'].append(entity.extra[key])
- t['issns'] = list(set([i for i in t['issns'] if i]))
+ t["issns"].append(entity.extra[key])
+ t["issns"] = list(set([i for i in t["issns"] if i]))
in_doaj = None
in_road = None
@@ -459,72 +499,72 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None):
keepers = []
extra = entity.extra
- if extra.get('doaj'):
- if extra['doaj'].get('as_of'):
+ if extra.get("doaj"):
+ if extra["doaj"].get("as_of"):
in_doaj = True
- if extra.get('road'):
- if extra['road'].get('as_of'):
+ if extra.get("road"):
+ if extra["road"].get("as_of"):
in_road = True
- if extra.get('szczepanski'):
- if extra['szczepanski'].get('as_of'):
+ if extra.get("szczepanski"):
+ if extra["szczepanski"].get("as_of"):
is_oa = True
- if extra.get('default_license'):
- if extra['default_license'].startswith('CC-'):
+ if extra.get("default_license"):
+ if extra["default_license"].startswith("CC-"):
is_oa = True
- t['sherpa_romeo_color'] = None
- if extra.get('sherpa_romeo'):
- t['sherpa_romeo_color'] = extra['sherpa_romeo'].get('color')
- if extra['sherpa_romeo'].get('color') == 'white':
+ t["sherpa_romeo_color"] = None
+ if extra.get("sherpa_romeo"):
+ t["sherpa_romeo_color"] = extra["sherpa_romeo"].get("color")
+ if extra["sherpa_romeo"].get("color") == "white":
is_oa = False
- if extra.get('kbart'):
+ if extra.get("kbart"):
any_kbart = True
- if extra['kbart'].get('jstor'):
+ if extra["kbart"].get("jstor"):
any_jstor = True
- for k, v in extra['kbart'].items():
+ for k, v in extra["kbart"].items():
if v and isinstance(v, dict):
keepers.append(k)
- if extra.get('ia'):
- if extra['ia'].get('sim'):
+ if extra.get("ia"):
+ if extra["ia"].get("sim"):
any_ia_sim = True
- if extra['ia'].get('longtail_oa'):
+ if extra["ia"].get("longtail_oa"):
is_longtail_oa = True
- t['is_superceded'] = bool(extra.get('superceded'))
+ t["is_superceded"] = bool(extra.get("superceded"))
- t['keepers'] = keepers
- t['in_doaj'] = bool(in_doaj)
- t['in_road'] = bool(in_road)
- t['any_kbart'] = bool(any_kbart)
+ t["keepers"] = keepers
+ t["in_doaj"] = bool(in_doaj)
+ t["in_road"] = bool(in_road)
+ t["any_kbart"] = bool(any_kbart)
if force_bool:
- t['is_oa'] = bool(in_doaj or in_road or is_oa)
- t['is_longtail_oa'] = bool(is_longtail_oa)
- t['any_jstor'] = bool(any_jstor)
- t['any_ia_sim'] = bool(any_ia_sim)
+ t["is_oa"] = bool(in_doaj or in_road or is_oa)
+ t["is_longtail_oa"] = bool(is_longtail_oa)
+ t["any_jstor"] = bool(any_jstor)
+ t["any_ia_sim"] = bool(any_ia_sim)
else:
- t['is_oa'] = in_doaj or in_road or is_oa
- t['is_longtail_oa'] = is_longtail_oa
- t['any_jstor'] = any_jstor
- t['any_ia_sim'] = any_ia_sim
+ t["is_oa"] = in_doaj or in_road or is_oa
+ t["is_longtail_oa"] = is_longtail_oa
+ t["any_jstor"] = any_jstor
+ t["any_ia_sim"] = any_ia_sim
# mix in stats, if provided
if stats:
- t['releases_total'] = stats['total']
- t['preservation_bright'] = stats['preservation']['bright']
- t['preservation_dark'] = stats['preservation']['dark']
- t['preservation_shadows_only'] = stats['preservation']['shadows_only']
- t['preservation_none'] = stats['preservation']['none']
+ t["releases_total"] = stats["total"]
+ t["preservation_bright"] = stats["preservation"]["bright"]
+ t["preservation_dark"] = stats["preservation"]["dark"]
+ t["preservation_shadows_only"] = stats["preservation"]["shadows_only"]
+ t["preservation_none"] = stats["preservation"]["none"]
return t
def _type_of_edit(edit: EntityEdit) -> str:
if edit.revision is None and edit.redirect_ident is None:
- return 'delete'
+ return "delete"
elif edit.redirect_ident:
# redirect
- return 'update'
+ return "update"
elif edit.prev_revision is None and edit.redirect_ident is None and edit.revision:
- return 'create'
+ return "create"
else:
- return 'update'
+ return "update"
def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
@@ -536,7 +576,7 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
editgroup = entity.editgroup
t = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
index=entity.index,
editgroup_id=entity.editgroup_id,
timestamp=entity.timestamp.isoformat(),
@@ -547,8 +587,8 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
)
extra = editgroup.extra or dict()
- if extra.get('agent'):
- t['agent'] = extra['agent']
+ if extra.get("agent"):
+ t["agent"] = extra["agent"]
containers = [_type_of_edit(e) for e in editgroup.edits.containers]
creators = [_type_of_edit(e) for e in editgroup.edits.creators]
@@ -558,27 +598,27 @@ def changelog_to_elasticsearch(entity: ChangelogEntry) -> Dict[str, Any]:
releases = [_type_of_edit(e) for e in editgroup.edits.releases]
works = [_type_of_edit(e) for e in editgroup.edits.works]
- t['containers'] = len(containers)
- t['new_containers'] = len([e for e in containers if e == 'create'])
- t['creators'] = len(creators)
- t['new_creators'] = len([e for e in creators if e == 'create'])
- t['files'] = len(files)
- t['new_files'] = len([e for e in files if e == 'create'])
- t['filesets'] = len(filesets)
- t['new_filesets'] = len([e for e in filesets if e == 'create'])
- t['webcaptures'] = len(webcaptures)
- t['new_webcaptures'] = len([e for e in webcaptures if e == 'create'])
- t['releases'] = len(releases)
- t['new_releases'] = len([e for e in releases if e == 'create'])
- t['works'] = len(works)
- t['new_works'] = len([e for e in works if e == 'create'])
+ t["containers"] = len(containers)
+ t["new_containers"] = len([e for e in containers if e == "create"])
+ t["creators"] = len(creators)
+ t["new_creators"] = len([e for e in creators if e == "create"])
+ t["files"] = len(files)
+ t["new_files"] = len([e for e in files if e == "create"])
+ t["filesets"] = len(filesets)
+ t["new_filesets"] = len([e for e in filesets if e == "create"])
+ t["webcaptures"] = len(webcaptures)
+ t["new_webcaptures"] = len([e for e in webcaptures if e == "create"])
+ t["releases"] = len(releases)
+ t["new_releases"] = len([e for e in releases if e == "create"])
+ t["works"] = len(works)
+ t["new_works"] = len([e for e in works if e == "create"])
all_edits = containers + creators + files + filesets + webcaptures + releases + works
- t['created'] = len([e for e in all_edits if e == 'create'])
- t['updated'] = len([e for e in all_edits if e == 'update'])
- t['deleted'] = len([e for e in all_edits if e == 'delete'])
- t['total'] = len(all_edits)
+ t["created"] = len([e for e in all_edits if e == "create"])
+ t["updated"] = len([e for e in all_edits if e == "update"])
+ t["deleted"] = len([e for e in all_edits if e == "delete"])
+ t["total"] = len(all_edits)
return t
@@ -590,47 +630,47 @@ def file_to_elasticsearch(entity: FileEntity) -> Dict[str, Any]:
Raises exception on error (never returns None)
"""
- if entity.state in ('redirect', 'deleted'):
+ if entity.state in ("redirect", "deleted"):
return dict(
- ident = entity.ident,
- state = entity.state,
+ ident=entity.ident,
+ state=entity.state,
)
- elif entity.state != 'active':
+ elif entity.state != "active":
raise ValueError("Unhandled entity state: {}".format(entity.state))
# First, the easy ones (direct copy)
t = dict(
- doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
- ident = entity.ident,
- state = entity.state,
- revision = entity.revision,
- release_ids = entity.release_ids,
- release_count = len(entity.release_ids),
- mimetype = entity.mimetype,
- size_bytes = entity.size,
- sha1 = entity.sha1,
- sha256 = entity.sha256,
- md5 = entity.md5,
+ doc_index_ts=datetime.datetime.utcnow().isoformat() + "Z",
+ ident=entity.ident,
+ state=entity.state,
+ revision=entity.revision,
+ release_ids=entity.release_ids,
+ release_count=len(entity.release_ids),
+ mimetype=entity.mimetype,
+ size_bytes=entity.size,
+ sha1=entity.sha1,
+ sha256=entity.sha256,
+ md5=entity.md5,
)
parsed_urls = [tldextract.extract(u.url) for u in entity.urls]
- t['hosts'] = list(set(['.'.join([seg for seg in pu if seg]) for pu in parsed_urls]))
- t['domains'] = list(set([pu.registered_domain for pu in parsed_urls]))
- t['rels'] = list(set([u.rel for u in entity.urls]))
+ t["hosts"] = list(set([".".join([seg for seg in pu if seg]) for pu in parsed_urls]))
+ t["domains"] = list(set([pu.registered_domain for pu in parsed_urls]))
+ t["rels"] = list(set([u.rel for u in entity.urls]))
- t['in_ia'] = bool('archive.org' in t['domains'])
- t['in_ia_petabox'] = bool('archive.org' in t['hosts'])
+ t["in_ia"] = bool("archive.org" in t["domains"])
+ t["in_ia_petabox"] = bool("archive.org" in t["hosts"])
any_url = None
good_url = None
best_url = None
- for release_url in (entity.urls or []):
+ for release_url in entity.urls or []:
any_url = release_url.url
- if release_url.rel in ('webarchive', 'repository'):
+ if release_url.rel in ("webarchive", "repository"):
good_url = release_url.url
- if '//web.archive.org/' in release_url.url or '//archive.org/' in release_url.url:
+ if "//web.archive.org/" in release_url.url or "//archive.org/" in release_url.url:
best_url = release_url.url
# here is where we bake-in priority; IA-specific
- t['best_url'] = best_url or good_url or any_url
+ t["best_url"] = best_url or good_url or any_url
return t