path: root/python/fatcat_web/search.py
Diffstat (limited to 'python/fatcat_web/search.py')
-rw-r--r--  python/fatcat_web/search.py | 343
1 file changed, 161 insertions(+), 182 deletions(-)
diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py
index 8cbe09f6..913d6696 100644
--- a/python/fatcat_web/search.py
+++ b/python/fatcat_web/search.py
@@ -4,31 +4,22 @@ the formal API)
"""
import datetime
-import sys
from dataclasses import dataclass
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple
import elasticsearch
-import elasticsearch_dsl.response
from elasticsearch_dsl import Q, Search
+from fatcat_tools.search.common import (
+ _hits_total_int,
+ agg_to_dict,
+ results_to_dict,
+ wrap_es_execution,
+)
+from fatcat_tools.search.stats import query_es_container_stats
from fatcat_web import app
-class FatcatSearchError(Exception):
- def __init__(self, status_code: Union[int, str], name: str, description: str = None):
- if status_code == "TIMEOUT":
- status_code = 504
- elif isinstance(status_code, str):
- try:
- status_code = int(status_code)
- except ValueError:
- status_code = 503
- self.status_code = status_code
- self.name = name
- self.description = description
-
-
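
The exception class removed here normalized non-numeric Elasticsearch status codes before storing them; it is presumably relocated alongside wrap_es_execution, which raises it. A minimal sketch of that normalization, for reference:

    # "TIMEOUT" is mapped to HTTP 504; other non-numeric strings fall back to 503
    err = FatcatSearchError("TIMEOUT", "RequestTimeout")
    assert err.status_code == 504
    err = FatcatSearchError("unavailable", "ConnectionError", "search engine not available")
    assert err.status_code == 503
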
@dataclass
class ReleaseQuery:
q: Optional[str] = None
@@ -38,22 +29,13 @@ class ReleaseQuery:
container_id: Optional[str] = None
recent: bool = False
exclude_stubs: bool = False
+ sort: Optional[List[str]] = None
@staticmethod
def from_args(args: Dict[str, Any]) -> "ReleaseQuery":
query_str = args.get("q") or "*"
- container_id = args.get("container_id")
- # TODO: as filter, not in query string
- if container_id:
- query_str += ' container_id:"{}"'.format(container_id)
-
- # TODO: where are container_issnl queries actually used?
- issnl = args.get("container_issnl")
- if issnl and query_str:
- query_str += ' container_issnl:"{}"'.format(issnl)
-
offset = args.get("offset", "0")
offset = max(0, int(offset)) if offset.isnumeric() else 0
@@ -61,9 +43,10 @@ class ReleaseQuery:
q=query_str,
offset=offset,
fulltext_only=bool(args.get("fulltext_only")),
- container_id=container_id,
+ container_id=args.get("container_id"),
recent=bool(args.get("recent")),
exclude_stubs=bool(args.get("exclude_stubs")),
+ sort=None,
)
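
With this change, container_id is carried as a structured field on ReleaseQuery instead of being folded into the query string. A usage sketch (the ident value is hypothetical):

    args = {"q": "coffee", "offset": "25", "container_id": "hypothetical-ident"}
    query = ReleaseQuery.from_args(args)
    assert query.q == "coffee"
    assert query.offset == 25
    assert query.container_id == "hypothetical-ident"
    assert query.sort is None  # from_args() never sets an explicit sort
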
@@ -98,87 +81,11 @@ class SearchHits:
results: List[Any]
-def _hits_total_int(val: Any) -> int:
- """
-    Compatibility hack between ES 6.x and 7.x. In ES 6.x, total is returned as
- an int in many places, in ES 7 as a dict (JSON object) with 'value' key
- """
- if isinstance(val, int):
- return val
- else:
- return int(val["value"])
-
-
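
Both response shapes still occur in the wild; the relocated helper's contract, concretely:

    _hits_total_int(12345)                               # ES 6.x: plain int -> 12345
    _hits_total_int({"value": 12345, "relation": "eq"})  # ES 7.x: object -> 12345
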
-def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]:
- """
-    Takes a response and returns all the hits as JSON objects.
-
- Also handles surrogate strings that elasticsearch returns sometimes,
- probably due to mangled data processing in some pipeline. "Crimes against
- Unicode"; production workaround
- """
-
- results = []
- for h in response:
- r = h._d_
- # print(h.meta._d_)
- results.append(r)
-
- for h in results:
- for key in h:
- if type(h[key]) is str:
- h[key] = h[key].encode("utf8", "ignore").decode("utf8")
- return results
-
-
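
The "crimes against Unicode" workaround relies on the fact that encoding with errors="ignore" silently drops lone surrogates; for example:

    s = "pseudo\udcfftext"  # lone surrogate left over from mangled upstream data
    s.encode("utf8", "ignore").decode("utf8")  # -> "pseudotext"
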
-def wrap_es_execution(search: Search) -> Any:
- """
- Executes a Search object, and converts various ES error types into
- something we can pretty print to the user.
- """
- try:
- resp = search.execute()
- except elasticsearch.exceptions.RequestError as e:
- # this is a "user" error
- print("elasticsearch 400: " + str(e.info), file=sys.stderr)
- description = None
- assert isinstance(e.info, dict)
- if e.info.get("error", {}).get("root_cause", {}):
- description = str(e.info["error"]["root_cause"][0].get("reason"))
- raise FatcatSearchError(e.status_code, str(e.error), description)
- except elasticsearch.exceptions.ConnectionError as e:
- raise FatcatSearchError(e.status_code, "ConnectionError: search engine not available")
- except elasticsearch.exceptions.TransportError as e:
- # all other errors
- print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr)
- description = None
- assert isinstance(e.info, dict)
- if e.info and e.info.get("error", {}).get("root_cause", {}):
- description = str(e.info["error"]["root_cause"][0].get("reason"))
- raise FatcatSearchError(e.status_code, str(e.error), description)
- return resp
-
-
-def agg_to_dict(agg: Any) -> Dict[str, Any]:
- """
- Takes a simple term aggregation result (with buckets) and returns a simple
- dict with keys as terms and counts as values. Includes an extra value
- '_other', and by convention aggregations should be written to have "missing"
- values as '_unknown'.
- """
- result = dict()
- for bucket in agg.buckets:
- result[bucket.key] = bucket.doc_count
- if agg.sum_other_doc_count:
- result["_other"] = agg.sum_other_doc_count
- return result
-
-
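
A quick illustration of the relocated helper's output shape, with hypothetical counts:

    # Given a terms aggregation over 'release_type' with buckets
    # ("article", 100) and ("dataset", 5), and sum_other_doc_count=3:
    agg_to_dict(resp.aggregations.release_type)
    # -> {"article": 100, "dataset": 5, "_other": 3}
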
def do_container_search(query: GenericQuery, deep_page_limit: int = 2000) -> SearchHits:
search = Search(using=app.es_client, index=app.config["ELASTICSEARCH_CONTAINER_INDEX"])
- search = search.query(
+ basic_query = Q(
"query_string",
query=query.q,
default_operator="AND",
@@ -188,8 +95,22 @@ def do_container_search(query: GenericQuery, deep_page_limit: int = 2000) -> SearchHits:
fields=["biblio"],
)
+ search = search.query(
+ "boosting",
+ positive=Q(
+ "bool",
+ must=basic_query,
+ should=[
+ Q("range", releases_total={"gte": 500}),
+ Q("range", releases_total={"gte": 5000}),
+ ],
+ ),
+ negative=Q("term", releases_total=0),
+ negative_boost=0.5,
+ )
+
# Sanity checks
- limit = min((int(query.limit or 25), 100))
+ limit = min((int(query.limit or 25), 300))
offset = max((int(query.offset or 0), 0))
if offset > deep_page_limit:
# Avoid deep paging problem.
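
The boosting wrapper added above changes scoring only, not matching: the two should range clauses stack, so a container with 5000+ releases collects both boosts, while empty containers (releases_total of 0) still match but have their score halved by negative_boost. A standalone sketch of the construction and the raw body it compiles to (match_all stands in for the real query_string query):

    from elasticsearch_dsl import Q

    q = Q(
        "boosting",
        positive=Q(
            "bool",
            must=Q("match_all"),
            should=[Q("range", releases_total={"gte": 500})],
        ),
        negative=Q("term", releases_total=0),
        negative_boost=0.5,
    )
    q.to_dict()
    # roughly: {'boosting': {'positive': {'bool': {'must': [{'match_all': {}}],
    #   'should': [{'range': {'releases_total': {'gte': 500}}}]}},
    #   'negative': {'term': {'releases_total': 0}}, 'negative_boost': 0.5}}
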
@@ -249,6 +170,9 @@ def do_release_search(query: ReleaseQuery, deep_page_limit: int = 2000) -> SearchHits:
],
)
+ if query.container_id:
+ search = search.filter("term", container_id=query.container_id)
+
search = search.query(
"boosting",
positive=Q(
@@ -260,8 +184,11 @@ def do_release_search(query: ReleaseQuery, deep_page_limit: int = 2000) -> SearchHits:
negative_boost=0.5,
)
+ if query.sort:
+ search = search.sort(*query.sort)
+
# Sanity checks
- limit = min((int(query.limit or 25), 100))
+ limit = min((int(query.limit or 25), 300))
offset = max((int(query.offset or 0), 0))
if offset > deep_page_limit:
# Avoid deep paging problem.
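
The new sort field lets callers request an explicit order instead of relevance ranking; since from_args() always sets sort=None, it has to be supplied when constructing ReleaseQuery directly. A hypothetical sketch (field name and ident assumed):

    query = ReleaseQuery(
        q="*",
        container_id="hypothetical-ident",
        sort=["-release_date"],
    )
    # do_release_search() then applies: search = search.sort("-release_date")
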
@@ -320,6 +247,122 @@ def get_elastic_container_random_releases(ident: str, limit: int = 5) -> List[Dict[str, Any]]:
return results
+def _sort_vol_key(val: Optional[Any]) -> Tuple[bool, bool, int, str]:
+ """
+ Helper for sorting volume and issue strings. Defined order is:
+
+ - None values first
+ - any non-integers next, in non-integer order
+ - any integers next, in integer sorted order (ascending)
+
+ Note that the actual sort used/displayed is reversed.
+
+ TODO: 'val' should actually be Optional[str], but getting a mypy error I
+ don't know how to hack around quickly right now.
+ """
+ if val is None:
+ return (False, False, 0, "")
+ if val.isdigit():
+ return (True, True, int(val), "")
+ else:
+ return (True, False, 0, val)
+
+
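
Concretely, with the reversed ordering used for display:

    vals = ["12", "2", "S1", None]
    sorted(vals, key=_sort_vol_key)
    # -> [None, 'S1', '2', '12']    (None, then strings, then ints ascending)
    sorted(vals, key=_sort_vol_key, reverse=True)
    # -> ['12', '2', 'S1', None]    (what actually gets displayed)
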
+def get_elastic_container_browse_year_volume_issue(ident: str) -> List[Dict[str, Any]]:
+ """
+ Returns a set of histogram buckets, as nested dicts/lists:
+
+ [
+ { year: int,
+ volumes: [
+ { volume: str|None
+ issues: [
+ { issue: str|None
+ count: int
+ }
+ ] }
+ ] }
+ ]
+ """
+
+ search = Search(using=app.es_client, index=app.config["ELASTICSEARCH_RELEASE_INDEX"])
+ search = search.query(
+ "bool",
+ filter=[Q("bool", must_not=[Q("match", release_type="stub")])],
+ )
+ search = search.filter("term", container_id=ident)
+ search.aggs.bucket(
+ "year_volume",
+ "composite",
+ size=1500,
+ sources=[
+ {
+ "year": {
+ "histogram": {
+ "field": "release_year",
+ "interval": 1,
+ "missing_bucket": True,
+ },
+ }
+ },
+ {
+ "volume": {
+ "terms": {
+ "field": "volume",
+ "missing_bucket": True,
+ },
+ }
+ },
+ {
+ "issue": {
+ "terms": {
+ "field": "issue",
+ "missing_bucket": True,
+ },
+ }
+ },
+ ],
+ )
+ search = search[:0]
+ search = search.params(request_cache=True)
+ resp = wrap_es_execution(search)
+ buckets = resp.aggregations.year_volume.buckets
+ # print(buckets)
+ buckets = [h for h in buckets if h["key"]["year"]]
+ year_nums = set([int(h["key"]["year"]) for h in buckets])
+ year_dicts: Dict[int, Dict[str, Any]] = dict()
+ if year_nums:
+ for year in year_nums:
+ year_dicts[year] = {}
+ for row in buckets:
+ year = int(row["key"]["year"])
+ volume = row["key"]["volume"] or ""
+ issue = row["key"]["issue"] or ""
+ if volume not in year_dicts[year]:
+ year_dicts[year][volume] = {}
+ year_dicts[year][volume][issue] = int(row["doc_count"])
+
+ # transform to lists-of-dicts
+ year_list = []
+ for year in year_dicts.keys():
+ volume_list = []
+ for volume in year_dicts[year].keys():
+ issue_list = []
+ for issue in year_dicts[year][volume].keys():
+ issue_list.append(
+ dict(issue=issue or None, count=year_dicts[year][volume][issue])
+ )
+ issue_list = sorted(
+ issue_list, key=lambda x: _sort_vol_key(x["issue"]), reverse=True
+ )
+ volume_list.append(dict(volume=volume or None, issues=issue_list))
+ volume_list = sorted(
+ volume_list, key=lambda x: _sort_vol_key(x["volume"]), reverse=True
+ )
+ year_list.append(dict(year=year, volumes=volume_list))
+ return sorted(year_list, key=lambda x: x["year"], reverse=True)
+
+
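
Note that the composite aggregation fetches at most size=1500 buckets in a single request and the code does not paginate via after_key, so an extremely long-running container could be truncated. The return value, with hypothetical numbers, looks like:

    get_elastic_container_browse_year_volume_issue("hypothetical-ident")
    # -> [
    #   {"year": 2020, "volumes": [
    #     {"volume": "12", "issues": [
    #       {"issue": "2", "count": 34},
    #       {"issue": "1", "count": 30},
    #     ]},
    #   ]},
    # ]
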
def get_elastic_entity_stats() -> dict:
"""
TODO: files, filesets, webcaptures (no schema yet)
@@ -465,6 +508,9 @@ def get_elastic_container_stats(
merge_shadows: Optional[bool] = None,
) -> Dict[str, Any]:
"""
+    This is a DEPRECATED backwards-compatibility wrapper around the new
+ query_es_container_stats() method from fatcat_tools.
+
Returns dict:
ident
issnl (optional)
@@ -485,66 +531,13 @@ def get_elastic_container_stats(
if merge_shadows is None:
merge_shadows = app.config["FATCAT_MERGE_SHADOW_PRESERVATION"]
- search = Search(using=es_client, index=es_index)
- search = search.query(
- "term",
- container_id=ident,
- )
- search.aggs.bucket(
- "container_stats",
- "filters",
- filters={
- "in_web": {
- "term": {"in_web": True},
- },
- "in_kbart": {
- "term": {"in_kbart": True},
- },
- "is_preserved": {
- "term": {"is_preserved": True},
- },
- },
+ stats = query_es_container_stats(
+ ident=ident,
+ es_client=es_client,
+ es_index=es_index,
+ merge_shadows=merge_shadows,
)
- search.aggs.bucket(
- "preservation",
- "terms",
- field="preservation",
- missing="_unknown",
- )
- search.aggs.bucket(
- "release_type",
- "terms",
- field="release_type",
- missing="_unknown",
- )
-
- search = search[:0]
-
- search = search.params(request_cache=True)
- search = search.params(track_total_hits=True)
- resp = wrap_es_execution(search)
-
- container_stats = resp.aggregations.container_stats.buckets
- preservation_bucket = agg_to_dict(resp.aggregations.preservation)
- preservation_bucket["total"] = _hits_total_int(resp.hits.total)
- for k in ("bright", "dark", "shadows_only", "none"):
- if k not in preservation_bucket:
- preservation_bucket[k] = 0
- if merge_shadows:
- preservation_bucket["none"] += preservation_bucket["shadows_only"]
- preservation_bucket["shadows_only"] = 0
- release_type_bucket = agg_to_dict(resp.aggregations.release_type)
- stats = {
- "ident": ident,
- "issnl": issnl,
- "total": _hits_total_int(resp.hits.total),
- "in_web": container_stats["in_web"]["doc_count"],
- "in_kbart": container_stats["in_kbart"]["doc_count"],
- "is_preserved": container_stats["is_preserved"]["doc_count"],
- "preservation": preservation_bucket,
- "release_type": release_type_bucket,
- }
-
+ stats["issnl"] = issnl
return stats
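
The wrapper now just forwards to query_es_container_stats() and re-attaches the caller-supplied issnl, which the new helper does not know about. Expected shape, per the docstring above (values hypothetical):

    stats = get_elastic_container_stats("hypothetical-ident", issnl="1234-5678")
    # {"ident": "hypothetical-ident", "issnl": "1234-5678", "total": 12000,
    #  "in_web": 8000, "in_kbart": 7500, "is_preserved": 9000,
    #  "preservation": {"bright": ..., "dark": ..., "shadows_only": ..., "none": ..., "total": ...},
    #  "release_type": {"article-journal": ..., "_other": ...}}
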
@@ -643,11 +636,7 @@ def get_elastic_preservation_by_year(query: ReleaseQuery) -> List[Dict[str, Any]]:
"biblio",
],
)
- if query.container_id:
- search = search.filter(
- "term",
- container_id=query.container_id,
- )
+ search = search.filter("term", container_id=query.container_id)
if query.exclude_stubs:
search = search.query(
"bool",
@@ -909,17 +898,7 @@ def get_elastic_preservation_by_type(query: ReleaseQuery) -> List[dict]:
],
)
if query.container_id:
- search = search.query(
- "bool",
- filter=[
- Q(
- "bool",
- must=[
- Q("match", container_id=query.container_id),
- ],
- ),
- ],
- )
+ search = search.filter("term", container_id=query.container_id)
if query.recent:
date_today = datetime.date.today()
start_date = str(date_today - datetime.timedelta(days=60))
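
The simplification here (and in do_release_search above) is behavior-preserving for exact idents: a term filter in filter context expresses the same constraint as the old nested bool/must/match, assuming container_id is mapped as a keyword field. Roughly:

    old = Q("bool", filter=[Q("bool", must=[Q("match", container_id="xyz")])])
    # search.filter("term", container_id="xyz") compiles to:
    new = Q("bool", filter=[Q("term", container_id="xyz")])
    # both restrict hits to the one container without affecting scoring
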