aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/search/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/search/common.py')
-rw-r--r--python/fatcat_tools/search/common.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/python/fatcat_tools/search/common.py b/python/fatcat_tools/search/common.py
new file mode 100644
index 00000000..584757fd
--- /dev/null
+++ b/python/fatcat_tools/search/common.py
@@ -0,0 +1,96 @@
+import sys
+from typing import Any, Dict, List, Union
+
+import elasticsearch
+import elasticsearch_dsl.response
+from elasticsearch_dsl import Search
+
+
+class FatcatSearchError(Exception):
+ def __init__(self, status_code: Union[int, str], name: str, description: str = None):
+ if status_code == "TIMEOUT":
+ status_code = 504
+ elif isinstance(status_code, str):
+ try:
+ status_code = int(status_code)
+ except ValueError:
+ status_code = 503
+ self.status_code = status_code
+ self.name = name
+ self.description = description
+
+
+def _hits_total_int(val: Any) -> int:
+ """
+ Compatibility hack between ES 6.x and 7.x. In ES 6x, total is returned as
+ an int in many places, in ES 7 as a dict (JSON object) with 'value' key
+ """
+ if isinstance(val, int):
+ return val
+ else:
+ return int(val["value"])
+
+
+def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]:
+ """
+ Takes a response returns all the hits as JSON objects.
+
+ Also handles surrogate strings that elasticsearch returns sometimes,
+ probably due to mangled data processing in some pipeline. "Crimes against
+ Unicode"; production workaround
+ """
+
+ results = []
+ for h in response:
+ r = h._d_
+ # print(h.meta._d_)
+ results.append(r)
+
+ for h in results:
+ for key in h:
+ if type(h[key]) is str:
+ h[key] = h[key].encode("utf8", "ignore").decode("utf8")
+ return results
+
+
+def wrap_es_execution(search: Search) -> Any:
+ """
+ Executes a Search object, and converts various ES error types into
+ something we can pretty print to the user.
+ """
+ try:
+ resp = search.execute()
+ except elasticsearch.exceptions.RequestError as e:
+ # this is a "user" error
+ print("elasticsearch 400: " + str(e.info), file=sys.stderr)
+ description = None
+ assert isinstance(e.info, dict)
+ if e.info.get("error", {}).get("root_cause", {}):
+ description = str(e.info["error"]["root_cause"][0].get("reason"))
+ raise FatcatSearchError(e.status_code, str(e.error), description)
+ except elasticsearch.exceptions.ConnectionError as e:
+ raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available")
+ except elasticsearch.exceptions.TransportError as e:
+ # all other errors
+ print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr)
+ description = None
+ assert isinstance(e.info, dict)
+ if e.info and e.info.get("error", {}).get("root_cause", {}):
+ description = str(e.info["error"]["root_cause"][0].get("reason"))
+ raise FatcatSearchError(e.status_code, str(e.error), description)
+ return resp
+
+
+def agg_to_dict(agg: Any) -> Dict[str, Any]:
+ """
+ Takes a simple term aggregation result (with buckets) and returns a simple
+ dict with keys as terms and counts as values. Includes an extra value
+ '_other', and by convention aggregations should be written to have "missing"
+ values as '_unknown'.
+ """
+ result = dict()
+ for bucket in agg.buckets:
+ result[bucket.key] = bucket.doc_count
+ if agg.sum_other_doc_count:
+ result["_other"] = agg.sum_other_doc_count
+ return result