import logging
import os
import re
import sys
from typing import Any, List, Optional, Type, Union

import elasticsearch
import elasticsearch_dsl
import fatcat_openapi_client
import requests
from fatcat_openapi_client import (ContainerEntity, DefaultApi, ReleaseContrib, ReleaseEntity)
from fatcat_openapi_client.rest import ApiException

from fuzzycat.config import settings
from fuzzycat.contrib import (ContribListMatcher, FuzzyStringSimilarity, JaccardIndexThreshold,
                              Pipeline)
from fuzzycat.entities import entity_from_dict, entity_from_json
from fuzzycat.utils import es_compat_hits_total

FATCAT_API_URL = settings.get("FATCAT_API_URL", "https://api.fatcat.wiki/v0")


class FuzzyReleaseMatcher:
    """
    FuzzyReleaseMatcher tries to find similar items to a given release in
    elasticsearch. Exact matches first, then fuzzy.

    In the best case, elasticsearch would automatically rank the most relevant
    docs first, even with partial data. We still try to steer the matches by
    using a query cascade. This is configurable. The last query should be a
    generic one. The goal here is to get a set of potential matches;
    verification has to happen separately.

    TODO: Example case not yet working well ("Stuehrenberg" vs "Stührenberg"):

    >>> result = matcher.match(entity_from_dict({"title": "internet archive",
    ...     "contribs": [{"raw_name": "Stührenberg"}], "ext_ids": {}}, ReleaseEntity))

    > Should return: https://fatcat.wiki/release/pu7e7tbctna2foqyyxztfw3ufy,
    https://fatcat.wiki/release/search?q=St%C3%BChrenberg+internet+archive&generic=1
    (not returning anything via frontend either)

    Make sure we can switch from function to class:

    * [ ] 5 test cases for both
    """
    def __init__(self, es="https://search.fatcat.wiki", api=None, index="fatcat_release", size=10):
        # Accept either an elasticsearch URL string or a ready client object;
        # fall back to a default client when es is falsy.
        if isinstance(es, str):
            self.es = elasticsearch.Elasticsearch([es])
        else:
            self.es = es if es else elasticsearch.Elasticsearch()
        self.api = api if api else public_api(FATCAT_API_URL)
        self.index = index
        self.size = size
        self.logger = logging.getLogger("fuzzy")

    def match_release_by_id(self, release, **kwargs) -> List[ReleaseEntity]:
        """
        Check for exact matches by identifier. Returns a single-element list
        on the first identifier hit, an empty list otherwise.
        """
        ext_ids = release.ext_ids
        attrs = (
            "doi",
            "pmid",
            "wikidata_qid",
            "core",
            "pmcid",
            "arxiv",
            "dblp",
            "doaj",
            "jstor",
            "isbn13",
            "ark",
            "mag",
            "oai",
        )
        for attr in attrs:
            value = getattr(ext_ids, attr)
            if not value:
                continue
            try:
                r = self.api.lookup_release(**{attr: value})
            except fatcat_openapi_client.rest.ApiException as err:
                # 404/400 mean "no such identifier" — keep trying other ids.
                if err.status in [404, 400]:
                    r = None
                else:
                    raise err
            if r:
                return [r]
        return []

    def match_release_exact_title_exact_contrib(self, release):
        """
        Match exact title and exact contrib names. Case insensitive, order of
        contribs does not matter.
        """
        if release.title is None or release.contribs is None:
            return []
        contrib_queries = [{
            "match": {
                "contrib_names": {
                    "query": contrib.raw_name,
                    "operator": "AND",
                }
            }
        } for contrib in release.contribs]
        query = {
            "bool": {
                "must": [{
                    "match": {
                        "title": {
                            "query": release.title,
                            "operator": "AND",
                        },
                    }
                }] + contrib_queries,
            },
        }
        result = []
        resp = self.es.search(query=query,
                              size=self.size,
                              track_total_hits=True,
                              index=self.index)
        hits_total = es_compat_hits_total(resp)
        if hits_total == 0:
            return result
        if hits_total > self.size:
            self.logger.warning('more than {} hits: {}'.format(self.size, hits_total))
        entities = response_to_entity_list(resp,
                                           entity_type=ReleaseEntity,
                                           size=self.size,
                                           api=self.api)
        # Require exact overlap of contrib names (case-insensitive).
        matcher = ContribListMatcher(
            cmp=JaccardIndexThreshold(1.0),
            pipeline=Pipeline([
                lambda contribs: set((c.raw_name.strip().lower() for c in contribs)),
            ]),
        )
        for candidate in entities:
            if candidate.title.strip().lower() != release.title.strip().lower():
                continue
            if not matcher.compare(candidate.contribs, release.contribs):
                continue
            result.append(candidate)
        return result

    def match_release_exact_title_partial_contrib(self, release):
        """
        Allow for exact authors, but ok, if some are missing.
        """
        if release.title is None or release.contribs is None:
            return []
        contrib_queries = [{
            "match": {
                "contrib_names": {
                    "query": contrib.raw_name,
                    "operator": "AND",
                }
            }
        } for contrib in release.contribs]
        query = {
            "bool": {
                "must": [{
                    "match": {
                        "title": {
                            "query": release.title,
                            "operator": "AND",
                        },
                    }
                }] + contrib_queries,
            },
        }
        result = []
        resp = self.es.search(query=query,
                              size=self.size,
                              track_total_hits=True,
                              index=self.index)
        if es_compat_hits_total(resp) == 0:
            return result
        if es_compat_hits_total(resp) > self.size:
            # Fix: formatting undefined name "es" here raised NameError;
            # report the actual hit count instead.
            raise NotImplementedError('result set too large: {}'.format(
                es_compat_hits_total(resp)))
        entities = response_to_entity_list(resp,
                                           entity_type=ReleaseEntity,
                                           size=self.size,
                                           api=self.api)
        # Require at least half the contribs to be shared.
        matcher = ContribListMatcher(
            cmp=JaccardIndexThreshold(0.5),
            pipeline=Pipeline([
                lambda contribs: set((c.raw_name.strip().lower() for c in contribs)),
            ]),
        )
        for candidate in entities:
            if candidate.title.strip().lower() != release.title.strip().lower():
                continue
            if not matcher.compare(candidate.contribs, release.contribs):
                continue
            result.append(candidate)
        return result

    def match_release_exact_title_fuzzy_contrib(self, release):
        """
        Exact title but ok it authors differ (slightly).
        """
        if release.title is None or release.contribs is None:
            return []
        contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()]
        contrib_queries = [{
            "match": {
                "contrib_names": {
                    "query": token,
                }
            }
        } for token in contrib_tokens]
        query = {
            "bool": {
                "must": [{
                    "match": {
                        "title": {
                            "query": release.title,
                            "operator": "AND",
                        },
                    }
                }] + contrib_queries,
            },
        }
        result = []
        resp = self.es.search(query=query,
                              size=self.size,
                              track_total_hits=True,
                              index=self.index)
        if es_compat_hits_total(resp) == 0:
            return result
        if es_compat_hits_total(resp) > self.size:
            # Fix: formatting undefined name "es" here raised NameError;
            # report the actual hit count instead.
            raise NotImplementedError('todo: scroll required for larger result sets: {}'.format(
                es_compat_hits_total(resp)))
        entities = response_to_entity_list(resp,
                                           entity_type=ReleaseEntity,
                                           size=self.size,
                                           api=self.api)
        matcher = ContribListMatcher(
            cmp=FuzzyStringSimilarity(min_ratio=60),
            pipeline=Pipeline([
                lambda contribs: set((c.raw_name.strip().lower() for c in contribs)),
            ]),
        )
        for candidate in entities:
            if candidate.title.strip().lower() != release.title.strip().lower():
                continue
            if not matcher.compare(candidate.contribs, release.contribs):
                continue
            result.append(candidate)
        return result

    def match_release_exact_title(self, release):
        """
        Exact title, but any author. For common titles, this will yield 100s
        or 1000s or results.
        """
        if release.title is None:
            return []
        query = {
            "bool": {
                "must": [{
                    "match": {
                        "title": {
                            "query": release.title,
                            "operator": "AND",
                        },
                    }
                }],
            },
        }
        result = []
        resp = self.es.search(query=query,
                              size=self.size,
                              track_total_hits=True,
                              index=self.index)
        if es_compat_hits_total(resp) == 0:
            return result
        if es_compat_hits_total(resp) > self.size:
            self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp)))
        entities = response_to_entity_list(resp,
                                           entity_type=ReleaseEntity,
                                           size=self.size,
                                           api=self.api)
        for candidate in entities:
            if candidate.title.strip().lower() != release.title.strip().lower():
                continue
            result.append(candidate)
        return result

    def match_release_fuzzy_title_fuzzy_contrib(self, release):
        """
        Using elasticsearch fuzziness option (which is not that fuzzy).
        """
        if release.title is None or release.contribs is None:
            return []
        contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()]
        contrib_queries = [{
            "match": {
                "contrib_names": {
                    "query": token,
                }
            }
        } for token in contrib_tokens]
        query = {
            "bool": {
                "must": [
                    {
                        "match": {
                            "title": {
                                "query": release.title,
                                "operator": "AND",
                                "fuzziness": "AUTO",
                            },
                        }
                    },
                ] + contrib_queries,
            },
        }
        result = []
        resp = self.es.search(query=query,
                              size=self.size,
                              track_total_hits=True,
                              index=self.index)
        if es_compat_hits_total(resp) == 0:
            return result
        if es_compat_hits_total(resp) > self.size:
            raise ValueError('too many hits: {}'.format(es_compat_hits_total(resp)))
        entities = response_to_entity_list(resp,
                                           entity_type=ReleaseEntity,
                                           size=self.size,
                                           api=self.api)
        return entities

    def match_release_generic(self, release):
        """
        Final catch all variant via title.
        """
        if release.title is None:
            return []
        query = {
            "bool": {
                "must": [
                    {
                        "match": {
                            "title": {
                                "query": release.title,
                                "operator": "OR",
                                "fuzziness": "AUTO",
                            },
                        }
                    },
                ],
            },
        }
        result = []
        resp = self.es.search(query=query,
                              size=self.size,
                              track_total_hits=True,
                              index=self.index)
        if es_compat_hits_total(resp) == 0:
            return result
        if es_compat_hits_total(resp) > self.size:
            self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp)))
        entities = response_to_entity_list(resp,
                                           entity_type=ReleaseEntity,
                                           size=self.size,
                                           api=self.api)
        return entities

    def match_release_generic_fuzzy_contrib(self, release):
        """
        Only match contribs, if they exist.
        """
        if release.contribs is None:
            return []
        contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()]
        contrib_queries = [{
            "match": {
                "contrib_names": {
                    "query": token,
                }
            }
        } for token in contrib_tokens]
        query = {
            "bool": {
                "must": contrib_queries,
            },
        }
        result = []
        resp = self.es.search(query=query,
                              size=self.size,
                              track_total_hits=True,
                              index=self.index)
        if es_compat_hits_total(resp) == 0:
            return result
        if es_compat_hits_total(resp) > self.size:
            self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp)))
        entities = response_to_entity_list(resp,
                                           entity_type=ReleaseEntity,
                                           size=self.size,
                                           api=self.api)
        return entities

    def match_cascade(self, release, *qs, **kwargs):
        """
        Returns the result from the first query that returns a result. All
        query functions need to be defined on this class (for now).
        """
        for q in qs:
            self.logger.debug("[cascade] {}".format(q))
            result = q(release, **kwargs)
            if len(result) > 0:
                return result
        return []

    def match(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]:
        """
        Match returns a list of match candidates given a release entity.
        """
        if not release:
            return []
        return self.match_cascade(release, self.match_release_by_id,
                                  self.match_release_exact_title_exact_contrib,
                                  self.match_release_exact_title_partial_contrib,
                                  self.match_release_exact_title_fuzzy_contrib,
                                  self.match_release_exact_title,
                                  self.match_release_fuzzy_title_fuzzy_contrib,
                                  self.match_release_generic,
                                  self.match_release_generic_fuzzy_contrib)


