"""
Helpers for doing elasticsearch queries (used in the web interface; not part
of the formal API)
"""

import sys
import datetime
from dataclasses import dataclass
from typing import List, Optional, Any

import requests
from flask import abort, flash
import elasticsearch
from elasticsearch_dsl import Search, Q
import elasticsearch_dsl.response

from fatcat_web import app


# Timeout (seconds) for raw HTTP requests made directly against the
# elasticsearch backend with `requests`; without a timeout such a request can
# block indefinitely if the backend stops responding.
ES_HTTP_TIMEOUT_SEC = 30.0


def _parse_offset(args) -> int:
    """
    Parses the 'offset' argument from a request-args mapping into a
    non-negative int.

    Any value that int() cannot parse (including unicode digits such as '²',
    for which str.isnumeric() is True but int() fails) falls back to 0, as do
    negative values.
    """
    raw = args.get('offset', '0')
    try:
        return max(0, int(raw))
    except (TypeError, ValueError):
        return 0


@dataclass
class ReleaseQuery:
    """Parameters for a release search; consumed by do_release_search()."""
    q: Optional[str] = None
    limit: Optional[int] = None
    offset: Optional[int] = None
    fulltext_only: bool = False
    container_id: Optional[str] = None

    @classmethod
    def from_args(cls, args) -> 'ReleaseQuery':
        """
        Builds a ReleaseQuery from a request-args mapping (anything with
        .get(), e.g. flask's request.args).

        An empty/missing 'q' becomes '*'. 'container_id' and
        'container_issnl', when present, are appended to the query string as
        quoted field clauses.
        """
        query_str = args.get('q') or '*'

        container_id = args.get('container_id')
        # TODO: as filter, not in query string
        if container_id:
            query_str += ' container_id:"{}"'.format(container_id)

        # TODO: where are container_issnl queries actually used?
        issnl = args.get('container_issnl')
        if issnl:
            query_str += ' container_issnl:"{}"'.format(issnl)

        return cls(
            q=query_str,
            offset=_parse_offset(args),
            fulltext_only=bool(args.get('fulltext_only')),
            container_id=container_id,
        )


@dataclass
class GenericQuery:
    """Parameters for a plain query-string search (e.g. containers)."""
    q: Optional[str] = None
    limit: Optional[int] = None
    offset: Optional[int] = None

    @classmethod
    def from_args(cls, args) -> 'GenericQuery':
        """
        Builds a GenericQuery from a request-args mapping; an empty/missing
        'q' becomes '*'.
        """
        return cls(
            q=args.get('q') or '*',
            offset=_parse_offset(args),
        )


@dataclass
class SearchHits:
    """One page of search results plus paging/timing metadata."""
    count_returned: int  # number of results on this page
    count_found: int  # total matching documents reported by elasticsearch
    offset: int  # (clamped) offset actually used for this page
    limit: int  # (clamped) page size actually used
    deep_page_limit: int  # maximum offset that will be honored
    query_time_ms: int  # elasticsearch 'took' value
    results: List[Any]


def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]:
    """
    Takes a response and returns all the hits as JSON-like dicts (the
    underlying document dict of each hit, modified in place).

    Also handles surrogate strings that elasticsearch returns sometimes,
    probably due to mangled data processing in some pipeline.
    "Crimes against Unicode"; production workaround. Only top-level string
    values are cleaned.
    """
    results = [h._d_ for h in response]
    for doc in results:
        for key, value in doc.items():
            # reassigning existing keys does not change dict size, so this is
            # safe while iterating
            if isinstance(value, str):
                doc[key] = value.encode("utf8", "ignore").decode("utf8")
    return results


