From ef7d24cd08b98d58779335acd1d655bd342cd9ec Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 9 Feb 2022 18:03:23 -0800 Subject: move container_status ES query code from fatcat_web to fatcat_tools The main motivation is to never have fatcat_tools import from fatcat_web, only vica-versa. Some code in fatcat_tools needs container stats, so starting with that code path (plus some generic helpers). --- python/fatcat_tools/search/common.py | 96 +++++++++++++++ python/fatcat_tools/search/stats.py | 87 ++++++++++++++ python/fatcat_tools/workers/elasticsearch.py | 4 +- python/fatcat_transform.py | 4 +- python/fatcat_web/routes.py | 2 +- python/fatcat_web/search.py | 169 +++------------------------ 6 files changed, 205 insertions(+), 157 deletions(-) create mode 100644 python/fatcat_tools/search/common.py create mode 100644 python/fatcat_tools/search/stats.py (limited to 'python') diff --git a/python/fatcat_tools/search/common.py b/python/fatcat_tools/search/common.py new file mode 100644 index 00000000..584757fd --- /dev/null +++ b/python/fatcat_tools/search/common.py @@ -0,0 +1,96 @@ +import sys +from typing import Any, Dict, List, Union + +import elasticsearch +import elasticsearch_dsl.response +from elasticsearch_dsl import Search + + +class FatcatSearchError(Exception): + def __init__(self, status_code: Union[int, str], name: str, description: str = None): + if status_code == "TIMEOUT": + status_code = 504 + elif isinstance(status_code, str): + try: + status_code = int(status_code) + except ValueError: + status_code = 503 + self.status_code = status_code + self.name = name + self.description = description + + +def _hits_total_int(val: Any) -> int: + """ + Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as + an int in many places, in ES 7 as a dict (JSON object) with 'value' key + """ + if isinstance(val, int): + return val + else: + return int(val["value"]) + + +def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]: + """ + Takes a response returns all the hits as JSON objects. + + Also handles surrogate strings that elasticsearch returns sometimes, + probably due to mangled data processing in some pipeline. "Crimes against + Unicode"; production workaround + """ + + results = [] + for h in response: + r = h._d_ + # print(h.meta._d_) + results.append(r) + + for h in results: + for key in h: + if type(h[key]) is str: + h[key] = h[key].encode("utf8", "ignore").decode("utf8") + return results + + +def wrap_es_execution(search: Search) -> Any: + """ + Executes a Search object, and converts various ES error types into + something we can pretty print to the user. + """ + try: + resp = search.execute() + except elasticsearch.exceptions.RequestError as e: + # this is a "user" error + print("elasticsearch 400: " + str(e.info), file=sys.stderr) + description = None + assert isinstance(e.info, dict) + if e.info.get("error", {}).get("root_cause", {}): + description = str(e.info["error"]["root_cause"][0].get("reason")) + raise FatcatSearchError(e.status_code, str(e.error), description) + except elasticsearch.exceptions.ConnectionError as e: + raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available") + except elasticsearch.exceptions.TransportError as e: + # all other errors + print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr) + description = None + assert isinstance(e.info, dict) + if e.info and e.info.get("error", {}).get("root_cause", {}): + description = str(e.info["error"]["root_cause"][0].get("reason")) + raise FatcatSearchError(e.status_code, str(e.error), description) + return resp + + +def agg_to_dict(agg: Any) -> Dict[str, Any]: + """ + Takes a simple term aggregation result (with buckets) and returns a simple + dict with keys as terms and counts as values. Includes an extra value + '_other', and by convention aggregations should be written to have "missing" + values as '_unknown'. + """ + result = dict() + for bucket in agg.buckets: + result[bucket.key] = bucket.doc_count + if agg.sum_other_doc_count: + result["_other"] = agg.sum_other_doc_count + return result diff --git a/python/fatcat_tools/search/stats.py b/python/fatcat_tools/search/stats.py new file mode 100644 index 00000000..5496b94a --- /dev/null +++ b/python/fatcat_tools/search/stats.py @@ -0,0 +1,87 @@ +from typing import Any, Dict + +import elasticsearch +from elasticsearch_dsl import Search + +from fatcat_tools.search.common import _hits_total_int, agg_to_dict, wrap_es_execution + + +def query_es_container_stats( + ident: str, + es_client: elasticsearch.Elasticsearch, + es_index: str = "fatcat_release", + merge_shadows: bool = False, +) -> Dict[str, Any]: + """ + Returns dict: + ident + total: count + in_web: count + in_kbart: count + is_preserved: count + preservation{} + "histogram" by preservation status + release_type{} + "histogram" by release type + """ + + search = Search(using=es_client, index=es_index) + search = search.query( + "term", + container_id=ident, + ) + search.aggs.bucket( + "container_stats", + "filters", + filters={ + "in_web": { + "term": {"in_web": True}, + }, + "in_kbart": { + "term": {"in_kbart": True}, + }, + "is_preserved": { + "term": {"is_preserved": True}, + }, + }, + ) + search.aggs.bucket( + "preservation", + "terms", + field="preservation", + missing="_unknown", + ) + search.aggs.bucket( + "release_type", + "terms", + field="release_type", + missing="_unknown", + ) + + search = search[:0] + + search = search.params(request_cache=True) + search = search.params(track_total_hits=True) + resp = wrap_es_execution(search) + + container_stats = resp.aggregations.container_stats.buckets + preservation_bucket = agg_to_dict(resp.aggregations.preservation) + preservation_bucket["total"] = _hits_total_int(resp.hits.total) + for k in ("bright", "dark", "shadows_only", "none"): + if k not in preservation_bucket: + preservation_bucket[k] = 0 + if merge_shadows: + preservation_bucket["none"] += preservation_bucket["shadows_only"] + preservation_bucket["shadows_only"] = 0 + release_type_bucket = agg_to_dict(resp.aggregations.release_type) + stats = { + "ident": ident, + "total": _hits_total_int(resp.hits.total), + "in_web": container_stats["in_web"]["doc_count"], + "in_kbart": container_stats["in_kbart"]["doc_count"], + "is_preserved": container_stats["is_preserved"]["doc_count"], + "preservation": preservation_bucket, + "release_type": release_type_bucket, + } + + return stats diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index bfadea64..79071810 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -14,13 +14,13 @@ from fatcat_openapi_client import ( ) from fatcat_tools import entity_from_json, public_api +from fatcat_tools.search.stats import query_es_container_stats from fatcat_tools.transforms import ( changelog_to_elasticsearch, container_to_elasticsearch, file_to_elasticsearch, release_to_elasticsearch, ) -from fatcat_web.search import get_elastic_container_stats from .worker_common import FatcatWorker @@ -156,7 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): continue if self.entity_type == ContainerEntity and self.query_stats: - stats = get_elastic_container_stats( + stats = query_es_container_stats( entity.ident, es_client=es_client, es_index=self.elasticsearch_release_index, diff --git a/python/fatcat_transform.py b/python/fatcat_transform.py index 67bf56c5..4f28951c 100755 --- a/python/fatcat_transform.py +++ b/python/fatcat_transform.py @@ -13,6 +13,7 @@ import elasticsearch from fatcat_openapi_client import ChangelogEntry, ContainerEntity, FileEntity, ReleaseEntity from fatcat_tools import public_api +from fatcat_tools.search.stats import query_es_container_stats from fatcat_tools.transforms import ( changelog_to_elasticsearch, citeproc_csl, @@ -22,7 +23,6 @@ from fatcat_tools.transforms import ( release_to_csl, release_to_elasticsearch, ) -from fatcat_web.search import get_elastic_container_stats def run_elasticsearch_releases(args: argparse.Namespace) -> None: @@ -50,7 +50,7 @@ def run_elasticsearch_containers(args: argparse.Namespace) -> None: if args.query_stats: es_doc = container_to_elasticsearch( entity, - stats=get_elastic_container_stats( + stats=query_es_container_stats( entity.ident, es_client=es_client, es_index=es_release_index, diff --git a/python/fatcat_web/routes.py b/python/fatcat_web/routes.py index 9f46c674..0afc189f 100644 --- a/python/fatcat_web/routes.py +++ b/python/fatcat_web/routes.py @@ -30,6 +30,7 @@ from fatcat_tools.normal import ( clean_sha1, clean_sha256, ) +from fatcat_tools.search.common import FatcatSearchError from fatcat_tools.transforms import citeproc_csl, release_to_csl from fatcat_web import AnyResponse, Config, api, app, auth_api, mwoauth, priv_api from fatcat_web.auth import ( @@ -55,7 +56,6 @@ from fatcat_web.graphics import ( ) from fatcat_web.kafka import kafka_pixy_produce from fatcat_web.search import ( - FatcatSearchError, GenericQuery, ReleaseQuery, do_container_search, diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py index ac4dc34e..99c8ee77 100644 --- a/python/fatcat_web/search.py +++ b/python/fatcat_web/search.py @@ -4,31 +4,22 @@ the formal API) """ import datetime -import sys from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple import elasticsearch -import elasticsearch_dsl.response from elasticsearch_dsl import Q, Search +from fatcat_tools.search.common import ( + _hits_total_int, + agg_to_dict, + results_to_dict, + wrap_es_execution, +) +from fatcat_tools.search.stats import query_es_container_stats from fatcat_web import app -class FatcatSearchError(Exception): - def __init__(self, status_code: Union[int, str], name: str, description: str = None): - if status_code == "TIMEOUT": - status_code = 504 - elif isinstance(status_code, str): - try: - status_code = int(status_code) - except ValueError: - status_code = 503 - self.status_code = status_code - self.name = name - self.description = description - - @dataclass class ReleaseQuery: q: Optional[str] = None @@ -88,82 +79,6 @@ class SearchHits: results: List[Any] -def _hits_total_int(val: Any) -> int: - """ - Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as - an int in many places, in ES 7 as a dict (JSON object) with 'value' key - """ - if isinstance(val, int): - return val - else: - return int(val["value"]) - - -def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]: - """ - Takes a response returns all the hits as JSON objects. - - Also handles surrogate strings that elasticsearch returns sometimes, - probably due to mangled data processing in some pipeline. "Crimes against - Unicode"; production workaround - """ - - results = [] - for h in response: - r = h._d_ - # print(h.meta._d_) - results.append(r) - - for h in results: - for key in h: - if type(h[key]) is str: - h[key] = h[key].encode("utf8", "ignore").decode("utf8") - return results - - -def wrap_es_execution(search: Search) -> Any: - """ - Executes a Search object, and converts various ES error types into - something we can pretty print to the user. - """ - try: - resp = search.execute() - except elasticsearch.exceptions.RequestError as e: - # this is a "user" error - print("elasticsearch 400: " + str(e.info), file=sys.stderr) - description = None - assert isinstance(e.info, dict) - if e.info.get("error", {}).get("root_cause", {}): - description = str(e.info["error"]["root_cause"][0].get("reason")) - raise FatcatSearchError(e.status_code, str(e.error), description) - except elasticsearch.exceptions.ConnectionError as e: - raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available") - except elasticsearch.exceptions.TransportError as e: - # all other errors - print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr) - description = None - assert isinstance(e.info, dict) - if e.info and e.info.get("error", {}).get("root_cause", {}): - description = str(e.info["error"]["root_cause"][0].get("reason")) - raise FatcatSearchError(e.status_code, str(e.error), description) - return resp - - -def agg_to_dict(agg: Any) -> Dict[str, Any]: - """ - Takes a simple term aggregation result (with buckets) and returns a simple - dict with keys as terms and counts as values. Includes an extra value - '_other', and by convention aggregations should be written to have "missing" - values as '_unknown'. - """ - result = dict() - for bucket in agg.buckets: - result[bucket.key] = bucket.doc_count - if agg.sum_other_doc_count: - result["_other"] = agg.sum_other_doc_count - return result - - def do_container_search(query: GenericQuery, deep_page_limit: int = 2000) -> SearchHits: search = Search(using=app.es_client, index=app.config["ELASTICSEARCH_CONTAINER_INDEX"]) @@ -536,6 +451,9 @@ def get_elastic_container_stats( merge_shadows: Optional[bool] = None, ) -> Dict[str, Any]: """ + This is a DEPRECATED backwards-compatability wrapper around the new + query_es_container_stats() method from fatcat_tools. + Returns dict: ident issnl (optional) @@ -556,66 +474,13 @@ def get_elastic_container_stats( if merge_shadows is None: merge_shadows = app.config["FATCAT_MERGE_SHADOW_PRESERVATION"] - search = Search(using=es_client, index=es_index) - search = search.query( - "term", - container_id=ident, - ) - search.aggs.bucket( - "container_stats", - "filters", - filters={ - "in_web": { - "term": {"in_web": True}, - }, - "in_kbart": { - "term": {"in_kbart": True}, - }, - "is_preserved": { - "term": {"is_preserved": True}, - }, - }, - ) - search.aggs.bucket( - "preservation", - "terms", - field="preservation", - missing="_unknown", + stats = query_es_container_stats( + ident=ident, + es_client=es_client, + es_index=es_index, + merge_shadows=merge_shadows, ) - search.aggs.bucket( - "release_type", - "terms", - field="release_type", - missing="_unknown", - ) - - search = search[:0] - - search = search.params(request_cache=True) - search = search.params(track_total_hits=True) - resp = wrap_es_execution(search) - - container_stats = resp.aggregations.container_stats.buckets - preservation_bucket = agg_to_dict(resp.aggregations.preservation) - preservation_bucket["total"] = _hits_total_int(resp.hits.total) - for k in ("bright", "dark", "shadows_only", "none"): - if k not in preservation_bucket: - preservation_bucket[k] = 0 - if merge_shadows: - preservation_bucket["none"] += preservation_bucket["shadows_only"] - preservation_bucket["shadows_only"] = 0 - release_type_bucket = agg_to_dict(resp.aggregations.release_type) - stats = { - "ident": ident, - "issnl": issnl, - "total": _hits_total_int(resp.hits.total), - "in_web": container_stats["in_web"]["doc_count"], - "in_kbart": container_stats["in_kbart"]["doc_count"], - "is_preserved": container_stats["is_preserved"]["doc_count"], - "preservation": preservation_bucket, - "release_type": release_type_bucket, - } - + stats["issnl"] = issnl return stats -- cgit v1.2.3