def match_release_fuzzy(
    release: ReleaseEntity,
    size: int = 5,
    es: Optional[Union[str, Type[elasticsearch.client.Elasticsearch]]] = None,
    api: DefaultApi = None,
) -> List[ReleaseEntity]:
    """
    Given a release entity, return a number similar release entities from
    fatcat using Elasticsearch.

    TODO: rename "es" parameter to "es_client", which would be clearer

    This is deprecated, move to matcher class.
    """
    assert isinstance(release, ReleaseEntity)

    if size is None or size == 0:
        size = 10000  # or any large number

    if isinstance(es, str):
        es = elasticsearch.Elasticsearch([es])
    if es is None:
        es = elasticsearch.Elasticsearch()
    if api is None:
        api = public_api(FATCAT_API_URL)

    # > query cascade
    #
    # [x] 1 exact ids
    # [ ] 2 exact title and exact contrib
    # [ ] 3 exact title and fuzzy contrib
    # [ ] 4 exact title
    # [ ] 5 title w/o stopwords, fuzzy contrib
    # [ ] 6 title w/o stopwords
    # [ ] 7 fuzzy title and fuzzy contrib
    # [ ] 8 fuzzy whole document

    # Try to match by external identifier.
    # TODO: use api, ability to disable; benchmark
    ext_ids = release.ext_ids
    attrs = (
        "doi",
        "wikidata_qid",
        "isbn13",
        "pmid",
        "pmcid",
        "core",
        "arxiv",
        "jstor",
        "ark",
        "mag",
        "doaj",
        "dblp",
        "oai",
    )
    for attr in attrs:
        value = getattr(ext_ids, attr)
        if not value:
            continue
        try:
            r = api.lookup_release(**{attr: value})
        except fatcat_openapi_client.rest.ApiException as err:
            if err.status in [404, 400]:
                r = None
            else:
                raise err
        if r:
            return [r]

    if release.title is not None and release.contribs is not None:
        names = " ".join([c.raw_name for c in release.contribs])
        query = {
            "bool": {
                "must": [
                    {
                        "match": {
                            "title": {
                                "query": release.title,
                                "operator": "AND",
                                "fuzziness": "AUTO",
                            },
                        }
                    },
                    {
                        "match": {
                            "contrib_names": {
                                "query": names,
                                "operator": "AND",
                                "fuzziness": "AUTO",
                            }
                        }
                    },
                ],
            },
        }
        resp = es.search(query=query,
                         index="fatcat_release",
                         size=size,
                         track_total_hits=True)
        if es_compat_hits_total(resp) > 0:
            return response_to_entity_list(resp, entity_type=ReleaseEntity, size=size, api=api)

        # Relax to "should": title or contribs may match.
        query = {
            "bool": {
                "should": [
                    {
                        "match": {
                            "title": {
                                "query": release.title,
                                "operator": "AND",
                                "fuzziness": "AUTO",
                            },
                        }
                    },
                    {
                        "match": {
                            "contrib_names": {
                                "query": names,
                                "operator": "AND",
                                "fuzziness": "AUTO",
                            }
                        }
                    },
                ],
            },
        }
        resp = es.search(query=query,
                         index="fatcat_release",
                         size=size,
                         track_total_hits=True)
        if es_compat_hits_total(resp) > 0:
            return response_to_entity_list(resp, entity_type=ReleaseEntity, size=size, api=api)

    # Note: If the title is short, we will get lots of results here; do we need
    # to check for title length or result set length length or result set
    # length here?
    query = {
        "match": {
            "title": {
                "query": release.title,
                "operator": "AND",
            }
        }
    }
    resp = es.search(query=query, index="fatcat_release", size=size, track_total_hits=True)
    if es_compat_hits_total(resp) > 0:
        return response_to_entity_list(resp, entity_type=ReleaseEntity, size=size, api=api)

    # Get fuzzy.
    # https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#fuzziness
    query = {
        "match": {
            "title": {
                "query": release.title,
                "operator": "AND",
                "fuzziness": "AUTO",
            }
        }
    }
    resp = es.search(query=query, index="fatcat_release", size=size, track_total_hits=True)
    if es_compat_hits_total(resp) > 0:
        return response_to_entity_list(resp, entity_type=ReleaseEntity, size=size, api=api)

    # TODO: perform more queries on other fields.
    return []