def wrap_es_execution(search: Search) -> Any:
    """
    Executes a Search object, and converts various ES error types into
    something we can pretty print to the user.

    Raises:
        ValueError: on an elasticsearch RequestError (HTTP 400, a "user"
            error such as a malformed query); the message is the first root
            cause reason when one is available.
        IOError: on any other elasticsearch TransportError.
    """
    try:
        resp = search.execute()
    except elasticsearch.exceptions.RequestError as e:
        # this is a "user" error
        print("elasticsearch 400: " + str(e.info), file=sys.stderr)
        # e.info is usually the decoded JSON error body, but may be a plain
        # string; likewise "error" may be a string rather than an object
        info = e.info if isinstance(e.info, dict) else {}
        error = info.get("error")
        root_cause = error.get("root_cause") if isinstance(error, dict) else None
        if root_cause and isinstance(root_cause[0], dict):
            raise ValueError(str(root_cause[0].get("reason"))) from e
        raise ValueError(str(e.info)) from e
    except elasticsearch.exceptions.TransportError as e:
        # all other errors
        print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr)
        raise IOError(str(e.info)) from e
    return resp


def _hits_total(total) -> int:
    """
    Normalizes an elasticsearch 'hits.total' value to an int.

    Elasticsearch 6.x returns a plain integer; 7.x and later return an object
    like {"value": N, "relation": "eq"}.
    """
    try:
        return int(total)
    except TypeError:
        return int(total["value"])


def _clamp_pagination(query, deep_page_limit: int):
    """
    Returns a sanity-checked (offset, limit) pair for a query object with
    .offset and .limit attributes: limit defaults to 25 and is capped at 100;
    offset is non-negative and capped at deep_page_limit.
    """
    limit = min(int(query.limit or 25), 100)
    offset = max(int(query.offset or 0), 0)
    # Avoid deep paging problem.
    offset = min(offset, deep_page_limit)
    return offset, limit


def _build_search_hits(resp, results: List[dict], offset: int, limit: int,
                       deep_page_limit: int) -> SearchHits:
    """Packages an executed response and its converted results as SearchHits."""
    return SearchHits(
        count_returned=len(results),
        count_found=_hits_total(resp.hits.total),
        offset=offset,
        limit=limit,
        deep_page_limit=deep_page_limit,
        query_time_ms=int(resp.took),
        results=results,
    )


def _normalize_contrib_names(hit: dict) -> None:
    """
    Ensures hit['contrib_names'] is a list of strings with surrogate
    characters stripped (same workaround as results_to_dict).

    A single value is wrapped in a list; a missing or None field becomes an
    empty list (defensive; the original code assumed the key was present).
    Non-string entries are dropped.
    """
    names = hit.get('contrib_names')
    if names is None:
        names = []
    elif not isinstance(names, list):
        names = [names]
    hit['contrib_names'] = [
        name.encode('utf8', 'ignore').decode('utf8')
        for name in names
        if isinstance(name, str)
    ]


def do_container_search(
    query: GenericQuery, deep_page_limit: int = 2000
) -> SearchHits:
    """
    Runs a query-string search against the 'biblio' field of the container
    index and returns one page of results.

    Raises ValueError / IOError as described in wrap_es_execution().
    """
    search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_CONTAINER_INDEX'])

    search = search.query(
        "query_string",
        query=query.q,
        default_operator="AND",
        analyze_wildcard=True,
        allow_leading_wildcard=False,
        lenient=True,
        fields=["biblio"],
    )

    offset, limit = _clamp_pagination(query, deep_page_limit)
    search = search[offset : (offset + limit)]

    resp = wrap_es_execution(search)
    results = results_to_dict(resp)
    return _build_search_hits(resp, results, offset, limit, deep_page_limit)


