aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-02-09 18:03:23 -0800
committerBryan Newbold <bnewbold@robocracy.org>2022-02-09 18:06:55 -0800
commitef7d24cd08b98d58779335acd1d655bd342cd9ec (patch)
tree8f1e3db6187fd92a359d9c99c54e867f5412c46b /python
parentd73ab1f7cc45c122f321f0e717de2067554baabb (diff)
downloadfatcat-ef7d24cd08b98d58779335acd1d655bd342cd9ec.tar.gz
fatcat-ef7d24cd08b98d58779335acd1d655bd342cd9ec.zip
move container_status ES query code from fatcat_web to fatcat_tools
The main motivation is to never have fatcat_tools import from fatcat_web, only vica-versa. Some code in fatcat_tools needs container stats, so starting with that code path (plus some generic helpers).
Diffstat (limited to 'python')
-rw-r--r--python/fatcat_tools/search/common.py96
-rw-r--r--python/fatcat_tools/search/stats.py87
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py4
-rwxr-xr-xpython/fatcat_transform.py4
-rw-r--r--python/fatcat_web/routes.py2
-rw-r--r--python/fatcat_web/search.py169
6 files changed, 205 insertions, 157 deletions
diff --git a/python/fatcat_tools/search/common.py b/python/fatcat_tools/search/common.py
new file mode 100644
index 00000000..584757fd
--- /dev/null
+++ b/python/fatcat_tools/search/common.py
@@ -0,0 +1,96 @@
+import sys
+from typing import Any, Dict, List, Union
+
+import elasticsearch
+import elasticsearch_dsl.response
+from elasticsearch_dsl import Search
+
+
+class FatcatSearchError(Exception):
+ def __init__(self, status_code: Union[int, str], name: str, description: str = None):
+ if status_code == "TIMEOUT":
+ status_code = 504
+ elif isinstance(status_code, str):
+ try:
+ status_code = int(status_code)
+ except ValueError:
+ status_code = 503
+ self.status_code = status_code
+ self.name = name
+ self.description = description
+
+
+def _hits_total_int(val: Any) -> int:
+ """
+ Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as
+ an int in many places, in ES 7 as a dict (JSON object) with 'value' key
+ """
+ if isinstance(val, int):
+ return val
+ else:
+ return int(val["value"])
+
+
+def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]:
+ """
+ Takes a response returns all the hits as JSON objects.
+
+ Also handles surrogate strings that elasticsearch returns sometimes,
+ probably due to mangled data processing in some pipeline. "Crimes against
+ Unicode"; production workaround
+ """
+
+ results = []
+ for h in response:
+ r = h._d_
+ # print(h.meta._d_)
+ results.append(r)
+
+ for h in results:
+ for key in h:
+ if type(h[key]) is str:
+ h[key] = h[key].encode("utf8", "ignore").decode("utf8")
+ return results
+
+
+def wrap_es_execution(search: Search) -> Any:
+ """
+ Executes a Search object, and converts various ES error types into
+ something we can pretty print to the user.
+ """
+ try:
+ resp = search.execute()
+ except elasticsearch.exceptions.RequestError as e:
+ # this is a "user" error
+ print("elasticsearch 400: " + str(e.info), file=sys.stderr)
+ description = None
+ assert isinstance(e.info, dict)
+ if e.info.get("error", {}).get("root_cause", {}):
+ description = str(e.info["error"]["root_cause"][0].get("reason"))
+ raise FatcatSearchError(e.status_code, str(e.error), description)
+ except elasticsearch.exceptions.ConnectionError as e:
+ raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available")
+ except elasticsearch.exceptions.TransportError as e:
+ # all other errors
+ print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr)
+ description = None
+ assert isinstance(e.info, dict)
+ if e.info and e.info.get("error", {}).get("root_cause", {}):
+ description = str(e.info["error"]["root_cause"][0].get("reason"))
+ raise FatcatSearchError(e.status_code, str(e.error), description)
+ return resp
+
+
+def agg_to_dict(agg: Any) -> Dict[str, Any]:
+ """
+ Takes a simple term aggregation result (with buckets) and returns a simple
+ dict with keys as terms and counts as values. Includes an extra value
+ '_other', and by convention aggregations should be written to have "missing"
+ values as '_unknown'.
+ """
+ result = dict()
+ for bucket in agg.buckets:
+ result[bucket.key] = bucket.doc_count
+ if agg.sum_other_doc_count:
+ result["_other"] = agg.sum_other_doc_count
+ return result
diff --git a/python/fatcat_tools/search/stats.py b/python/fatcat_tools/search/stats.py
new file mode 100644
index 00000000..5496b94a
--- /dev/null
+++ b/python/fatcat_tools/search/stats.py
@@ -0,0 +1,87 @@
+from typing import Any, Dict
+
+import elasticsearch
+from elasticsearch_dsl import Search
+
+from fatcat_tools.search.common import _hits_total_int, agg_to_dict, wrap_es_execution
+
+
+def query_es_container_stats(
+ ident: str,
+ es_client: elasticsearch.Elasticsearch,
+ es_index: str = "fatcat_release",
+ merge_shadows: bool = False,
+) -> Dict[str, Any]:
+ """
+ Returns dict:
+ ident
+ total: count
+ in_web: count
+ in_kbart: count
+ is_preserved: count
+ preservation{}
+ "histogram" by preservation status
+ release_type{}
+ "histogram" by release type
+ """
+
+ search = Search(using=es_client, index=es_index)
+ search = search.query(
+ "term",
+ container_id=ident,
+ )
+ search.aggs.bucket(
+ "container_stats",
+ "filters",
+ filters={
+ "in_web": {
+ "term": {"in_web": True},
+ },
+ "in_kbart": {
+ "term": {"in_kbart": True},
+ },
+ "is_preserved": {
+ "term": {"is_preserved": True},
+ },
+ },
+ )
+ search.aggs.bucket(
+ "preservation",
+ "terms",
+ field="preservation",
+ missing="_unknown",
+ )
+ search.aggs.bucket(
+ "release_type",
+ "terms",
+ field="release_type",
+ missing="_unknown",
+ )
+
+ search = search[:0]
+
+ search = search.params(request_cache=True)
+ search = search.params(track_total_hits=True)
+ resp = wrap_es_execution(search)
+
+ container_stats = resp.aggregations.container_stats.buckets
+ preservation_bucket = agg_to_dict(resp.aggregations.preservation)
+ preservation_bucket["total"] = _hits_total_int(resp.hits.total)
+ for k in ("bright", "dark", "shadows_only", "none"):
+ if k not in preservation_bucket:
+ preservation_bucket[k] = 0
+ if merge_shadows:
+ preservation_bucket["none"] += preservation_bucket["shadows_only"]
+ preservation_bucket["shadows_only"] = 0
+ release_type_bucket = agg_to_dict(resp.aggregations.release_type)
+ stats = {
+ "ident": ident,
+ "total": _hits_total_int(resp.hits.total),
+ "in_web": container_stats["in_web"]["doc_count"],
+ "in_kbart": container_stats["in_kbart"]["doc_count"],
+ "is_preserved": container_stats["is_preserved"]["doc_count"],
+ "preservation": preservation_bucket,
+ "release_type": release_type_bucket,
+ }
+
+ return stats
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index bfadea64..79071810 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -14,13 +14,13 @@ from fatcat_openapi_client import (
)
from fatcat_tools import entity_from_json, public_api
+from fatcat_tools.search.stats import query_es_container_stats
from fatcat_tools.transforms import (
changelog_to_elasticsearch,
container_to_elasticsearch,
file_to_elasticsearch,
release_to_elasticsearch,
)
-from fatcat_web.search import get_elastic_container_stats
from .worker_common import FatcatWorker
@@ -156,7 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
continue
if self.entity_type == ContainerEntity and self.query_stats:
- stats = get_elastic_container_stats(
+ stats = query_es_container_stats(
entity.ident,
es_client=es_client,
es_index=self.elasticsearch_release_index,
diff --git a/python/fatcat_transform.py b/python/fatcat_transform.py
index 67bf56c5..4f28951c 100755
--- a/python/fatcat_transform.py
+++ b/python/fatcat_transform.py
@@ -13,6 +13,7 @@ import elasticsearch
from fatcat_openapi_client import ChangelogEntry, ContainerEntity, FileEntity, ReleaseEntity
from fatcat_tools import public_api
+from fatcat_tools.search.stats import query_es_container_stats
from fatcat_tools.transforms import (
changelog_to_elasticsearch,
citeproc_csl,
@@ -22,7 +23,6 @@ from fatcat_tools.transforms import (
release_to_csl,
release_to_elasticsearch,
)
-from fatcat_web.search import get_elastic_container_stats
def run_elasticsearch_releases(args: argparse.Namespace) -> None:
@@ -50,7 +50,7 @@ def run_elasticsearch_containers(args: argparse.Namespace) -> None:
if args.query_stats:
es_doc = container_to_elasticsearch(
entity,
- stats=get_elastic_container_stats(
+ stats=query_es_container_stats(
entity.ident,
es_client=es_client,
es_index=es_release_index,
diff --git a/python/fatcat_web/routes.py b/python/fatcat_web/routes.py
index 9f46c674..0afc189f 100644
--- a/python/fatcat_web/routes.py
+++ b/python/fatcat_web/routes.py
@@ -30,6 +30,7 @@ from fatcat_tools.normal import (
clean_sha1,
clean_sha256,
)
+from fatcat_tools.search.common import FatcatSearchError
from fatcat_tools.transforms import citeproc_csl, release_to_csl
from fatcat_web import AnyResponse, Config, api, app, auth_api, mwoauth, priv_api
from fatcat_web.auth import (
@@ -55,7 +56,6 @@ from fatcat_web.graphics import (
)
from fatcat_web.kafka import kafka_pixy_produce
from fatcat_web.search import (
- FatcatSearchError,
GenericQuery,
ReleaseQuery,
do_container_search,
diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py
index ac4dc34e..99c8ee77 100644
--- a/python/fatcat_web/search.py
+++ b/python/fatcat_web/search.py
@@ -4,31 +4,22 @@ the formal API)
"""
import datetime
-import sys
from dataclasses import dataclass
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple
import elasticsearch
-import elasticsearch_dsl.response
from elasticsearch_dsl import Q, Search
+from fatcat_tools.search.common import (
+ _hits_total_int,
+ agg_to_dict,
+ results_to_dict,
+ wrap_es_execution,
+)
+from fatcat_tools.search.stats import query_es_container_stats
from fatcat_web import app
-class FatcatSearchError(Exception):
- def __init__(self, status_code: Union[int, str], name: str, description: str = None):
- if status_code == "TIMEOUT":
- status_code = 504
- elif isinstance(status_code, str):
- try:
- status_code = int(status_code)
- except ValueError:
- status_code = 503
- self.status_code = status_code
- self.name = name
- self.description = description
-
-
@dataclass
class ReleaseQuery:
q: Optional[str] = None
@@ -88,82 +79,6 @@ class SearchHits:
results: List[Any]
-def _hits_total_int(val: Any) -> int:
- """
- Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as
- an int in many places, in ES 7 as a dict (JSON object) with 'value' key
- """
- if isinstance(val, int):
- return val
- else:
- return int(val["value"])
-
-
-def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]:
- """
- Takes a response returns all the hits as JSON objects.
-
- Also handles surrogate strings that elasticsearch returns sometimes,
- probably due to mangled data processing in some pipeline. "Crimes against
- Unicode"; production workaround
- """
-
- results = []
- for h in response:
- r = h._d_
- # print(h.meta._d_)
- results.append(r)
-
- for h in results:
- for key in h:
- if type(h[key]) is str:
- h[key] = h[key].encode("utf8", "ignore").decode("utf8")
- return results
-
-
-def wrap_es_execution(search: Search) -> Any:
- """
- Executes a Search object, and converts various ES error types into
- something we can pretty print to the user.
- """
- try:
- resp = search.execute()
- except elasticsearch.exceptions.RequestError as e:
- # this is a "user" error
- print("elasticsearch 400: " + str(e.info), file=sys.stderr)
- description = None
- assert isinstance(e.info, dict)
- if e.info.get("error", {}).get("root_cause", {}):
- description = str(e.info["error"]["root_cause"][0].get("reason"))
- raise FatcatSearchError(e.status_code, str(e.error), description)
- except elasticsearch.exceptions.ConnectionError as e:
- raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available")
- except elasticsearch.exceptions.TransportError as e:
- # all other errors
- print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr)
- description = None
- assert isinstance(e.info, dict)
- if e.info and e.info.get("error", {}).get("root_cause", {}):
- description = str(e.info["error"]["root_cause"][0].get("reason"))
- raise FatcatSearchError(e.status_code, str(e.error), description)
- return resp
-
-
-def agg_to_dict(agg: Any) -> Dict[str, Any]:
- """
- Takes a simple term aggregation result (with buckets) and returns a simple
- dict with keys as terms and counts as values. Includes an extra value
- '_other', and by convention aggregations should be written to have "missing"
- values as '_unknown'.
- """
- result = dict()
- for bucket in agg.buckets:
- result[bucket.key] = bucket.doc_count
- if agg.sum_other_doc_count:
- result["_other"] = agg.sum_other_doc_count
- return result
-
-
def do_container_search(query: GenericQuery, deep_page_limit: int = 2000) -> SearchHits:
search = Search(using=app.es_client, index=app.config["ELASTICSEARCH_CONTAINER_INDEX"])
@@ -536,6 +451,9 @@ def get_elastic_container_stats(
merge_shadows: Optional[bool] = None,
) -> Dict[str, Any]:
"""
+ This is a DEPRECATED backwards-compatability wrapper around the new
+ query_es_container_stats() method from fatcat_tools.
+
Returns dict:
ident
issnl (optional)
@@ -556,66 +474,13 @@ def get_elastic_container_stats(
if merge_shadows is None:
merge_shadows = app.config["FATCAT_MERGE_SHADOW_PRESERVATION"]
- search = Search(using=es_client, index=es_index)
- search = search.query(
- "term",
- container_id=ident,
- )
- search.aggs.bucket(
- "container_stats",
- "filters",
- filters={
- "in_web": {
- "term": {"in_web": True},
- },
- "in_kbart": {
- "term": {"in_kbart": True},
- },
- "is_preserved": {
- "term": {"is_preserved": True},
- },
- },
- )
- search.aggs.bucket(
- "preservation",
- "terms",
- field="preservation",
- missing="_unknown",
+ stats = query_es_container_stats(
+ ident=ident,
+ es_client=es_client,
+ es_index=es_index,
+ merge_shadows=merge_shadows,
)
- search.aggs.bucket(
- "release_type",
- "terms",
- field="release_type",
- missing="_unknown",
- )
-
- search = search[:0]
-
- search = search.params(request_cache=True)
- search = search.params(track_total_hits=True)
- resp = wrap_es_execution(search)
-
- container_stats = resp.aggregations.container_stats.buckets
- preservation_bucket = agg_to_dict(resp.aggregations.preservation)
- preservation_bucket["total"] = _hits_total_int(resp.hits.total)
- for k in ("bright", "dark", "shadows_only", "none"):
- if k not in preservation_bucket:
- preservation_bucket[k] = 0
- if merge_shadows:
- preservation_bucket["none"] += preservation_bucket["shadows_only"]
- preservation_bucket["shadows_only"] = 0
- release_type_bucket = agg_to_dict(resp.aggregations.release_type)
- stats = {
- "ident": ident,
- "issnl": issnl,
- "total": _hits_total_int(resp.hits.total),
- "in_web": container_stats["in_web"]["doc_count"],
- "in_kbart": container_stats["in_kbart"]["doc_count"],
- "is_preserved": container_stats["is_preserved"]["doc_count"],
- "preservation": preservation_bucket,
- "release_type": release_type_bucket,
- }
-
+ stats["issnl"] = issnl
return stats