""" Helpers for doing elasticsearch queries (used in the web interface; not part of the formal API) TODO: ELASTICSEARCH_*_INDEX should probably be factored out and just hard-coded """ import datetime import requests from flask import abort, flash from fatcat_web import app import elasticsearch from elasticsearch_dsl import Search, Q def generic_search_execute(search, limit=30, offset=0, deep_page_limit=2000): # Sanity checks if limit > 100: limit = 100 if offset < 0: offset = 0 if offset > deep_page_limit: # Avoid deep paging problem. offset = deep_page_limit search = search[int(offset):int(offset)+int(limit)] try: resp = search.execute() except elasticsearch.exceptions.RequestError as e: # this is a "user" error print("elasticsearch 400: " + str(e.info)) flash("Search query failed to parse; you might need to use quotes.

{}: {}".format(e.error, e.info['error']['root_cause'][0]['reason'])) abort(e.status_code) except elasticsearch.exceptions.TransportError as e: # all other errors print("elasticsearch non-200 status code: {}".format(e.info)) flash("Elasticsearch error: {}".format(e.error)) abort(e.status_code) # just the dict() hits = [h._d_ for h in resp] for h in hits: # Handle surrogate strings that elasticsearch returns sometimes, # probably due to mangled data processing in some pipeline. # "Crimes against Unicode"; production workaround for key in h: if type(h[key]) is str: h[key] = h[key].encode('utf8', 'ignore').decode('utf8') return {"count_returned": len(hits), "count_found": int(resp.hits.total), "results": hits, "offset": offset, "limit": limit, "deep_page_limit": deep_page_limit} def do_release_search(q, limit=30, fulltext_only=True, offset=0): # Convert raw DOIs to DOI queries if len(q.split()) == 1 and q.startswith("10.") and q.count("/") >= 1: q = 'doi:"{}"'.format(q) if fulltext_only: q += " in_web:true" search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ .query( 'query_string', query=q, default_operator="AND", analyze_wildcard=True, lenient=True, fields=["biblio"], ) resp = generic_search_execute(search, offset=offset) for h in resp['results']: print(h) # Ensure 'contrib_names' is a list, not a single string if type(h['contrib_names']) is not list: h['contrib_names'] = [h['contrib_names'], ] h['contrib_names'] = [name.encode('utf8', 'ignore').decode('utf8') for name in h['contrib_names']] resp["query"] = { "q": q } return resp def do_container_search(q, limit=30, offset=0): # Convert raw ISSN-L to ISSN-L query if len(q.split()) == 1 and len(q) == 9 and q[0:4].isdigit() and q[4] == '-': q = 'issnl:"{}"'.format(q) search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ .query( 'query_string', query=q, default_operator="AND", analyze_wildcard=True, lenient=True, fields=["biblio"], ) resp = generic_search_execute(search, offset=offset) resp["query"] = { "q": q } return resp def get_elastic_entity_stats(): """ TODO: files, filesets, webcaptures (no schema yet) Returns dict: changelog: {latest: {index, datetime}} release: {total, refs_total} papers: {total, in_web, in_oa, in_kbart, in_web_not_kbart} """ stats = {} # release totals search = Search( using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ .extra(request_cache=True) search.aggs.bucket( 'release_ref_count', 'sum', field='ref_count', ) search = search[:0] # pylint: disable=unsubscriptable-object # NOTE: not catching exceptions resp = search.execute() stats['release'] = { "total": int(resp.hits.total), "refs_total": int(resp.aggregations.release_ref_count.value), } # paper counts search = Search( using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ .query( 'terms', release_type=[ "article-journal", "paper-conference", # "chapter", # "thesis", ], ) \ .extra(request_cache=True) search.aggs.bucket( 'paper_like', 'filters', filters={ "in_web": { "term": { "in_web": "true" } }, "is_oa": { "term": { "is_oa": "true" } }, "in_kbart": { "term": { "in_kbart": "true" } }, "in_web_not_kbart": { "bool": { "filter": [ { "term": { "in_web": "true" } }, { "term": { "in_kbart": "false" } }, ]}}, } ) search = search[:0] # NOTE: not catching exceptions resp = search.execute() buckets = resp.aggregations.paper_like.buckets stats['papers'] = { 'total': resp.hits.total, 'in_web': buckets.in_web.doc_count, 'is_oa': buckets.is_oa.doc_count, 'in_kbart': buckets.in_kbart.doc_count, 'in_web_not_kbart': buckets.in_web_not_kbart.doc_count, } # container counts search = Search( using=app.es_client, index=app.config['ELASTICSEARCH_CONTAINER_INDEX']) \ .extra(request_cache=True) search.aggs.bucket( 'release_ref_count', 'sum', field='ref_count', ) search = search[:0] # pylint: disable=unsubscriptable-object # NOTE: not catching exceptions resp = search.execute() stats['container'] = { "total": resp.hits.total, } return stats def get_elastic_container_stats(ident, issnl=None): """ Returns dict: ident issnl (optional) total in_web in_kbart preserved """ query = { "size": 0, "query": { "term": { "container_id": ident } }, "aggs": { "container_stats": { "filters": { "filters": { "in_web": { "term": { "in_web": "true" } }, "in_kbart": { "term": { "in_kbart": "true" } }, "is_preserved": { "term": { "is_preserved": "true" } }, }}}} } resp = requests.get( "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), json=query, params=dict(request_cache="true")) # TODO: abort() #print(resp.json()) resp.raise_for_status() resp = resp.json() buckets = resp['aggregations']['container_stats']['buckets'] stats = { 'ident': ident, 'issnl': issnl, 'total': resp['hits']['total'], 'in_web': buckets['in_web']['doc_count'], 'in_kbart': buckets['in_kbart']['doc_count'], 'is_preserved': buckets['is_preserved']['doc_count'], } return stats def get_elastic_container_random_releases(ident, limit=5): """ Returns a list of releases from the container. """ assert limit > 0 and limit <= 100 search = Search(using=app.es_client, index=app.conf.ELASTICSEARCH_RELEASE_INDEX) \ .query('bool', must=[ Q('term', container_id=ident), Q('range', release_year={ "lte": datetime.datetime.today().year }), ] ) \ .sort('-in_web', '-release_date') \ .extra(request_cache=True) search = search[:int(limit)] resp = search.execute() hits = [dict(h.source) for h in resp] for h in hits: # Handle surrogate strings that elasticsearch returns sometimes, # probably due to mangled data processing in some pipeline. # "Crimes against Unicode"; production workaround for key in h: if type(h[key]) is str: h[key] = h[key].encode('utf8', 'ignore').decode('utf8') return hits def get_elastic_container_histogram(ident): """ Fetches a stacked histogram Filters to the past 500 years (at most), or about 1000 values. Returns a list of tuples: (year, in_ia, count) """ query = { "aggs": { "year_in_ia": { "composite": { "size": 1000, "sources": [ {"year": { "histogram": { "field": "release_year", "interval": 1, }}}, {"in_ia": { "terms": { "field": "in_ia", }}}, ], }, }, }, "size": 0, "query": { "bool": { "must": [{ "range": { "release_year": { "gte": datetime.datetime.today().year - 499, "lte": datetime.datetime.today().year, } } }], "filter": [{ "bool": { "should": [{ "match": { "container_id": ident } }], "minimum_should_match": 1, }, }], } } } resp = requests.get( "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), json=query, params=dict(request_cache="true")) resp.raise_for_status() # TODO: abort() resp = resp.json() #print(resp) vals = [(h['key']['year'], h['key']['in_ia'], h['doc_count']) for h in resp['aggregations']['year_in_ia']['buckets']] vals = sorted(vals) return vals