def public_api(host_uri):
    """
    Note: unlike the authenticated variant, this helper might get called even
    if the API isn't going to be used, so it's important that it doesn't try
    to actually connect to the API host or something.
    """
    conf = fatcat_openapi_client.Configuration()
    conf.host = host_uri
    return fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient(conf))


def retrieve_entity_list(
    ids: List[str],
    api: DefaultApi = None,
    entity_type: Union[Type[ReleaseEntity], Type[ContainerEntity]] = ReleaseEntity,
) -> List[Union[Type[ReleaseEntity], Type[ContainerEntity]]]:
    """
    Retrieve a list of entities. Some entities might be missing. Return all
    that are accessible.
    """
    if api is None:
        api = public_api(FATCAT_API_URL)
    result = []
    if entity_type == ReleaseEntity:
        for ident in ids:
            try:
                entity = api.get_release(ident,
                                         hide="refs,abstracts",
                                         expand="container,contribs,files")
                result.append(entity)
            except ApiException as exc:
                if exc.status == 404:
                    print("[err] failed to retrieve release entity: {}".format(ident),
                          file=sys.stderr)
                else:
                    print("[err] api failed with {}: {}".format(exc.status, exc.message),
                          file=sys.stderr)
    elif entity_type == ContainerEntity:
        for ident in ids:
            try:
                entity = api.get_container(ident)
                result.append(entity)
            except ApiException as exc:
                if exc.status == 404:
                    print("[err] failed to retrieve container entity: {}".format(ident),
                          file=sys.stderr)
                else:
                    print("[err] api failed with {}: {}".format(exc.status, exc.message),
                          file=sys.stderr)
    else:
        raise ValueError("[err] cannot retrieve ids {} of type {}".format(ids, entity_type))
    return result


