From dd6149140542585f2b0bfc3b334ec2b0a88b790e Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Wed, 17 Nov 2021 14:51:50 +0100 Subject: complete FuzzyReleaseMatcher refactoring We keep the name, since the api - "matcher.match(release)" - is the same; simplified queries; at most one query is performed against elasticsearch; parallel release retrieval from the API; optional support for release year windows; Test cases are expressed in yaml and will be auto-loaded from the specified directory; test work against the current search endpoint, which means the actual output may change on index updates; for the moment, we think this setup is relatively simple and not too unstable. about: title contrib, partial name input: > { "contribs": [ { "raw_name": "Adams" } ], "title": "digital libraries", "ext_ids": {} } release_year_padding: 1 expected: - 7rmvqtrb2jdyhcxxodihzzcugy - a2u6ougtsjcbvczou6sazsulcm - dy45vilej5diros6zmax46nm4e - exuwhhayird4fdjmmsiqpponlq - gqrj7jikezgcfpjfazhpf4e7c4 - mkmqt3453relbpuyktnmsg6hjq - t2g5sl3dgzchtnq7dynxyzje44 - t4tvenhrvzamraxrvvxivxmvga - wd3oeoi3bffknfbg2ymleqc4ja - y63a6dhrfnb7bltlxfynydbojy --- fuzzycat/matching.py | 479 +++++++++++++++++++++------------------------------ 1 file changed, 201 insertions(+), 278 deletions(-) (limited to 'fuzzycat/matching.py') diff --git a/fuzzycat/matching.py b/fuzzycat/matching.py index 2984d9a..cb6acbb 100644 --- a/fuzzycat/matching.py +++ b/fuzzycat/matching.py @@ -1,7 +1,9 @@ +import collections import logging import os import re import sys +from multiprocessing.dummy import Pool from typing import Any, List, Optional, Type, Union import elasticsearch @@ -22,37 +24,24 @@ FATCAT_API_URL = settings.get("FATCAT_API_URL", "https://api.fatcat.wiki/v0") class FuzzyReleaseMatcher: """ - FuzzyReleaseMatcher tries to find similar items to a given release in - elasticsearch. Exact matches first, then fuzzy. 
+ This is a helper class to fetch related documents to a given release + document from fatcat search (currently elasticsearch). Elasticsearch should + rank similar documents high itself, so all we try to do here is to tweak + the specific query a bit, depending on the completeness of the input + document, e.g. if the input has contrib and title, then use both, if it + only has a title, then use just that, etc. - In the best case, elasticsearch would automatically rank the most relevant - docs first, even with partial data. We still try to steer the matches by - using a query cascade. This is configurable. The last query should be a - generic. - - The goal here is to get a set of potential matches; verification has to. - happen separately. - - TODO: - - Example case not yet working well ("Stuehrenberg" vs "Stührenberg"): - - >>> result = matcher.match(entity_from_dict({"title": "internet archive", - "contribs": [{"raw_name": - "Stührenberg"}], - "ext_ids": {}}, - ReleaseEntity)) - - > Should return: https://fatcat.wiki/release/pu7e7tbctna2foqyyxztfw3ufy, - https://fatcat.wiki/release/search?q=St%C3%BChrenberg+internet+archive&generic=1 - (not returning anything via frontend either) - - Make sure we can switch from function to class: - - * [ ] 5 test cases for both + We try to get the result in a single query. + TODO/Tweaks: e.g. if the document has a "release_year", add this as a "should" clause. 
""" - def __init__(self, es="https://search.fatcat.wiki", api=None, index="fatcat_release", size=10): + def __init__(self, + es="https://search.fatcat.wiki", + api=None, + index="fatcat_release", + size=10, + min_token_length=3, + release_year_padding=1): if isinstance(es, str): self.es = elasticsearch.Elasticsearch([es]) else: @@ -61,8 +50,10 @@ class FuzzyReleaseMatcher: self.index = index self.size = size self.logger = logging.getLogger("fuzzy") + self.min_token_length = min_token_length + self.release_year_padding = 1 - def match_release_by_id(self, release, **kwargs) -> List[ReleaseEntity]: + def _match_id(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]: """ Check for exact matches by identifier. """ @@ -97,229 +88,10 @@ class FuzzyReleaseMatcher: return [r] return [] - def match_release_exact_title_exact_contrib(self, release): - """ - Match exact title and exact contrib names. Case insensitive, order of - contribs does not matter. - """ - if release.title is None or release.contribs is None: - return [] - contrib_queries = [{ - "match": { - "contrib_names": { - "query": contrib.raw_name, - "operator": "AND", - } - } - } for contrib in release.contribs] - query = { - "bool": { - "must": [{ - "match": { - "title": { - "query": release.title, - "operator": "AND", - }, - } - }] + contrib_queries, - }, - } - result = [] - - resp = self.es.search(index=self.index, - body={ - "query": query, - "size": self.size, - "track_total_hits": True - }) - hits_total = es_compat_hits_total(resp) - if hits_total == 0: - return result - if hits_total > self.size: - self.logger.warn('more than {} hits: {}'.format(self.size, hits_total)) - - entities = response_to_entity_list(resp, - entity_type=ReleaseEntity, - size=self.size, - api=self.api) - - # Require overlap of contrib. 
- matcher = ContribListMatcher( - cmp=JaccardIndexThreshold(1.0), - pipeline=Pipeline([ - lambda contribs: set((c.raw_name.strip().lower() for c in contribs)), - ]), - ) - - for re in entities: - if re.title.strip().lower() != release.title.strip().lower(): - continue - if not matcher.compare(re.contribs, release.contribs): - continue - result.append(re) - return result - - def match_release_exact_title_partial_contrib(self, release): - """ - Allow for exact authors, but ok, if some are missing. - """ - if release.title is None or release.contribs is None: - return [] - contrib_queries = [{ - "match": { - "contrib_names": { - "query": contrib.raw_name, - "operator": "AND", - } - } - } for contrib in release.contribs] - query = { - "bool": { - "must": [{ - "match": { - "title": { - "query": release.title, - "operator": "AND", - }, - } - }] + contrib_queries, - }, - } - result = [] - resp = self.es.search(index=self.index, - body={ - "query": query, - "size": self.size, - "track_total_hits": True - }) - if es_compat_hits_total(resp) == 0: - return result - if es_compat_hits_total(resp) > self.size: - raise NotImplementedError('result set too large: {}'.format(es)) - entities = response_to_entity_list(resp, - entity_type=ReleaseEntity, - size=self.size, - api=self.api) - - # Require at least half the contribs to be shared. - matcher = ContribListMatcher( - cmp=JaccardIndexThreshold(0.5), - pipeline=Pipeline([ - lambda contribs: set((c.raw_name.strip().lower() for c in contribs)), - ]), - ) - - for re in entities: - if re.title.strip().lower() != release.title.strip().lower(): - continue - if not matcher.compare(re.contribs, release.contribs): - continue - result.append(re) - return result - - def match_release_exact_title_fuzzy_contrib(self, release): - """ - Exact title but ok it authors differ (slightly). 
- """ - if release.title is None or release.contribs is None: - return [] - contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()] - contrib_queries = [{ - "match": { - "contrib_names": { - "query": token, - } - } - } for token in contrib_tokens] - query = { - "bool": { - "must": [{ - "match": { - "title": { - "query": release.title, - "operator": "AND", - }, - } - }] + contrib_queries, - }, - } - result = [] - resp = self.es.search(index=self.index, - body={ - "query": query, - "size": self.size, - "track_total_hits": True - }) - if es_compat_hits_total(resp) == 0: - return result - if es_compat_hits_total(resp) > self.size: - raise NotImplementedError('todo: scroll required for larger result sets: {}'.format(es)) - entities = response_to_entity_list(resp, - entity_type=ReleaseEntity, - size=self.size, - api=self.api) - - matcher = ContribListMatcher( - cmp=FuzzyStringSimilarity(min_ratio=60), - pipeline=Pipeline([ - lambda contribs: set((c.raw_name.strip().lower() for c in contribs)), - ]), - ) - - for re in entities: - if re.title.strip().lower() != release.title.strip().lower(): - continue - if not matcher.compare(re.contribs, release.contribs): - continue - result.append(re) - return result - - def match_release_exact_title(self, release): + def _match_title_contrib(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]: """ - Exact title, but any author. For common titles, this will yield 100s or - 1000s or results. + Match in the presence of defined title and contrib fields. 
""" - if release.title is None: - return [] - query = { - "bool": { - "must": [{ - "match": { - "title": { - "query": release.title, - "operator": "AND", - }, - } - }], - }, - } - result = [] - resp = self.es.search(body={ - "query": query, - "size": self.size, - "track_total_hits": True - }, - index=self.index) - if es_compat_hits_total(resp) == 0: - return result - if es_compat_hits_total(resp) > self.size: - self.logger.warn('too many hits: {}'.format(es_compat_hits_total(resp))) - entities = response_to_entity_list(resp, - entity_type=ReleaseEntity, - size=self.size, - api=self.api) - for re in entities: - if re.title.strip().lower() != release.title.strip().lower(): - continue - result.append(re) - return result - - def match_release_fuzzy_title_fuzzy_contrib(self, release): - """ - Using elasticsearch fuzziness option (which is not that fuzzy). - """ - if release.title is None or release.contribs is None: - return [] contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()] contrib_queries = [{ "match": { @@ -343,7 +115,18 @@ class FuzzyReleaseMatcher: ] + contrib_queries, }, } + if release.release_year is not None: + query["bool"]["must"].append({ + "range": { + "year": { + "gte": release.release_year - self.release_year_padding, + "lte": release.release_year + self.release_year_padding, + "boost": 0.5, + } + } + }) result = [] + self.logger.info(query) resp = self.es.search(index=self.index, body={ "query": query, @@ -353,19 +136,17 @@ class FuzzyReleaseMatcher: if es_compat_hits_total(resp) == 0: return result if es_compat_hits_total(resp) > self.size: - raise ValueError('too many hits: {}'.format(es_compat_hits_total(resp))) + self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp))) entities = response_to_entity_list(resp, entity_type=ReleaseEntity, size=self.size, api=self.api) return entities - def match_release_generic(self, release): + def _match_title(self, release: Optional[ReleaseEntity]) -> 
List[ReleaseEntity]: """ - Final catch all variant via title. + Match in the presence of a title. """ - if release.title is None: - return [] query = { "bool": { "must": [ @@ -373,7 +154,7 @@ class FuzzyReleaseMatcher: "match": { "title": { "query": release.title, - "operator": "OR", + "operator": "AND", "fuzziness": "AUTO", }, } @@ -381,6 +162,16 @@ class FuzzyReleaseMatcher: ], }, } + if release.release_year is not None: + query["bool"]["must"].append({ + "range": { + "year": { + "gte": release.release_year - self.release_year_padding, + "lte": release.release_year + self.release_year_padding, + "boost": 0.5, + } + } + }) result = [] resp = self.es.search(index=self.index, body={ @@ -391,19 +182,17 @@ class FuzzyReleaseMatcher: if es_compat_hits_total(resp) == 0: return result if es_compat_hits_total(resp) > self.size: - self.logger.warn('too many hits: {}'.format(es_compat_hits_total(resp))) + self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp))) entities = response_to_entity_list(resp, entity_type=ReleaseEntity, size=self.size, api=self.api) return entities - def match_release_generic_fuzzy_contrib(self, release): + def _match_contribs(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]: """ - Only match contribs, if they exist. + Match in the presence of contribs (and no title). 
""" - if release.contribs is None: - return [] contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()] contrib_queries = [{ "match": { @@ -417,6 +206,16 @@ class FuzzyReleaseMatcher: "must": contrib_queries, }, } + if release.release_year is not None: + query["bool"]["must"].append({ + "range": { + "year": { + "gte": release.release_year - self.release_year_padding, + "lte": release.release_year + self.release_year_padding, + "boost": 0.5, + } + } + }) result = [] resp = self.es.search(index=self.index, body={ @@ -427,37 +226,78 @@ class FuzzyReleaseMatcher: if es_compat_hits_total(resp) == 0: return result if es_compat_hits_total(resp) > self.size: - self.logger.warn('too many hits: {}'.format(es_compat_hits_total(resp))) + self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp))) entities = response_to_entity_list(resp, entity_type=ReleaseEntity, size=self.size, api=self.api) return entities - def match_cascade(self, release, *qs, **kwargs): + def _match_generic(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]: """ - Returns the result from the first query that returns a result. All query - functions need to be defined on this class (for now). + Throw tokens at elasticsearch. 
""" - for q in qs: - self.logger.debug("[cascade] {}".format(q)) - result = q(release, **kwargs) - if len(result) > 0: - return result - return [] + token_queries = [ + { + "match": { + "biblio": { # https://git.io/JMXvJ + "query": token, + } + } + } for token in release_tokens(release) if len(token) > self.min_token_length + ] + query = { + "bool": { + "must": token_queries, + }, + } + if release.release_year is not None: + query["bool"]["must"].append({ + "range": { + "year": { + "gte": release.release_year - self.release_year_padding, + "lte": release.release_year + self.release_year_padding, + "boost": 0.5, + } + } + }) + result = [] + self.logger.info(query) + resp = self.es.search(index=self.index, + body={ + "query": query, + "size": self.size, + "track_total_hits": True + }) + if es_compat_hits_total(resp) == 0: + return result + if es_compat_hits_total(resp) > self.size: + self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp))) + entities = response_to_entity_list(resp, + entity_type=ReleaseEntity, + size=self.size, + api=self.api) + return entities def match(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]: """ - Match returns a list of match candidates given a release entity. + Match dispatches methods based on which fields are defined on the + document. 
""" if not release: return [] - return self.match_cascade( - release, self.match_release_by_id, self.match_release_exact_title_exact_contrib, - self.match_release_exact_title_partial_contrib, - self.match_release_exact_title_fuzzy_contrib, self.match_release_exact_title, - self.match_release_fuzzy_title_fuzzy_contrib, self.match_release_generic, - self.match_release_generic_fuzzy_contrib) + if release.ext_ids and len(release.ext_ids.to_dict()) > 0: + result = self._match_id(release) + if release.title is not None and release.contribs is not None: + result = self._match_title_contrib(release) + elif release.title is not None: + result = self._match_title(release) + elif release.contribs is not None: + result = self._match_contribs(release) + else: + result = self._match_generic(release) + + return result def public_api(host_uri): @@ -471,14 +311,97 @@ def public_api(host_uri): return fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient(conf)) +def release_tokens(release: ReleaseEntity) -> List[str]: + """ + Turn a release into a set of tokens. 
+ """ + tokens = [] + red = release.to_dict() + for k, v in red.items(): + if v is None or k == "ext_ids": + continue + v = str(v) + for tok in v.split(): + tokens.append(tok) + for _, v in red.get("ext_ids", {}).items(): + if v is None or not isinstance(v, str): + continue + for tok in v.split(): + tokens.append(tok) + + return tokens + + +def test_release_tokens(): + Case = collections.namedtuple("Case", "re tokens") + cases = ( + Case(entity_from_dict({"ext_ids": {}}, ReleaseEntity), []), + Case(entity_from_dict({ + "ext_ids": {}, + "title": "Flow my tears" + }, ReleaseEntity), ["Flow", "my", "tears"]), + Case( + entity_from_dict( + { + "ext_ids": {}, + "subtitle": "An illustrated guide", + "release_year": 1981, + }, ReleaseEntity), ["An", "illustrated", "guide", "1981"]), + ) + for c in cases: + tokens = release_tokens(c.re) + assert tokens == c.tokens + + +def fetch_release(ident, api=None): + """ + Return release entity of None. + """ + if api is None: + api = public_api(FATCAT_API_URL) + try: + re = api.get_release(ident, hide="refs,abstracts", expand="container,contribs,files") + except ApiException as exc: + if exc.status == 404: + print("[err] failed to retrieve release entity: {}".format(id), file=sys.stderr) + else: + print("[err] api failed with {}: {}".format(exc.status, exc.message), file=sys.stderr) + else: + return re + + def retrieve_entity_list( ids: List[str], api: DefaultApi = None, entity_type: Union[Type[ReleaseEntity], Type[ContainerEntity]] = ReleaseEntity, +) -> List[Union[Type[ReleaseEntity], Type[ContainerEntity]]]: + """ + Parallel requests. 
+ """ + if api is None: + api = public_api(FATCAT_API_URL) + + result = [] + if entity_type == ReleaseEntity: + with Pool(10) as p: + result = p.map(fetch_release, ids) + return [v for v in result if v is not None] + else: + raise ValueError("[err] cannot retrieve ids {} of type {}".format(ids, entity_type)) + + return result + + +def retrieve_entity_list_sequential( + ids: List[str], + api: DefaultApi = None, + entity_type: Union[Type[ReleaseEntity], Type[ContainerEntity]] = ReleaseEntity, ) -> List[Union[Type[ReleaseEntity], Type[ContainerEntity]]]: """ Retrieve a list of entities. Some entities might be missing. Return all that are accessible. + + TODO: parallelize API access. """ if api is None: api = public_api(FATCAT_API_URL) -- cgit v1.2.3