aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-02-09 18:03:23 -0800
committerBryan Newbold <bnewbold@robocracy.org>2022-02-09 18:06:55 -0800
commitef7d24cd08b98d58779335acd1d655bd342cd9ec (patch)
tree8f1e3db6187fd92a359d9c99c54e867f5412c46b /python/fatcat_tools
parentd73ab1f7cc45c122f321f0e717de2067554baabb (diff)
downloadfatcat-ef7d24cd08b98d58779335acd1d655bd342cd9ec.tar.gz
fatcat-ef7d24cd08b98d58779335acd1d655bd342cd9ec.zip
move container_status ES query code from fatcat_web to fatcat_tools
The main motivation is to never have fatcat_tools import from fatcat_web, only vice versa. Some code in fatcat_tools needs container stats, so starting with that code path (plus some generic helpers).
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--python/fatcat_tools/search/common.py96
-rw-r--r--python/fatcat_tools/search/stats.py87
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py4
3 files changed, 185 insertions, 2 deletions
diff --git a/python/fatcat_tools/search/common.py b/python/fatcat_tools/search/common.py
new file mode 100644
index 00000000..584757fd
--- /dev/null
+++ b/python/fatcat_tools/search/common.py
@@ -0,0 +1,96 @@
+import sys
+from typing import Any, Dict, List, Union
+
+import elasticsearch
+import elasticsearch_dsl.response
+from elasticsearch_dsl import Search
+
+
class FatcatSearchError(Exception):
    """
    Error from an elasticsearch query, carrying an HTTP status code suitable
    for returning to clients.

    status_code: HTTP status as an int, or a string from the elasticsearch
        client. The string "TIMEOUT" maps to 504; any other non-numeric
        string maps to 503 (service unavailable).
    name: short error name
    description: optional human-readable detail (eg, ES "root cause" reason)
    """

    def __init__(
        self, status_code: Union[int, str], name: str, description: Union[str, None] = None
    ):
        # elasticsearch client sometimes reports status as the string "TIMEOUT"
        if status_code == "TIMEOUT":
            status_code = 504
        elif isinstance(status_code, str):
            try:
                status_code = int(status_code)
            except ValueError:
                status_code = 503
        self.status_code = status_code
        self.name = name
        self.description = description
+
+
+def _hits_total_int(val: Any) -> int:
+ """
+ Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as
+ an int in many places, in ES 7 as a dict (JSON object) with 'value' key
+ """
+ if isinstance(val, int):
+ return val
+ else:
+ return int(val["value"])
+
+
def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]:
    """
    Unwraps all hits in a search response into a list of plain JSON objects.

    Also scrubs surrogate characters that elasticsearch sometimes returns,
    probably due to mangled data processing in some pipeline. "Crimes against
    Unicode"; production workaround
    """

    records = [hit._d_ for hit in response]

    # scrub string fields in-place: round-trip through UTF-8, dropping any
    # characters (eg, lone surrogates) that cannot be encoded
    for record in records:
        for field in record:
            if type(record[field]) is str:
                record[field] = record[field].encode("utf8", "ignore").decode("utf8")
    return records
+
+
def wrap_es_execution(search: Search) -> Any:
    """
    Executes a Search object, and converts various ES error types into
    something we can pretty print to the user.

    Returns the elasticsearch-dsl response on success; raises
    FatcatSearchError with an appropriate status code on failure.
    """

    def _error_description(info: Any) -> Any:
        # ES error payloads nest the human-readable message under
        # error.root_cause[0].reason; returns None when not present
        assert isinstance(info, dict)
        if info and info.get("error", {}).get("root_cause", {}):
            return str(info["error"]["root_cause"][0].get("reason"))
        return None

    try:
        resp = search.execute()
    except elasticsearch.exceptions.RequestError as e:
        # this is a "user" error (eg, malformed query)
        print("elasticsearch 400: " + str(e.info), file=sys.stderr)
        raise FatcatSearchError(e.status_code, str(e.error), _error_description(e.info))
    except elasticsearch.exceptions.ConnectionError as e:
        raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available")
    except elasticsearch.exceptions.TransportError as e:
        # all other errors
        print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr)
        raise FatcatSearchError(e.status_code, str(e.error), _error_description(e.info))
    return resp
