diff options
Diffstat (limited to 'python/fatcat_web/search.py')
-rw-r--r-- | python/fatcat_web/search.py | 343 |
1 files changed, 161 insertions, 182 deletions
diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py index 8cbe09f6..913d6696 100644 --- a/python/fatcat_web/search.py +++ b/python/fatcat_web/search.py @@ -4,31 +4,22 @@ the formal API) """ import datetime -import sys from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple import elasticsearch -import elasticsearch_dsl.response from elasticsearch_dsl import Q, Search +from fatcat_tools.search.common import ( + _hits_total_int, + agg_to_dict, + results_to_dict, + wrap_es_execution, +) +from fatcat_tools.search.stats import query_es_container_stats from fatcat_web import app -class FatcatSearchError(Exception): - def __init__(self, status_code: Union[int, str], name: str, description: str = None): - if status_code == "TIMEOUT": - status_code = 504 - elif isinstance(status_code, str): - try: - status_code = int(status_code) - except ValueError: - status_code = 503 - self.status_code = status_code - self.name = name - self.description = description - - @dataclass class ReleaseQuery: q: Optional[str] = None @@ -38,22 +29,13 @@ class ReleaseQuery: container_id: Optional[str] = None recent: bool = False exclude_stubs: bool = False + sort: Optional[List[str]] = None @staticmethod def from_args(args: Dict[str, Any]) -> "ReleaseQuery": query_str = args.get("q") or "*" - container_id = args.get("container_id") - # TODO: as filter, not in query string - if container_id: - query_str += ' container_id:"{}"'.format(container_id) - - # TODO: where are container_issnl queries actually used? - issnl = args.get("container_issnl") - if issnl and query_str: - query_str += ' container_issnl:"{}"'.format(issnl) - offset = args.get("offset", "0") offset = max(0, int(offset)) if offset.isnumeric() else 0 @@ -61,9 +43,10 @@ class ReleaseQuery: q=query_str, offset=offset, fulltext_only=bool(args.get("fulltext_only")), - container_id=container_id, + container_id=args.get("container_id"), recent=bool(args.get("recent")), exclude_stubs=bool(args.get("exclude_stubs")), + sort=None, ) @@ -98,87 +81,11 @@ class SearchHits: results: List[Any] -def _hits_total_int(val: Any) -> int: - """ - Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as - an int in many places, in ES 7 as a dict (JSON object) with 'value' key - """ - if isinstance(val, int): - return val - else: - return int(val["value"]) - - -def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]: - """ - Takes a response returns all the hits as JSON objects. - - Also handles surrogate strings that elasticsearch returns sometimes, - probably due to mangled data processing in some pipeline. "Crimes against - Unicode"; production workaround - """ - - results = [] - for h in response: - r = h._d_ - # print(h.meta._d_) - results.append(r) - - for h in results: - for key in h: - if type(h[key]) is str: - h[key] = h[key].encode("utf8", "ignore").decode("utf8") - return results - - -def wrap_es_execution(search: Search) -> Any: - """ - Executes a Search object, and converts various ES error types into - something we can pretty print to the user. - """ - try: - resp = search.execute() - except elasticsearch.exceptions.RequestError as e: - # this is a "user" error - print("elasticsearch 400: " + str(e.info), file=sys.stderr) - description = None - assert isinstance(e.info, dict) - if e.info.get("error", {}).get("root_cause", {}): - description = str(e.info["error"]["root_cause"][0].get("reason")) - raise FatcatSearchError(e.status_code, str(e.error), description) - except elasticsearch.exceptions.ConnectionError as e: - raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available") - except elasticsearch.exceptions.TransportError as e: - # all other errors - print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr) - description = None - assert isinstance(e.info, dict) - if e.info and e.info.get("error", {}).get("root_cause", {}): - description = str(e.info["error"]["root_cause"][0].get("reason")) - raise FatcatSearchError(e.status_code, str(e.error), description) - return resp - - -def agg_to_dict(agg: Any) -> Dict[str, Any]: - """ - Takes a simple term aggregation result (with buckets) and returns a simple - dict with keys as terms and counts as values. Includes an extra value - '_other', and by convention aggregations should be written to have "missing" - values as '_unknown'. - """ - result = dict() - for bucket in agg.buckets: - result[bucket.key] = bucket.doc_count - if agg.sum_other_doc_count: - result["_other"] = agg.sum_other_doc_count - return result - - def do_container_search(query: GenericQuery, deep_page_limit: int = 2000) -> SearchHits: search = Search(using=app.es_client, index=app.config["ELASTICSEARCH_CONTAINER_INDEX"]) - search = search.query( + basic_query = Q( "query_string", query=query.q, default_operator="AND", @@ -188,8 +95,22 @@ def do_container_search(query: GenericQuery, deep_page_limit: int = 2000) -> Sea fields=["biblio"], ) + search = search.query( + "boosting", + positive=Q( + "bool", + must=basic_query, + should=[ + Q("range", releases_total={"gte": 500}), + Q("range", releases_total={"gte": 5000}), + ], + ), + negative=Q("term", releases_total=0), + negative_boost=0.5, + ) + # Sanity checks - limit = min((int(query.limit or 25), 100)) + limit = min((int(query.limit or 25), 300)) offset = max((int(query.offset or 0), 0)) if offset > deep_page_limit: # Avoid deep paging problem. @@ -249,6 +170,9 @@ def do_release_search(query: ReleaseQuery, deep_page_limit: int = 2000) -> Searc ], ) + if query.container_id: + search = search.filter("term", container_id=query.container_id) + search = search.query( "boosting", positive=Q( @@ -260,8 +184,11 @@ def do_release_search(query: ReleaseQuery, deep_page_limit: int = 2000) -> Searc negative_boost=0.5, ) + if query.sort: + search = search.sort(*query.sort) + # Sanity checks - limit = min((int(query.limit or 25), 100)) + limit = min((int(query.limit or 25), 300)) offset = max((int(query.offset or 0), 0)) if offset > deep_page_limit: # Avoid deep paging problem. @@ -320,6 +247,122 @@ def get_elastic_container_random_releases(ident: str, limit: int = 5) -> List[Di return results +def _sort_vol_key(val: Optional[Any]) -> Tuple[bool, bool, int, str]: + """ + Helper for sorting volume and issue strings. Defined order is: + + - None values first + - any non-integers next, in non-integer order + - any integers next, in integer sorted order (ascending) + + Note that the actual sort used/displayed is reversed. + + TODO: 'val' should actually be Optional[str], but getting a mypy error I + don't know how to hack around quickly right now. + """ + if val is None: + return (False, False, 0, "") + if val.isdigit(): + return (True, True, int(val), "") + else: + return (True, False, 0, val) + + +def get_elastic_container_browse_year_volume_issue(ident: str) -> List[Dict[str, Any]]: + """ + Returns a set of histogram buckets, as nested dicts/lists: + + [ + { year: int, + volumes: [ + { volume: str|None + issues: [ + { issue: str|None + count: int + } + ] } + ] } + ] + """ + + search = Search(using=app.es_client, index=app.config["ELASTICSEARCH_RELEASE_INDEX"]) + search = search.query( + "bool", + filter=[Q("bool", must_not=[Q("match", release_type="stub")])], + ) + search = search.filter("term", container_id=ident) + search.aggs.bucket( + "year_volume", + "composite", + size=1500, + sources=[ + { + "year": { + "histogram": { + "field": "release_year", + "interval": 1, + "missing_bucket": True, + }, + } + }, + { + "volume": { + "terms": { + "field": "volume", + "missing_bucket": True, + }, + } + }, + { + "issue": { + "terms": { + "field": "issue", + "missing_bucket": True, + }, + } + }, + ], + ) + search = search[:0] + search = search.params(request_cache=True) + resp = wrap_es_execution(search) + buckets = resp.aggregations.year_volume.buckets + # print(buckets) + buckets = [h for h in buckets if h["key"]["year"]] + year_nums = set([int(h["key"]["year"]) for h in buckets]) + year_dicts: Dict[int, Dict[str, Any]] = dict() + if year_nums: + for year in year_nums: + year_dicts[year] = {} + for row in buckets: + year = int(row["key"]["year"]) + volume = row["key"]["volume"] or "" + issue = row["key"]["issue"] or "" + if volume not in year_dicts[year]: + year_dicts[year][volume] = {} + year_dicts[year][volume][issue] = int(row["doc_count"]) + + # transform to lists-of-dicts + year_list = [] + for year in year_dicts.keys(): + volume_list = [] + for volume in year_dicts[year].keys(): + issue_list = [] + for issue in year_dicts[year][volume].keys(): + issue_list.append( + dict(issue=issue or None, count=year_dicts[year][volume][issue]) + ) + issue_list = sorted( + issue_list, key=lambda x: _sort_vol_key(x["issue"]), reverse=True + ) + volume_list.append(dict(volume=volume or None, issues=issue_list)) + volume_list = sorted( + volume_list, key=lambda x: _sort_vol_key(x["volume"]), reverse=True + ) + year_list.append(dict(year=year, volumes=volume_list)) + return sorted(year_list, key=lambda x: x["year"], reverse=True) + + def get_elastic_entity_stats() -> dict: """ TODO: files, filesets, webcaptures (no schema yet) @@ -465,6 +508,9 @@ def get_elastic_container_stats( merge_shadows: Optional[bool] = None, ) -> Dict[str, Any]: """ + This is a DEPRECATED backwards-compatability wrapper around the new + query_es_container_stats() method from fatcat_tools. + Returns dict: ident issnl (optional) @@ -485,66 +531,13 @@ def get_elastic_container_stats( if merge_shadows is None: merge_shadows = app.config["FATCAT_MERGE_SHADOW_PRESERVATION"] - search = Search(using=es_client, index=es_index) - search = search.query( - "term", - container_id=ident, - ) - search.aggs.bucket( - "container_stats", - "filters", - filters={ - "in_web": { - "term": {"in_web": True}, - }, - "in_kbart": { - "term": {"in_kbart": True}, - }, - "is_preserved": { - "term": {"is_preserved": True}, - }, - }, + stats = query_es_container_stats( + ident=ident, + es_client=es_client, + es_index=es_index, + merge_shadows=merge_shadows, ) - search.aggs.bucket( - "preservation", - "terms", - field="preservation", - missing="_unknown", - ) - search.aggs.bucket( - "release_type", - "terms", - field="release_type", - missing="_unknown", - ) - - search = search[:0] - - search = search.params(request_cache=True) - search = search.params(track_total_hits=True) - resp = wrap_es_execution(search) - - container_stats = resp.aggregations.container_stats.buckets - preservation_bucket = agg_to_dict(resp.aggregations.preservation) - preservation_bucket["total"] = _hits_total_int(resp.hits.total) - for k in ("bright", "dark", "shadows_only", "none"): - if k not in preservation_bucket: - preservation_bucket[k] = 0 - if merge_shadows: - preservation_bucket["none"] += preservation_bucket["shadows_only"] - preservation_bucket["shadows_only"] = 0 - release_type_bucket = agg_to_dict(resp.aggregations.release_type) - stats = { - "ident": ident, - "issnl": issnl, - "total": _hits_total_int(resp.hits.total), - "in_web": container_stats["in_web"]["doc_count"], - "in_kbart": container_stats["in_kbart"]["doc_count"], - "is_preserved": container_stats["is_preserved"]["doc_count"], - "preservation": preservation_bucket, - "release_type": release_type_bucket, - } - + stats["issnl"] = issnl return stats @@ -643,11 +636,7 @@ def get_elastic_preservation_by_year(query: ReleaseQuery) -> List[Dict[str, Any] "biblio", ], ) - if query.container_id: - search = search.filter( - "term", - container_id=query.container_id, - ) + search = search.filter("term", container_id=query.container_id) if query.exclude_stubs: search = search.query( "bool", @@ -909,17 +898,7 @@ def get_elastic_preservation_by_type(query: ReleaseQuery) -> List[dict]: ], ) if query.container_id: - search = search.query( - "bool", - filter=[ - Q( - "bool", - must=[ - Q("match", container_id=query.container_id), - ], - ), - ], - ) + search = search.filter("term", container_id=query.container_id) if query.recent: date_today = datetime.date.today() start_date = str(date_today - datetime.timedelta(days=60)) |