def do_release_search(
    query: ReleaseQuery, deep_page_limit: int = 2000
) -> SearchHits:
    """
    Runs a release search and returns one page of results.

    Scoring combines a query-string match on title (boosted) and 'biblio',
    a bonus for releases with in_ia=True, and a demotion for releases missing
    basic metadata. When query.fulltext_only is set, results are restricted
    to in_ia=True.

    Raises ValueError / IOError as described in wrap_es_execution().
    """
    search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])

    # availability filters
    if query.fulltext_only:
        search = search.filter("term", in_ia=True)

    # Below, we combine several queries to improve scoring.

    # this query uses the fancy built-in query string parser
    basic_biblio = Q(
        "query_string",
        query=query.q,
        default_operator="AND",
        analyze_wildcard=True,
        allow_leading_wildcard=False,
        lenient=True,
        fields=[
            "title^2",
            "biblio",
        ],
    )
    has_fulltext = Q("term", in_ia=True)
    poor_metadata = Q(
        "bool",
        should=[
            # if any of these fields isn't set, metadata is considered poor.
            # NOTE: with a "boosting" query, any document matching the
            # negative query has its score multiplied by negative_boost; the
            # demotion does not grow with the number of missing fields.
            Q("bool", must_not=Q("exists", field="title")),
            Q("bool", must_not=Q("exists", field="release_year")),
            Q("bool", must_not=Q("exists", field="release_type")),
            Q("bool", must_not=Q("exists", field="release_stage")),
        ],
    )

    search = search.query(
        "boosting",
        positive=Q("bool", must=basic_biblio, should=[has_fulltext],),
        negative=poor_metadata,
        negative_boost=0.5,
    )

    offset, limit = _clamp_pagination(query, deep_page_limit)
    search = search[offset : (offset + limit)]

    resp = wrap_es_execution(search)
    results = results_to_dict(resp)
    for hit in results:
        # Ensure 'contrib_names' is a list, not a single string
        _normalize_contrib_names(hit)

    return _build_search_hits(resp, results, offset, limit, deep_page_limit)