+
+
def agg_to_dict(agg: Any) -> Dict[str, Any]:
    """
    Flattens a simple terms aggregation result (with buckets) into a plain
    dict mapping each term to its document count.

    Any documents not covered by the returned buckets are counted under the
    extra key '_other'. By convention, aggregations should be written to have
    "missing" values show up as '_unknown'.
    """
    counts = {bucket.key: bucket.doc_count for bucket in agg.buckets}
    if agg.sum_other_doc_count:
        counts["_other"] = agg.sum_other_doc_count
    return counts
diff --git a/python/fatcat_tools/search/stats.py b/python/fatcat_tools/search/stats.py
new file mode 100644
index 00000000..5496b94a
--- /dev/null
+++ b/python/fatcat_tools/search/stats.py
@@ -0,0 +1,87 @@
+from typing import Any, Dict
+
+import elasticsearch
+from elasticsearch_dsl import Search
+
+from fatcat_tools.search.common import _hits_total_int, agg_to_dict, wrap_es_execution
+
+
def query_es_container_stats(
    ident: str,
    es_client: elasticsearch.Elasticsearch,
    es_index: str = "fatcat_release",
    merge_shadows: bool = False,
) -> Dict[str, Any]:
    """
    Computes coverage counts for a single container by aggregating over the
    release search index.

    ident: container entity identifier; releases are filtered by container_id
    es_client: elasticsearch client to execute the query against
    es_index: name of the release search index to query
    merge_shadows: if True, the "shadows_only" preservation count is folded
        into "none" (presumably so shadow-library-only copies do not count as
        preserved -- confirm against callers)

    Returns dict:
        ident
        total: count
        in_web: count
        in_kbart: count
        is_preserved: count
        preservation{}
            "histogram" by preservation status
        release_type{}
            "histogram" by release type
    """

    search = Search(using=es_client, index=es_index)
    # only releases published in this container
    search = search.query(
        "term",
        container_id=ident,
    )
    # single "filters" aggregation: one doc_count bucket per boolean flag
    search.aggs.bucket(
        "container_stats",
        "filters",
        filters={
            "in_web": {
                "term": {"in_web": True},
            },
            "in_kbart": {
                "term": {"in_kbart": True},
            },
            "is_preserved": {
                "term": {"is_preserved": True},
            },
        },
    )
    # terms histograms; documents lacking the field land in the "_unknown"
    # bucket (see agg_to_dict convention)
    search.aggs.bucket(
        "preservation",
        "terms",
        field="preservation",
        missing="_unknown",
    )
    search.aggs.bucket(
        "release_type",
        "terms",
        field="release_type",
        missing="_unknown",
    )

    # we only need aggregations, not any hit documents themselves
    search = search[:0]

    search = search.params(request_cache=True)
    # request exact hit totals (ES 7.x caps reported totals by default)
    search = search.params(track_total_hits=True)
    resp = wrap_es_execution(search)

    container_stats = resp.aggregations.container_stats.buckets
    preservation_bucket = agg_to_dict(resp.aggregations.preservation)
    preservation_bucket["total"] = _hits_total_int(resp.hits.total)
    # ensure all known preservation statuses are present, even when zero
    for k in ("bright", "dark", "shadows_only", "none"):
        if k not in preservation_bucket:
            preservation_bucket[k] = 0
    if merge_shadows:
        preservation_bucket["none"] += preservation_bucket["shadows_only"]
        preservation_bucket["shadows_only"] = 0
    release_type_bucket = agg_to_dict(resp.aggregations.release_type)
    stats = {
        "ident": ident,
        "total": _hits_total_int(resp.hits.total),
        "in_web": container_stats["in_web"]["doc_count"],
        "in_kbart": container_stats["in_kbart"]["doc_count"],
        "is_preserved": container_stats["is_preserved"]["doc_count"],
        "preservation": preservation_bucket,
        "release_type": release_type_bucket,
    }

    return stats
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index bfadea64..79071810 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -14,13 +14,13 @@ from fatcat_openapi_client import (
)
from fatcat_tools import entity_from_json, public_api
+from fatcat_tools.search.stats import query_es_container_stats
from fatcat_tools.transforms import (
changelog_to_elasticsearch,
container_to_elasticsearch,
file_to_elasticsearch,
release_to_elasticsearch,
)
-from fatcat_web.search import get_elastic_container_stats
from .worker_common import FatcatWorker
@@ -156,7 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
continue
if self.entity_type == ContainerEntity and self.query_stats:
- stats = get_elastic_container_stats(
+ stats = query_es_container_stats(
entity.ident,
es_client=es_client,
es_index=self.elasticsearch_release_index,