from typing import Any, Dict, List, Optional

import fatcat_openapi_client
from fatcat_openapi_client import ApiClient, FileEntity

from fatcat_tools.normal import clean_doi

from .common import SANE_MAX_RELEASES, SANE_MAX_URLS, EntityImporter, make_rel_url


class MatchedImporter(EntityImporter):
    """
    Importer for "file to crossref DOI" matches.

    These matches are currently generated by Internet Archive hadoop jobs
    written in scala (part of the 'sandcrawler' repo/project), but could be
    generated by other parties as well.

    Input format is JSON with keys:
    - dois (list)
    - sha1 (hex)
    - md5 (hex)
    - sha256 (hex)
    - size (int)
    - cdx (list of objects)
        - dt (optional; if included creates wayback link)
        - url
    - mimetype
    - urls (list of strings... or objects?)

    In addition, any of the keys arxiv, pmid, pmcid, jstor, wikidata_qid,
    core, isbn13, ark (single values) are used to look up releases.

    Future handlings/extensions:
    - core_id, wikidata_id, pmcid, pmid: not as lists
    """

    def __init__(self, api: ApiClient, **kwargs) -> None:
        """
        Set up editgroup metadata and importer defaults.

        Recognized keyword arguments (everything is also forwarded to the
        base class, except the two editgroup_* keys which are consumed here):
        - editgroup_description: overrides the default description
        - editgroup_extra: dict; an "agent" entry is added if missing
        - default_link_rel: rel for URLs with no better match (default "web")
        - default_mimetype: mimetype used when the record has no "mimetype" key
        """
        eg_desc = (
            kwargs.pop("editgroup_description", None)
            or "Import of large-scale file-to-release match results. Source of metadata varies."
        )
        eg_extra = kwargs.pop("editgroup_extra", dict())
        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.MatchedImporter")
        super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
        self.default_link_rel = kwargs.get("default_link_rel", "web")
        self.default_mimetype = kwargs.get("default_mimetype", None)

    def want(self, raw_record: Any) -> bool:
        """Accept every record; filtering happens in parse_record()."""
        return True

    def _lookup_release(self, **lookup: Any) -> Optional[Any]:
        """
        Look up a single release by external identifier (e.g. doi="10.1/x").

        Returns the release entity, or None if the API answers 404. Any other
        API error propagates to the caller.
        """
        try:
            return self.api.lookup_release(**lookup)
        except fatcat_openapi_client.rest.ApiException as err:
            if err.status != 404:
                raise
            return None

    def parse_record(self, obj: Dict[str, Any]) -> Optional[FileEntity]:
        """
        Convert one match record into a FileEntity.

        Returns None (and bumps a "skip-*" counter) when a DOI fails to
        clean, when no release could be found, when there are no usable
        URLs, or when the number of releases/URLs exceeds the sanity limits.

        Raises KeyError if the record has no "sha1" key, or a cdx row has no
        "url" key.
        """
        # `or []` also covers keys that are present but null in the JSON
        dois = [d.lower() for d in obj.get("dois") or []]

        # dict keys are used as an insertion-ordered set so that the output
        # order is reproducible (a plain set of strings iterates in a
        # different order from one interpreter run to the next)
        release_idents = {}

        # lookup dois
        for raw_doi in dois:
            doi = clean_doi(raw_doi)
            if not doi:
                self.counts["skip-bad-doi"] += 1
                return None
            release = self._lookup_release(doi=doi)
            if release is not None:
                release_idents[release.ident] = None

        # look up other external ids
        for extid_type in (
            "arxiv",
            "pmid",
            "pmcid",
            "jstor",
            "wikidata_qid",
            "core",
            "isbn13",
            "ark",
        ):
            extid = obj.get(extid_type)
            if not extid:
                continue
            release = self._lookup_release(**{extid_type: extid})
            if release is not None:
                release_idents[release.ident] = None

        release_ids = list(release_idents)
        if not release_ids:
            self.counts["skip-no-releases"] += 1
            return None
        if len(release_ids) > SANE_MAX_RELEASES:
            self.counts["skip-too-many-releases"] += 1
            return None

        # parse URLs and CDX; (rel, url) pairs, de-duplicated in input order
        url_pairs = {}
        for raw_url in obj.get("urls") or []:
            pair = make_rel_url(raw_url, default_link_rel=self.default_link_rel)
            if pair is not None:
                url_pairs[pair] = None
        for cdx in obj.get("cdx") or []:
            original = cdx["url"]
            if cdx.get("dt"):
                # a capture timestamp is available: also link the wayback copy
                wayback = "https://web.archive.org/web/{}/{}".format(cdx["dt"], original)
                url_pairs[("webarchive", wayback)] = None
            pair = make_rel_url(original, default_link_rel=self.default_link_rel)
            if pair is not None:
                url_pairs[pair] = None
        urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in url_pairs]
        if not urls:
            self.counts["skip-no-urls"] += 1
            return None
        if len(urls) > SANE_MAX_URLS:
            self.counts["skip-too-many-urls"] += 1
            return None

        size = obj.get("size")
        if size:
            size = int(size)

        mimetype = obj.get("mimetype", self.default_mimetype)
        if not mimetype and urls:
            # best-effort guess from the first URL of the record
            if urls[0].url.endswith(".pdf"):
                mimetype = "application/pdf"

        fe = FileEntity(
            md5=obj.get("md5"),
            sha1=obj["sha1"],
            sha256=obj.get("sha256"),
            size=size,
            mimetype=mimetype,
            release_ids=release_ids,
            urls=urls,
        )
        return fe

    def try_update(self, fe: FileEntity) -> bool:
        """
        Merge `fe` into an already-existing file entity with the same SHA-1.

        Returns True when no file with that SHA-1 exists yet. Returns False
        in every other case: nothing new to add ("exists"), an edit of the
        same entity is already in flight, the merged entity would exceed the
        sanity limits, or the existing entity was updated through the API.
        """
        # lookup sha1, or create new entity
        existing = None
        try:
            existing = self.api.lookup_file(sha1=fe.sha1)
        except fatcat_openapi_client.rest.ApiException as err:
            if err.status != 404:
                raise

        if not existing:
            return True

        if set(fe.release_ids) <= set(existing.release_ids) and existing.urls:
            # no new release matches *and* there are already existing URLs
            self.counts["exists"] += 1
            return False

        # check for edit conflicts
        if any(edit.ident == existing.ident for edit in self._edits_inflight):
            self.counts["skip-update-inflight"] += 1
            return False

        # minimum viable "existing" URL cleanup to fix dupes and broken links:
        # remove 'None' wayback URLs, and set archive.org rel 'archive'
        existing.urls = [
            u for u in existing.urls if "://web.archive.org/web/None/" not in u.url
        ]
        for u in existing.urls:
            if u.rel == "repository" and "://archive.org/download/" in u.url:
                u.rel = "archive"

        # special case: if importing *new* from archive.org arxiv collections,
        # blow away any existing release_id mappings; this is a direct arxiv_id
        # map. This *should* be safe to run in all matched imports.
        is_arxiv = any("archive.org/download/arxiv" in u.url.lower() for u in fe.urls)
        if is_arxiv and fe.release_ids:
            existing.release_ids = fe.release_ids

        # merge the existing into this one and update; dict.fromkeys()
        # de-duplicates while keeping a stable order (new entries first)
        merged_pairs = dict.fromkeys((u.rel, u.url) for u in fe.urls + existing.urls)
        existing.urls = [
            fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in merged_pairs
        ]

        if len(existing.urls) > SANE_MAX_URLS:
            self.counts["skip-update-too-many-url"] += 1
            return False
        existing.release_ids = list(dict.fromkeys(fe.release_ids + existing.release_ids))
        if len(existing.release_ids) > SANE_MAX_RELEASES:
            self.counts["skip-update-too-many-releases"] += 1
            return False

        # only fill in metadata that the existing entity is missing
        existing.mimetype = existing.mimetype or fe.mimetype
        existing.size = existing.size or fe.size
        existing.md5 = existing.md5 or fe.md5
        existing.sha1 = existing.sha1 or fe.sha1
        existing.sha256 = existing.sha256 or fe.sha256
        edit = self.api.update_file(self.get_editgroup_id(), existing.ident, existing)
        self._edits_inflight.append(edit)
        self.counts["update"] += 1
        return False

    def insert_batch(self, batch: List[FileEntity]) -> None:
        """Create all entities in `batch` with one auto-batch API call."""
        self.api.create_file_auto_batch(
            fatcat_openapi_client.FileAutoBatch(
                editgroup=fatcat_openapi_client.Editgroup(
                    description=self.editgroup_description, extra=self.editgroup_extra
                ),
                entity_list=batch,
            )
        )