def response_to_entity_list(response, size=5, entity_type=ReleaseEntity, api: DefaultApi = None):
    """
    Convert an elasticsearch result to a list of entities. Accepts both a
    dictionary and an elasticsearch_dsl.response.Response.

    We take the ids from elasticsearch and retrieve entities via API.
    """
    if isinstance(response, dict):
        ids = [hit["_source"]["ident"] for hit in response["hits"]["hits"]][:size]
        return retrieve_entity_list(ids, entity_type=entity_type, api=api)
    elif isinstance(response, elasticsearch_dsl.response.Response):
        ids = [hit.to_dict().get("ident") for hit in response]
        return retrieve_entity_list(ids, entity_type=entity_type, api=api)
    else:
        raise ValueError("cannot convert {}".format(response))


def anything_to_entity(
    s: str,
    entity_type: Union[Type[ContainerEntity], Type[ReleaseEntity]],
    api_url: str = "https://api.fatcat.wiki/v0",
    es_url: str = "https://search.fatcat.wiki",
) -> Union[ContainerEntity, ReleaseEntity]:
    """
    Convert a string to a given entity type. This function may go out to the
    fatcat API or elasticsearch and hence is expensive.
    """
    names = {
        ContainerEntity: "container",
        ReleaseEntity: "release",
    }
    if entity_type not in names:
        raise ValueError("cannot convert {}, only: {}".format(entity_type, names.keys()))
    entity_name = names[entity_type]

    if s is None:
        raise ValueError("no entity found")

    # Local file containing serialized entity JSON.
    if os.path.exists(s):
        with open(s) as f:
            return entity_from_json(f.read(), entity_type)

    # Looks like a fatcat ident (26-char base32), fetch from the API.
    match = re.search("/?([a-z0-9]{26})$", s)
    if match:
        url = "{}/{}/{}".format(api_url, entity_name, match.group(1))
        resp = requests.get(url)
        if resp.status_code == 200:
            return entity_from_json(resp.text, entity_type)
        if resp.status_code == 404:
            raise ValueError("entity not found: {}".format(url))

    # Looks like an ISSN, resolve via elasticsearch.
    if re.match("[0-9]{4}(-)?[0-9]{3,3}[0-9xx]", s):
        # TODO: make index name configurable
        url = "{}/fatcat_{}/_search?track_total_hits=true&q=issns:{}".format(
            es_url, entity_name, s)
        doc = requests.get(url).json()
        # Fix: previously inspected "resp", which may be unbound here (it is
        # only set in the ident branch above) and is the wrong response anyway.
        if es_compat_hits_total(doc) == 1:
            ident = doc["hits"]["hits"][0]["_source"]["ident"]
            url = "{}/{}/{}".format(api_url, entity_name, ident)
            return entity_from_json(requests.get(url).text, entity_type)

    # Fall back to a minimal, name/title-only entity.
    if entity_name == "container":
        return entity_from_dict({"name": s}, entity_type)
    elif entity_name == "release":
        return entity_from_dict({"title": s, "ext_ids": {}}, entity_type)
    else:
        raise ValueError("unhandled entity type: {}".format(entity_type))