def get_elastic_entity_stats():
    """
    Fetches summary counts from the release and container indices.

    TODO: files, filesets, webcaptures (no schema yet)

    Returns dict:
        release: {total, refs_total}
        papers: {total, in_web, is_oa, in_kbart, in_web_not_kbart}
        container: {total}

    "papers" counts only releases with release_type "article-journal" or
    "paper-conference". Elasticsearch exceptions are not caught.
    """

    stats = {}

    # release totals
    search = Search(
        using=app.es_client,
        index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \
        .extra(request_cache=True)
    search.aggs.bucket(
        'release_ref_count',
        'sum',
        field='ref_count',
    )
    search = search[:0]  # pylint: disable=unsubscriptable-object

    # NOTE: not catching exceptions
    resp = search.execute()
    stats['release'] = {
        "total": _hits_total(resp.hits.total),
        "refs_total": int(resp.aggregations.release_ref_count.value),
    }

    # paper counts
    search = Search(
        using=app.es_client,
        index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \
        .query(
            'terms',
            release_type=[
                "article-journal",
                "paper-conference",
                # "chapter",
                # "thesis",
            ],
        ) \
        .extra(request_cache=True)
    search.aggs.bucket(
        'paper_like',
        'filters',
        filters={
            "in_web": {"term": {"in_web": "true"}},
            "is_oa": {"term": {"is_oa": "true"}},
            "in_kbart": {"term": {"in_kbart": "true"}},
            "in_web_not_kbart": {"bool": {"filter": [
                {"term": {"in_web": "true"}},
                {"term": {"in_kbart": "false"}},
            ]}},
        }
    )
    search = search[:0]  # pylint: disable=unsubscriptable-object

    # NOTE: not catching exceptions
    resp = search.execute()
    buckets = resp.aggregations.paper_like.buckets
    stats['papers'] = {
        'total': _hits_total(resp.hits.total),
        'in_web': buckets.in_web.doc_count,
        'is_oa': buckets.is_oa.doc_count,
        'in_kbart': buckets.in_kbart.doc_count,
        'in_web_not_kbart': buckets.in_web_not_kbart.doc_count,
    }

    # container counts (only the total hit count is needed, so no
    # aggregations are requested)
    search = Search(
        using=app.es_client,
        index=app.config['ELASTICSEARCH_CONTAINER_INDEX']) \
        .extra(request_cache=True)
    search = search[:0]  # pylint: disable=unsubscriptable-object

    # NOTE: not catching exceptions
    resp = search.execute()
    stats['container'] = {
        "total": _hits_total(resp.hits.total),
    }

    return stats


def get_elastic_container_stats(ident, issnl=None):
    """
    Fetches availability counts for the releases of one container, via a raw
    HTTP request to the elasticsearch backend.

    Returns dict:
        ident
        issnl (optional; passed through as given)
        total
        in_web
        in_kbart
        is_preserved

    Raises requests.HTTPError on a non-2xx response, and requests.Timeout if
    the backend does not respond within ES_HTTP_TIMEOUT_SEC.
    """

    query = {
        "size": 0,
        "query": {
            "term": {"container_id": ident}
        },
        "aggs": {"container_stats": {"filters": {"filters": {
            "in_web": {"term": {"in_web": "true"}},
            "in_kbart": {"term": {"in_kbart": "true"}},
            "is_preserved": {"term": {"is_preserved": "true"}},
        }}}}
    }
    resp = requests.get(
        "{}/{}/_search".format(
            app.config['ELASTICSEARCH_BACKEND'],
            app.config['ELASTICSEARCH_RELEASE_INDEX'],
        ),
        json=query,
        params=dict(request_cache="true"),
        timeout=ES_HTTP_TIMEOUT_SEC,
    )
    # TODO: abort()
    resp.raise_for_status()
    data = resp.json()
    buckets = data['aggregations']['container_stats']['buckets']
    return {
        'ident': ident,
        'issnl': issnl,
        'total': _hits_total(data['hits']['total']),
        'in_web': buckets['in_web']['doc_count'],
        'in_kbart': buckets['in_kbart']['doc_count'],
        'is_preserved': buckets['is_preserved']['doc_count'],
    }


def get_elastic_container_random_releases(ident, limit=5):
    """
    Returns a list of release dicts from the container.

    Despite the name, selection is not random: releases from the container
    with release_year up to the current year are sorted by in_web, then
    release_date (both descending), and the first `limit` are returned.

    Raises ValueError if limit is not in 1..100. Elasticsearch exceptions are
    not caught.
    """

    if not 0 < limit <= 100:
        raise ValueError("limit must be between 1 and 100, got {!r}".format(limit))

    search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \
        .query('bool',
            must=[
                Q('term', container_id=ident),
                Q('range', release_year={"lte": datetime.datetime.today().year}),
            ]
        ) \
        .sort('-in_web', '-release_date') \
        .extra(request_cache=True)

    search = search[:int(limit)]  # pylint: disable=unsubscriptable-object

    # NOTE: not catching exceptions
    resp = search.execute()

    # results_to_dict also applies the surrogate-string ("Crimes against
    # Unicode") workaround to each hit
    return results_to_dict(resp)


def get_elastic_container_histogram(ident):
    """
    Fetches a stacked histogram of release counts for one container, via a
    raw HTTP request to the elasticsearch backend.

    Filters to the past 500 years (at most), or about 1000 values.

    Returns a sorted list of tuples:
        (year, in_ia, count)

    Raises requests.HTTPError on a non-2xx response, and requests.Timeout if
    the backend does not respond within ES_HTTP_TIMEOUT_SEC.
    """

    # compute once so both range bounds agree even across a year boundary
    this_year = datetime.datetime.today().year
    query = {
        "aggs": {
            "year_in_ia": {
                "composite": {
                    "size": 1000,
                    "sources": [
                        {"year": {
                            "histogram": {
                                "field": "release_year",
                                "interval": 1,
                            }}},
                        {"in_ia": {
                            "terms": {
                                "field": "in_ia",
                            }}},
                    ],
                },
            },
        },
        "size": 0,
        "query": {
            "bool": {
                "must": [{
                    "range": {
                        "release_year": {
                            "gte": this_year - 499,
                            "lte": this_year,
                        }
                    }
                }],
                "filter": [{
                    "bool": {
                        "should": [{
                            "match": {
                                "container_id": ident
                            }
                        }],
                        "minimum_should_match": 1,
                    },
                }],
            }
        }
    }
    resp = requests.get(
        "{}/{}/_search".format(
            app.config['ELASTICSEARCH_BACKEND'],
            app.config['ELASTICSEARCH_RELEASE_INDEX'],
        ),
        json=query,
        params=dict(request_cache="true"),
        timeout=ES_HTTP_TIMEOUT_SEC,
    )
    resp.raise_for_status()
    # TODO: abort()
    data = resp.json()
    vals = [(h['key']['year'], h['key']['in_ia'], h['doc_count'])
            for h in data['aggregations']['year_in_ia']['buckets']]
    return sorted(vals)