From ef7d24cd08b98d58779335acd1d655bd342cd9ec Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 9 Feb 2022 18:03:23 -0800 Subject: move container_status ES query code from fatcat_web to fatcat_tools The main motivation is to never have fatcat_tools import from fatcat_web, only vica-versa. Some code in fatcat_tools needs container stats, so starting with that code path (plus some generic helpers). --- python/fatcat_web/routes.py | 2 +- python/fatcat_web/search.py | 169 +++++--------------------------------------- 2 files changed, 18 insertions(+), 153 deletions(-) (limited to 'python/fatcat_web') diff --git a/python/fatcat_web/routes.py b/python/fatcat_web/routes.py index 9f46c674..0afc189f 100644 --- a/python/fatcat_web/routes.py +++ b/python/fatcat_web/routes.py @@ -30,6 +30,7 @@ from fatcat_tools.normal import ( clean_sha1, clean_sha256, ) +from fatcat_tools.search.common import FatcatSearchError from fatcat_tools.transforms import citeproc_csl, release_to_csl from fatcat_web import AnyResponse, Config, api, app, auth_api, mwoauth, priv_api from fatcat_web.auth import ( @@ -55,7 +56,6 @@ from fatcat_web.graphics import ( ) from fatcat_web.kafka import kafka_pixy_produce from fatcat_web.search import ( - FatcatSearchError, GenericQuery, ReleaseQuery, do_container_search, diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py index ac4dc34e..99c8ee77 100644 --- a/python/fatcat_web/search.py +++ b/python/fatcat_web/search.py @@ -4,31 +4,22 @@ the formal API) """ import datetime -import sys from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple import elasticsearch -import elasticsearch_dsl.response from elasticsearch_dsl import Q, Search +from fatcat_tools.search.common import ( + _hits_total_int, + agg_to_dict, + results_to_dict, + wrap_es_execution, +) +from fatcat_tools.search.stats import query_es_container_stats from fatcat_web import app -class FatcatSearchError(Exception): - def __init__(self, status_code: Union[int, str], name: str, description: str = None): - if status_code == "TIMEOUT": - status_code = 504 - elif isinstance(status_code, str): - try: - status_code = int(status_code) - except ValueError: - status_code = 503 - self.status_code = status_code - self.name = name - self.description = description - - @dataclass class ReleaseQuery: q: Optional[str] = None @@ -88,82 +79,6 @@ class SearchHits: results: List[Any] -def _hits_total_int(val: Any) -> int: - """ - Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as - an int in many places, in ES 7 as a dict (JSON object) with 'value' key - """ - if isinstance(val, int): - return val - else: - return int(val["value"]) - - -def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]: - """ - Takes a response returns all the hits as JSON objects. - - Also handles surrogate strings that elasticsearch returns sometimes, - probably due to mangled data processing in some pipeline. "Crimes against - Unicode"; production workaround - """ - - results = [] - for h in response: - r = h._d_ - # print(h.meta._d_) - results.append(r) - - for h in results: - for key in h: - if type(h[key]) is str: - h[key] = h[key].encode("utf8", "ignore").decode("utf8") - return results - - -def wrap_es_execution(search: Search) -> Any: - """ - Executes a Search object, and converts various ES error types into - something we can pretty print to the user. - """ - try: - resp = search.execute() - except elasticsearch.exceptions.RequestError as e: - # this is a "user" error - print("elasticsearch 400: " + str(e.info), file=sys.stderr) - description = None - assert isinstance(e.info, dict) - if e.info.get("error", {}).get("root_cause", {}): - description = str(e.info["error"]["root_cause"][0].get("reason")) - raise FatcatSearchError(e.status_code, str(e.error), description) - except elasticsearch.exceptions.ConnectionError as e: - raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available") - except elasticsearch.exceptions.TransportError as e: - # all other errors - print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr) - description = None - assert isinstance(e.info, dict) - if e.info and e.info.get("error", {}).get("root_cause", {}): - description = str(e.info["error"]["root_cause"][0].get("reason")) - raise FatcatSearchError(e.status_code, str(e.error), description) - return resp - - -def agg_to_dict(agg: Any) -> Dict[str, Any]: - """ - Takes a simple term aggregation result (with buckets) and returns a simple - dict with keys as terms and counts as values. Includes an extra value - '_other', and by convention aggregations should be written to have "missing" - values as '_unknown'. - """ - result = dict() - for bucket in agg.buckets: - result[bucket.key] = bucket.doc_count - if agg.sum_other_doc_count: - result["_other"] = agg.sum_other_doc_count - return result - - def do_container_search(query: GenericQuery, deep_page_limit: int = 2000) -> SearchHits: search = Search(using=app.es_client, index=app.config["ELASTICSEARCH_CONTAINER_INDEX"]) @@ -536,6 +451,9 @@ def get_elastic_container_stats( merge_shadows: Optional[bool] = None, ) -> Dict[str, Any]: """ + This is a DEPRECATED backwards-compatability wrapper around the new + query_es_container_stats() method from fatcat_tools. + Returns dict: ident issnl (optional) @@ -556,66 +474,13 @@ def get_elastic_container_stats( if merge_shadows is None: merge_shadows = app.config["FATCAT_MERGE_SHADOW_PRESERVATION"] - search = Search(using=es_client, index=es_index) - search = search.query( - "term", - container_id=ident, - ) - search.aggs.bucket( - "container_stats", - "filters", - filters={ - "in_web": { - "term": {"in_web": True}, - }, - "in_kbart": { - "term": {"in_kbart": True}, - }, - "is_preserved": { - "term": {"is_preserved": True}, - }, - }, - ) - search.aggs.bucket( - "preservation", - "terms", - field="preservation", - missing="_unknown", + stats = query_es_container_stats( + ident=ident, + es_client=es_client, + es_index=es_index, + merge_shadows=merge_shadows, ) - search.aggs.bucket( - "release_type", - "terms", - field="release_type", - missing="_unknown", - ) - - search = search[:0] - - search = search.params(request_cache=True) - search = search.params(track_total_hits=True) - resp = wrap_es_execution(search) - - container_stats = resp.aggregations.container_stats.buckets - preservation_bucket = agg_to_dict(resp.aggregations.preservation) - preservation_bucket["total"] = _hits_total_int(resp.hits.total) - for k in ("bright", "dark", "shadows_only", "none"): - if k not in preservation_bucket: - preservation_bucket[k] = 0 - if merge_shadows: - preservation_bucket["none"] += preservation_bucket["shadows_only"] - preservation_bucket["shadows_only"] = 0 - release_type_bucket = agg_to_dict(resp.aggregations.release_type) - stats = { - "ident": ident, - "issnl": issnl, - "total": _hits_total_int(resp.hits.total), - "in_web": container_stats["in_web"]["doc_count"], - "in_kbart": container_stats["in_kbart"]["doc_count"], - "is_preserved": container_stats["is_preserved"]["doc_count"], - "preservation": preservation_bucket, - "release_type": release_type_bucket, - } - + stats["issnl"] = issnl return stats -- cgit v1.2.3