diff options
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r-- | python/fatcat_tools/search/common.py | 96 | ||||
-rw-r--r-- | python/fatcat_tools/search/stats.py | 87 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 4 |
3 files changed, 185 insertions, 2 deletions
diff --git a/python/fatcat_tools/search/common.py b/python/fatcat_tools/search/common.py new file mode 100644 index 00000000..584757fd --- /dev/null +++ b/python/fatcat_tools/search/common.py @@ -0,0 +1,96 @@ +import sys +from typing import Any, Dict, List, Union + +import elasticsearch +import elasticsearch_dsl.response +from elasticsearch_dsl import Search + + +class FatcatSearchError(Exception): + def __init__(self, status_code: Union[int, str], name: str, description: str = None): + if status_code == "TIMEOUT": + status_code = 504 + elif isinstance(status_code, str): + try: + status_code = int(status_code) + except ValueError: + status_code = 503 + self.status_code = status_code + self.name = name + self.description = description + + +def _hits_total_int(val: Any) -> int: + """ + Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as + an int in many places, in ES 7 as a dict (JSON object) with 'value' key + """ + if isinstance(val, int): + return val + else: + return int(val["value"]) + + +def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]: + """ + Takes a response returns all the hits as JSON objects. + + Also handles surrogate strings that elasticsearch returns sometimes, + probably due to mangled data processing in some pipeline. "Crimes against + Unicode"; production workaround + """ + + results = [] + for h in response: + r = h._d_ + # print(h.meta._d_) + results.append(r) + + for h in results: + for key in h: + if type(h[key]) is str: + h[key] = h[key].encode("utf8", "ignore").decode("utf8") + return results + + +def wrap_es_execution(search: Search) -> Any: + """ + Executes a Search object, and converts various ES error types into + something we can pretty print to the user. + """ + try: + resp = search.execute() + except elasticsearch.exceptions.RequestError as e: + # this is a "user" error + print("elasticsearch 400: " + str(e.info), file=sys.stderr) + description = None + assert isinstance(e.info, dict) + if e.info.get("error", {}).get("root_cause", {}): + description = str(e.info["error"]["root_cause"][0].get("reason")) + raise FatcatSearchError(e.status_code, str(e.error), description) + except elasticsearch.exceptions.ConnectionError as e: + raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available") + except elasticsearch.exceptions.TransportError as e: + # all other errors + print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr) + description = None + assert isinstance(e.info, dict) + if e.info and e.info.get("error", {}).get("root_cause", {}): + description = str(e.info["error"]["root_cause"][0].get("reason")) + raise FatcatSearchError(e.status_code, str(e.error), description) + return resp + + +def agg_to_dict(agg: Any) -> Dict[str, Any]: + """ + Takes a simple term aggregation result (with buckets) and returns a simple + dict with keys as terms and counts as values. Includes an extra value + '_other', and by convention aggregations should be written to have "missing" + values as '_unknown'. + """ + result = dict() + for bucket in agg.buckets: + result[bucket.key] = bucket.doc_count + if agg.sum_other_doc_count: + result["_other"] = agg.sum_other_doc_count + return result diff --git a/python/fatcat_tools/search/stats.py b/python/fatcat_tools/search/stats.py new file mode 100644 index 00000000..5496b94a --- /dev/null +++ b/python/fatcat_tools/search/stats.py @@ -0,0 +1,87 @@ +from typing import Any, Dict + +import elasticsearch +from elasticsearch_dsl import Search + +from fatcat_tools.search.common import _hits_total_int, agg_to_dict, wrap_es_execution + + +def query_es_container_stats( + ident: str, + es_client: elasticsearch.Elasticsearch, + es_index: str = "fatcat_release", + merge_shadows: bool = False, +) -> Dict[str, Any]: + """ + Returns dict: + ident + total: count + in_web: count + in_kbart: count + is_preserved: count + preservation{} + "histogram" by preservation status + release_type{} + "histogram" by release type + """ + + search = Search(using=es_client, index=es_index) + search = search.query( + "term", + container_id=ident, + ) + search.aggs.bucket( + "container_stats", + "filters", + filters={ + "in_web": { + "term": {"in_web": True}, + }, + "in_kbart": { + "term": {"in_kbart": True}, + }, + "is_preserved": { + "term": {"is_preserved": True}, + }, + }, + ) + search.aggs.bucket( + "preservation", + "terms", + field="preservation", + missing="_unknown", + ) + search.aggs.bucket( + "release_type", + "terms", + field="release_type", + missing="_unknown", + ) + + search = search[:0] + + search = search.params(request_cache=True) + search = search.params(track_total_hits=True) + resp = wrap_es_execution(search) + + container_stats = resp.aggregations.container_stats.buckets + preservation_bucket = agg_to_dict(resp.aggregations.preservation) + preservation_bucket["total"] = _hits_total_int(resp.hits.total) + for k in ("bright", "dark", "shadows_only", "none"): + if k not in preservation_bucket: + preservation_bucket[k] = 0 + if merge_shadows: + preservation_bucket["none"] += preservation_bucket["shadows_only"] + preservation_bucket["shadows_only"] = 0 + release_type_bucket = agg_to_dict(resp.aggregations.release_type) + stats = { + "ident": ident, + "total": _hits_total_int(resp.hits.total), + "in_web": container_stats["in_web"]["doc_count"], + "in_kbart": container_stats["in_kbart"]["doc_count"], + "is_preserved": container_stats["is_preserved"]["doc_count"], + "preservation": preservation_bucket, + "release_type": release_type_bucket, + } + + return stats diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index bfadea64..79071810 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -14,13 +14,13 @@ from fatcat_openapi_client import ( ) from fatcat_tools import entity_from_json, public_api +from fatcat_tools.search.stats import query_es_container_stats from fatcat_tools.transforms import ( changelog_to_elasticsearch, container_to_elasticsearch, file_to_elasticsearch, release_to_elasticsearch, ) -from fatcat_web.search import get_elastic_container_stats from .worker_common import FatcatWorker @@ -156,7 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): continue if self.entity_type == ContainerEntity and self.query_stats: - stats = get_elastic_container_stats( + stats = query_es_container_stats( entity.ident, es_client=es_client, es_index=self.elasticsearch_release_index, |