aboutsummaryrefslogtreecommitdiffstats
path: root/fuzzycat/matching.py
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2021-11-17 14:51:50 +0100
committerMartin Czygan <martin.czygan@gmail.com>2021-12-06 19:53:30 +0100
commitdd6149140542585f2b0bfc3b334ec2b0a88b790e (patch)
tree6a11c228558cfbf73932bc828cda9be3735cfd78 /fuzzycat/matching.py
parentd104f8d0ba8eef5563555de82be66bbf17f961db (diff)
downloadfuzzycat-dd6149140542585f2b0bfc3b334ec2b0a88b790e.tar.gz
fuzzycat-dd6149140542585f2b0bfc3b334ec2b0a88b790e.zip
complete FuzzyReleaseMatcher refactoring
We keep the name, since the api - "matcher.match(release)" - is the same; simplified queries; at most one query is performed against elasticsearch; parallel release retrieval from the API; optional support for release year windows; Test cases are expressed in yaml and will be auto-loaded from the specified directory; tests work against the current search endpoint, which means the actual output may change on index updates; for the moment, we think this setup is relatively simple and not too unstable. about: title contrib, partial name input: > { "contribs": [ { "raw_name": "Adams" } ], "title": "digital libraries", "ext_ids": {} } release_year_padding: 1 expected: - 7rmvqtrb2jdyhcxxodihzzcugy - a2u6ougtsjcbvczou6sazsulcm - dy45vilej5diros6zmax46nm4e - exuwhhayird4fdjmmsiqpponlq - gqrj7jikezgcfpjfazhpf4e7c4 - mkmqt3453relbpuyktnmsg6hjq - t2g5sl3dgzchtnq7dynxyzje44 - t4tvenhrvzamraxrvvxivxmvga - wd3oeoi3bffknfbg2ymleqc4ja - y63a6dhrfnb7bltlxfynydbojy
Diffstat (limited to 'fuzzycat/matching.py')
-rw-r--r--fuzzycat/matching.py479
1 files changed, 201 insertions, 278 deletions
diff --git a/fuzzycat/matching.py b/fuzzycat/matching.py
index 2984d9a..cb6acbb 100644
--- a/fuzzycat/matching.py
+++ b/fuzzycat/matching.py
@@ -1,7 +1,9 @@
+import collections
import logging
import os
import re
import sys
+from multiprocessing.dummy import Pool
from typing import Any, List, Optional, Type, Union
import elasticsearch
@@ -22,37 +24,24 @@ FATCAT_API_URL = settings.get("FATCAT_API_URL", "https://api.fatcat.wiki/v0")
class FuzzyReleaseMatcher:
"""
- FuzzyReleaseMatcher tries to find similar items to a given release in
- elasticsearch. Exact matches first, then fuzzy.
+ This is a helper class to fetch related documents to a given release
+    document from fatcat search (currently elasticsearch). Elasticsearch should
+ rank similar documents high itself, so all we try to do here is to tweak
+ the specific query a bit, depending on the completeness of the input
+ document, e.g. if the input has contrib and title, then use both, if it
+ only has a title, then use just that, etc.
- In the best case, elasticsearch would automatically rank the most relevant
- docs first, even with partial data. We still try to steer the matches by
- using a query cascade. This is configurable. The last query should be a
- generic.
-
- The goal here is to get a set of potential matches; verification has to.
- happen separately.
-
- TODO:
-
- Example case not yet working well ("Stuehrenberg" vs "Stührenberg"):
-
- >>> result = matcher.match(entity_from_dict({"title": "internet archive",
- "contribs": [{"raw_name":
- "Stührenberg"}],
- "ext_ids": {}},
- ReleaseEntity))
-
- > Should return: https://fatcat.wiki/release/pu7e7tbctna2foqyyxztfw3ufy,
- https://fatcat.wiki/release/search?q=St%C3%BChrenberg+internet+archive&generic=1
- (not returning anything via frontend either)
-
- Make sure we can switch from function to class:
-
- * [ ] 5 test cases for both
+ We try to get the result in a single query.
+    TODO/Tweaks: e.g. if a document does have a "release_year", add this as a "should" clause.
"""
- def __init__(self, es="https://search.fatcat.wiki", api=None, index="fatcat_release", size=10):
+ def __init__(self,
+ es="https://search.fatcat.wiki",
+ api=None,
+ index="fatcat_release",
+ size=10,
+ min_token_length=3,
+ release_year_padding=1):
if isinstance(es, str):
self.es = elasticsearch.Elasticsearch([es])
else:
@@ -61,8 +50,10 @@ class FuzzyReleaseMatcher:
self.index = index
self.size = size
self.logger = logging.getLogger("fuzzy")
+ self.min_token_length = min_token_length
+ self.release_year_padding = 1
- def match_release_by_id(self, release, **kwargs) -> List[ReleaseEntity]:
+ def _match_id(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]:
"""
Check for exact matches by identifier.
"""
@@ -97,229 +88,10 @@ class FuzzyReleaseMatcher:
return [r]
return []
- def match_release_exact_title_exact_contrib(self, release):
- """
- Match exact title and exact contrib names. Case insensitive, order of
- contribs does not matter.
- """
- if release.title is None or release.contribs is None:
- return []
- contrib_queries = [{
- "match": {
- "contrib_names": {
- "query": contrib.raw_name,
- "operator": "AND",
- }
- }
- } for contrib in release.contribs]
- query = {
- "bool": {
- "must": [{
- "match": {
- "title": {
- "query": release.title,
- "operator": "AND",
- },
- }
- }] + contrib_queries,
- },
- }
- result = []
-
- resp = self.es.search(index=self.index,
- body={
- "query": query,
- "size": self.size,
- "track_total_hits": True
- })
- hits_total = es_compat_hits_total(resp)
- if hits_total == 0:
- return result
- if hits_total > self.size:
- self.logger.warn('more than {} hits: {}'.format(self.size, hits_total))
-
- entities = response_to_entity_list(resp,
- entity_type=ReleaseEntity,
- size=self.size,
- api=self.api)
-
- # Require overlap of contrib.
- matcher = ContribListMatcher(
- cmp=JaccardIndexThreshold(1.0),
- pipeline=Pipeline([
- lambda contribs: set((c.raw_name.strip().lower() for c in contribs)),
- ]),
- )
-
- for re in entities:
- if re.title.strip().lower() != release.title.strip().lower():
- continue
- if not matcher.compare(re.contribs, release.contribs):
- continue
- result.append(re)
- return result
-
- def match_release_exact_title_partial_contrib(self, release):
- """
- Allow for exact authors, but ok, if some are missing.
- """
- if release.title is None or release.contribs is None:
- return []
- contrib_queries = [{
- "match": {
- "contrib_names": {
- "query": contrib.raw_name,
- "operator": "AND",
- }
- }
- } for contrib in release.contribs]
- query = {
- "bool": {
- "must": [{
- "match": {
- "title": {
- "query": release.title,
- "operator": "AND",
- },
- }
- }] + contrib_queries,
- },
- }
- result = []
- resp = self.es.search(index=self.index,
- body={
- "query": query,
- "size": self.size,
- "track_total_hits": True
- })
- if es_compat_hits_total(resp) == 0:
- return result
- if es_compat_hits_total(resp) > self.size:
- raise NotImplementedError('result set too large: {}'.format(es))
- entities = response_to_entity_list(resp,
- entity_type=ReleaseEntity,
- size=self.size,
- api=self.api)
-
- # Require at least half the contribs to be shared.
- matcher = ContribListMatcher(
- cmp=JaccardIndexThreshold(0.5),
- pipeline=Pipeline([
- lambda contribs: set((c.raw_name.strip().lower() for c in contribs)),
- ]),
- )
-
- for re in entities:
- if re.title.strip().lower() != release.title.strip().lower():
- continue
- if not matcher.compare(re.contribs, release.contribs):
- continue
- result.append(re)
- return result
-
- def match_release_exact_title_fuzzy_contrib(self, release):
- """
- Exact title but ok it authors differ (slightly).
- """
- if release.title is None or release.contribs is None:
- return []
- contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()]
- contrib_queries = [{
- "match": {
- "contrib_names": {
- "query": token,
- }
- }
- } for token in contrib_tokens]
- query = {
- "bool": {
- "must": [{
- "match": {
- "title": {
- "query": release.title,
- "operator": "AND",
- },
- }
- }] + contrib_queries,
- },
- }
- result = []
- resp = self.es.search(index=self.index,
- body={
- "query": query,
- "size": self.size,
- "track_total_hits": True
- })
- if es_compat_hits_total(resp) == 0:
- return result
- if es_compat_hits_total(resp) > self.size:
- raise NotImplementedError('todo: scroll required for larger result sets: {}'.format(es))
- entities = response_to_entity_list(resp,
- entity_type=ReleaseEntity,
- size=self.size,
- api=self.api)
-
- matcher = ContribListMatcher(
- cmp=FuzzyStringSimilarity(min_ratio=60),
- pipeline=Pipeline([
- lambda contribs: set((c.raw_name.strip().lower() for c in contribs)),
- ]),
- )
-
- for re in entities:
- if re.title.strip().lower() != release.title.strip().lower():
- continue
- if not matcher.compare(re.contribs, release.contribs):
- continue
- result.append(re)
- return result
-
- def match_release_exact_title(self, release):
+ def _match_title_contrib(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]:
"""
- Exact title, but any author. For common titles, this will yield 100s or
- 1000s or results.
+ Match in the presence of defined title and contrib fields.
"""
- if release.title is None:
- return []
- query = {
- "bool": {
- "must": [{
- "match": {
- "title": {
- "query": release.title,
- "operator": "AND",
- },
- }
- }],
- },
- }
- result = []
- resp = self.es.search(body={
- "query": query,
- "size": self.size,
- "track_total_hits": True
- },
- index=self.index)
- if es_compat_hits_total(resp) == 0:
- return result
- if es_compat_hits_total(resp) > self.size:
- self.logger.warn('too many hits: {}'.format(es_compat_hits_total(resp)))
- entities = response_to_entity_list(resp,
- entity_type=ReleaseEntity,
- size=self.size,
- api=self.api)
- for re in entities:
- if re.title.strip().lower() != release.title.strip().lower():
- continue
- result.append(re)
- return result
-
- def match_release_fuzzy_title_fuzzy_contrib(self, release):
- """
- Using elasticsearch fuzziness option (which is not that fuzzy).
- """
- if release.title is None or release.contribs is None:
- return []
contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()]
contrib_queries = [{
"match": {
@@ -343,7 +115,18 @@ class FuzzyReleaseMatcher:
] + contrib_queries,
},
}
+ if release.release_year is not None:
+ query["bool"]["must"].append({
+ "range": {
+ "year": {
+ "gte": release.release_year - self.release_year_padding,
+ "lte": release.release_year + self.release_year_padding,
+ "boost": 0.5,
+ }
+ }
+ })
result = []
+ self.logger.info(query)
resp = self.es.search(index=self.index,
body={
"query": query,
@@ -353,19 +136,17 @@ class FuzzyReleaseMatcher:
if es_compat_hits_total(resp) == 0:
return result
if es_compat_hits_total(resp) > self.size:
- raise ValueError('too many hits: {}'.format(es_compat_hits_total(resp)))
+ self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp)))
entities = response_to_entity_list(resp,
entity_type=ReleaseEntity,
size=self.size,
api=self.api)
return entities
- def match_release_generic(self, release):
+ def _match_title(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]:
"""
- Final catch all variant via title.
+ Match in the presence of a title.
"""
- if release.title is None:
- return []
query = {
"bool": {
"must": [
@@ -373,7 +154,7 @@ class FuzzyReleaseMatcher:
"match": {
"title": {
"query": release.title,
- "operator": "OR",
+ "operator": "AND",
"fuzziness": "AUTO",
},
}
@@ -381,6 +162,16 @@ class FuzzyReleaseMatcher:
],
},
}
+ if release.release_year is not None:
+ query["bool"]["must"].append({
+ "range": {
+ "year": {
+ "gte": release.release_year - self.release_year_padding,
+ "lte": release.release_year + self.release_year_padding,
+ "boost": 0.5,
+ }
+ }
+ })
result = []
resp = self.es.search(index=self.index,
body={
@@ -391,19 +182,17 @@ class FuzzyReleaseMatcher:
if es_compat_hits_total(resp) == 0:
return result
if es_compat_hits_total(resp) > self.size:
- self.logger.warn('too many hits: {}'.format(es_compat_hits_total(resp)))
+ self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp)))
entities = response_to_entity_list(resp,
entity_type=ReleaseEntity,
size=self.size,
api=self.api)
return entities
- def match_release_generic_fuzzy_contrib(self, release):
+ def _match_contribs(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]:
"""
- Only match contribs, if they exist.
+ Match in the presence of contribs (and no title).
"""
- if release.contribs is None:
- return []
contrib_tokens = [tok for c in release.contribs for tok in c.raw_name.split()]
contrib_queries = [{
"match": {
@@ -417,6 +206,16 @@ class FuzzyReleaseMatcher:
"must": contrib_queries,
},
}
+ if release.release_year is not None:
+ query["bool"]["must"].append({
+ "range": {
+ "year": {
+ "gte": release.release_year - self.release_year_padding,
+ "lte": release.release_year + self.release_year_padding,
+ "boost": 0.5,
+ }
+ }
+ })
result = []
resp = self.es.search(index=self.index,
body={
@@ -427,37 +226,78 @@ class FuzzyReleaseMatcher:
if es_compat_hits_total(resp) == 0:
return result
if es_compat_hits_total(resp) > self.size:
- self.logger.warn('too many hits: {}'.format(es_compat_hits_total(resp)))
+ self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp)))
entities = response_to_entity_list(resp,
entity_type=ReleaseEntity,
size=self.size,
api=self.api)
return entities
- def match_cascade(self, release, *qs, **kwargs):
+ def _match_generic(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]:
"""
- Returns the result from the first query that returns a result. All query
- functions need to be defined on this class (for now).
+ Throw tokens at elasticsearch.
"""
- for q in qs:
- self.logger.debug("[cascade] {}".format(q))
- result = q(release, **kwargs)
- if len(result) > 0:
- return result
- return []
+ token_queries = [
+ {
+ "match": {
+ "biblio": { # https://git.io/JMXvJ
+ "query": token,
+ }
+ }
+ } for token in release_tokens(release) if len(token) > self.min_token_length
+ ]
+ query = {
+ "bool": {
+ "must": token_queries,
+ },
+ }
+ if release.release_year is not None:
+ query["bool"]["must"].append({
+ "range": {
+ "year": {
+ "gte": release.release_year - self.release_year_padding,
+ "lte": release.release_year + self.release_year_padding,
+ "boost": 0.5,
+ }
+ }
+ })
+ result = []
+ self.logger.info(query)
+ resp = self.es.search(index=self.index,
+ body={
+ "query": query,
+ "size": self.size,
+ "track_total_hits": True
+ })
+ if es_compat_hits_total(resp) == 0:
+ return result
+ if es_compat_hits_total(resp) > self.size:
+ self.logger.warning('too many hits: {}'.format(es_compat_hits_total(resp)))
+ entities = response_to_entity_list(resp,
+ entity_type=ReleaseEntity,
+ size=self.size,
+ api=self.api)
+ return entities
def match(self, release: Optional[ReleaseEntity]) -> List[ReleaseEntity]:
"""
- Match returns a list of match candidates given a release entity.
+ Match dispatches methods based on which fields are defined on the
+ document.
"""
if not release:
return []
- return self.match_cascade(
- release, self.match_release_by_id, self.match_release_exact_title_exact_contrib,
- self.match_release_exact_title_partial_contrib,
- self.match_release_exact_title_fuzzy_contrib, self.match_release_exact_title,
- self.match_release_fuzzy_title_fuzzy_contrib, self.match_release_generic,
- self.match_release_generic_fuzzy_contrib)
+ if release.ext_ids and len(release.ext_ids.to_dict()) > 0:
+ result = self._match_id(release)
+ if release.title is not None and release.contribs is not None:
+ result = self._match_title_contrib(release)
+ elif release.title is not None:
+ result = self._match_title(release)
+ elif release.contribs is not None:
+ result = self._match_contribs(release)
+ else:
+ result = self._match_generic(release)
+
+ return result
def public_api(host_uri):
@@ -471,14 +311,97 @@ def public_api(host_uri):
return fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient(conf))
+def release_tokens(release: ReleaseEntity) -> List[str]:
+ """
+ Turn a release into a set of tokens.
+ """
+ tokens = []
+ red = release.to_dict()
+ for k, v in red.items():
+ if v is None or k == "ext_ids":
+ continue
+ v = str(v)
+ for tok in v.split():
+ tokens.append(tok)
+ for _, v in red.get("ext_ids", {}).items():
+ if v is None or not isinstance(v, str):
+ continue
+ for tok in v.split():
+ tokens.append(tok)
+
+ return tokens
+
+
+def test_release_tokens():
+ Case = collections.namedtuple("Case", "re tokens")
+ cases = (
+ Case(entity_from_dict({"ext_ids": {}}, ReleaseEntity), []),
+ Case(entity_from_dict({
+ "ext_ids": {},
+ "title": "Flow my tears"
+ }, ReleaseEntity), ["Flow", "my", "tears"]),
+ Case(
+ entity_from_dict(
+ {
+ "ext_ids": {},
+ "subtitle": "An illustrated guide",
+ "release_year": 1981,
+ }, ReleaseEntity), ["An", "illustrated", "guide", "1981"]),
+ )
+ for c in cases:
+ tokens = release_tokens(c.re)
+ assert tokens == c.tokens
+
+
+def fetch_release(ident, api=None):
+ """
+    Return release entity or None.
+ """
+ if api is None:
+ api = public_api(FATCAT_API_URL)
+ try:
+ re = api.get_release(ident, hide="refs,abstracts", expand="container,contribs,files")
+ except ApiException as exc:
+ if exc.status == 404:
+ print("[err] failed to retrieve release entity: {}".format(id), file=sys.stderr)
+ else:
+ print("[err] api failed with {}: {}".format(exc.status, exc.message), file=sys.stderr)
+ else:
+ return re
+
+
def retrieve_entity_list(
ids: List[str],
api: DefaultApi = None,
entity_type: Union[Type[ReleaseEntity], Type[ContainerEntity]] = ReleaseEntity,
) -> List[Union[Type[ReleaseEntity], Type[ContainerEntity]]]:
"""
+ Parallel requests.
+ """
+ if api is None:
+ api = public_api(FATCAT_API_URL)
+
+ result = []
+ if entity_type == ReleaseEntity:
+ with Pool(10) as p:
+ result = p.map(fetch_release, ids)
+ return [v for v in result if v is not None]
+ else:
+ raise ValueError("[err] cannot retrieve ids {} of type {}".format(ids, entity_type))
+
+ return result
+
+
+def retrieve_entity_list_sequential(
+ ids: List[str],
+ api: DefaultApi = None,
+ entity_type: Union[Type[ReleaseEntity], Type[ContainerEntity]] = ReleaseEntity,
+) -> List[Union[Type[ReleaseEntity], Type[ContainerEntity]]]:
+ """
Retrieve a list of entities. Some entities might be missing. Return all
that are accessible.
+
+    NOTE: sequential fallback; the parallel variant is retrieve_entity_list.
"""
if api is None:
api = public_api(FATCAT_API_URL)