import sys
import json
import sqlite3
import itertools

import fatcat_openapi_client

from fatcat_tools.normal import *
from .common import EntityImporter, clean, make_rel_url, SANE_MAX_RELEASES, SANE_MAX_URLS


class MatchedImporter(EntityImporter):
    """
    Importer for "file to crossref DOI" matches.

    These matches are currently generated by Internet Archive hadoop jobs
    written in scala (part of the 'sandcrawler' repo/project), but could be
    generated by other parties as well.

    Input format is JSON with keys:
    - dois (list)
    - sha1 (hex)
    - md5 (hex)
    - sha256 (hex)
    - size (int)
    - cdx (list of objects)
        - dt (optional; if included creates wayback link)
        - url
    - mimetype
    - urls (list of strings... or objects?)

    Future handlings/extensions:
    - core_id, wikidata_id, pmcid, pmid: not as lists
    """

    def __init__(self, api, **kwargs):
        """
        Set up editgroup metadata defaults and per-importer defaults.

        Recognized keyword arguments (everything else is forwarded to the
        parent class unchanged):
        - editgroup_description: falls back to a generic description
        - editgroup_extra: dict; an 'agent' key is filled in if missing
        - default_link_rel: rel passed to make_rel_url() (default "web")
        - default_mimetype: mimetype used when a record has none
        """
        eg_desc = kwargs.pop('editgroup_description', None) or "Import of large-scale file-to-release match results. Source of metadata varies."
        eg_extra = kwargs.pop('editgroup_extra', dict())
        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.MatchedImporter')
        super().__init__(
            api,
            editgroup_description=eg_desc,
            editgroup_extra=eg_extra,
            **kwargs)
        # These two are read with .get() (not popped), so they are also
        # still present in the kwargs handed to the parent class above.
        self.default_link_rel = kwargs.get("default_link_rel", "web")
        self.default_mimetype = kwargs.get("default_mimetype", None)

    def want(self, raw_record):
        """Accept every raw record; filtering happens in parse_record()."""
        return True

    def _lookup_release_ident(self, **lookup_kwargs):
        """
        Look up a release by a single external identifier.

        Returns the release ident, or None when the API answers 404. Any
        other ApiException is propagated to the caller.
        """
        try:
            release = self.api.lookup_release(**lookup_kwargs)
        except fatcat_openapi_client.rest.ApiException as err:
            if err.status != 404:
                raise
            return None
        if release is None:
            return None
        return release.ident

    def parse_record(self, obj):
        """
        Convert one match record (dict) into a FileEntity.

        Returns None (after bumping a 'skip-*' counter) when a DOI fails
        clean_doi(), when no release could be resolved, when no URL
        survives, or when the release/URL counts exceed SANE_MAX_RELEASES /
        SANE_MAX_URLS. The 'dois', 'urls' and 'cdx' keys may be missing or
        null; both are treated as empty.

        Raises KeyError if 'sha1' is missing, or if a cdx object has no
        'url' key.
        """
        dois = [d.lower() for d in (obj.get('dois') or [])]

        # lookup dois
        release_idents = set()
        for doi in dois:
            doi = clean_doi(doi)
            if not doi:
                # a single bad DOI rejects the whole record
                self.counts['skip-bad-doi'] += 1
                return None
            ident = self._lookup_release_ident(doi=doi)
            if ident is not None:
                release_idents.add(ident)

        # look up other external ids
        for extid_type in ('arxiv', 'pmid', 'pmcid', 'jstor', 'wikidata_qid', 'core', 'isbn13', 'ark'):
            extid = obj.get(extid_type)
            if not extid:
                continue
            ident = self._lookup_release_ident(**{extid_type: extid})
            if ident is not None:
                release_idents.add(ident)

        release_ids = list(release_idents)
        if len(release_ids) == 0:
            self.counts['skip-no-releases'] += 1
            return None
        if len(release_ids) > SANE_MAX_RELEASES:
            self.counts['skip-too-many-releases'] += 1
            return None

        # parse URLs and CDX; collected as a set of (rel, url) tuples so
        # duplicates collapse
        url_pairs = set()
        for raw_url in (obj.get('urls') or []):
            pair = make_rel_url(raw_url, default_link_rel=self.default_link_rel)
            if pair is not None:
                url_pairs.add(pair)
        for cdx in (obj.get('cdx') or []):
            original = cdx['url']
            if cdx.get('dt'):
                wayback = "https://web.archive.org/web/{}/{}".format(
                    cdx['dt'],
                    original)
                url_pairs.add(("webarchive", wayback))
            pair = make_rel_url(original, default_link_rel=self.default_link_rel)
            if pair is not None:
                url_pairs.add(pair)
        urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in url_pairs]
        if len(urls) == 0:
            self.counts['skip-no-urls'] += 1
            return None
        if len(urls) > SANE_MAX_URLS:
            self.counts['skip-too-many-urls'] += 1
            return None

        size = obj.get('size')
        if size:
            size = int(size)

        mimetype = obj.get('mimetype', self.default_mimetype)
        if not mimetype and urls:
            # NOTE(review): `urls` was built from a set, so which URL is
            # "first" is arbitrary; this guess is therefore not stable when
            # only some of the URLs end in '.pdf'.
            if urls[0].url.endswith('.pdf'):
                mimetype = 'application/pdf'

        return fatcat_openapi_client.FileEntity(
            md5=obj.get('md5'),
            sha1=obj['sha1'],
            sha256=obj.get('sha256'),
            size=size,
            mimetype=mimetype,
            release_ids=release_ids,
            urls=urls,
        )

    def try_update(self, fe):
        """
        Decide whether `fe` should be inserted as a new file entity.

        Returns True when no file with the same SHA-1 exists (caller should
        insert). Otherwise returns False: either nothing new would be added,
        the update was skipped (in-flight edit, too many URLs/releases), or
        the existing entity was merged with `fe` and updated through the API
        right here.
        """
        # lookup sha1, or create new entity
        existing = None
        try:
            existing = self.api.lookup_file(sha1=fe.sha1)
        except fatcat_openapi_client.rest.ApiException as err:
            if err.status != 404:
                raise

        if not existing:
            return True

        # equivalent to: union of both release lists == existing releases
        no_new_releases = set(fe.release_ids).issubset(existing.release_ids)
        if no_new_releases and existing.urls:
            # no new release matches *and* there are already existing URLs
            self.counts['exists'] += 1
            return False

        # check for edit conflicts
        if any(existing.ident == e.ident for e in self._edits_inflight):
            self.counts['skip-update-inflight'] += 1
            return False

        # minimum viable "existing" URL cleanup to fix dupes and broken links:
        # remove 'None' wayback URLs, and set archive.org rel 'archive'
        existing.urls = [
            u for u in existing.urls
            if '://web.archive.org/web/None/' not in u.url
        ]
        for u in existing.urls:
            if u.rel == 'repository' and '://archive.org/download/' in u.url:
                u.rel = 'archive'

        # special case: if importing *new* from archive.org arxiv collections,
        # blow away any existing release_id mappings; this is a direct arxiv_id
        # map. This *should* be safe to run in all matched imports.
        is_arxiv = any(
            'archive.org/download/arxiv' in u.url.lower() for u in fe.urls
        )
        if is_arxiv and fe.release_ids:
            existing.release_ids = fe.release_ids

        # merge the existing into this one and update; going through
        # (rel, url) tuples de-duplicates identical links
        merged_pairs = {(u.rel, u.url) for u in fe.urls + existing.urls}
        existing.urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in merged_pairs]
        if len(existing.urls) > SANE_MAX_URLS:
            self.counts['skip-update-too-many-url'] += 1
            return False
        existing.release_ids = list(set(fe.release_ids + existing.release_ids))
        if len(existing.release_ids) > SANE_MAX_RELEASES:
            self.counts['skip-update-too-many-releases'] += 1
            return False

        # only fill in metadata the existing entity is missing
        existing.mimetype = existing.mimetype or fe.mimetype
        existing.size = existing.size or fe.size
        existing.md5 = existing.md5 or fe.md5
        existing.sha1 = existing.sha1 or fe.sha1
        existing.sha256 = existing.sha256 or fe.sha256
        edit = self.api.update_file(self.get_editgroup_id(), existing.ident, existing)
        self._edits_inflight.append(edit)
        self.counts['update'] += 1
        return False

    def insert_batch(self, batch):
        """Create all file entities in `batch` with one auto-batch API call."""
        self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
            editgroup=fatcat_openapi_client.Editgroup(
                description=self.editgroup_description,
                extra=self.editgroup_extra),
            entity_list=batch))