From 693a6f71b1afef686b6783ba3afb1a67bb14b62b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 25 Mar 2020 13:02:53 -0700 Subject: WIP: refactoring search to use elasticsearch-dsl --- python/fatcat_web/__init__.py | 5 +- python/fatcat_web/search.py | 285 ++++++++++++++++++++---------------------- 2 files changed, 137 insertions(+), 153 deletions(-) diff --git a/python/fatcat_web/__init__.py b/python/fatcat_web/__init__.py index 562ffeb2..487de58a 100644 --- a/python/fatcat_web/__init__.py +++ b/python/fatcat_web/__init__.py @@ -11,6 +11,7 @@ from authlib.flask.client import OAuth from loginpass import create_flask_blueprint, Gitlab, GitHub, ORCiD from raven.contrib.flask import Sentry import fatcat_openapi_client +import elasticsearch from fatcat_web.web_config import Config @@ -71,7 +72,9 @@ mwoauth = MWOAuth( mwoauth.handshaker.user_agent = "fatcat.wiki;python_web_interface" app.register_blueprint(mwoauth.bp, url_prefix='/auth/wikipedia') -from fatcat_web import routes, editing_routes, auth, cors, forms # noqa: E402 +app.es_client = elasticsearch.Elasticsearch(Config.ELASTICSEARCH_BACKEND) + +from fatcat_web import routes, editing_routes, auth, cors, forms # TODO: blocking on ORCID support in loginpass if Config.ORCID_CLIENT_ID: diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py index 4a87c735..87d07e55 100644 --- a/python/fatcat_web/search.py +++ b/python/fatcat_web/search.py @@ -11,7 +11,10 @@ import requests from flask import abort, flash from fatcat_web import app -def do_search(index, request, limit=30, offset=0, deep_page_limit=2000): +import elasticsearch +from elasticsearch_dsl import Search, Q + +def generic_search_execute(search, limit=30, offset=0, deep_page_limit=2000): # Sanity checks if limit > 100: @@ -22,25 +25,24 @@ def do_search(index, request, limit=30, offset=0, deep_page_limit=2000): # Avoid deep paging problem. offset = deep_page_limit - request["size"] = int(limit) - request["from"] = int(offset) - # print(request) - resp = requests.get("%s/%s/_search" % - (app.config['ELASTICSEARCH_BACKEND'], index), - json=request) - - if resp.status_code == 400: - print("elasticsearch 400: " + str(resp.content)) - flash("Search query failed to parse; you might need to use quotes.

{}".format(resp.content)) - abort(resp.status_code) - elif resp.status_code != 200: - print("elasticsearch non-200 status code: " + str(resp.status_code)) - print(resp.content) - abort(resp.status_code) - - content = resp.json() - results = [h['_source'] for h in content['hits']['hits']] - for h in results: + search = search[int(offset):int(offset)+int(limit)] + + try: + resp = search.execute() + except elasticsearch.exceptions.RequestError as e: + # this is a "user" error + print("elasticsearch 400: " + str(e.info)) + flash("Search query failed to parse; you might need to use quotes.

{}: {}".format(e.error, e.info['error']['root_cause'][0]['reason'])) + abort(e.status_code) + except elasticsearch.exceptions.TransportError as e: + # all other errors + print("elasticsearch non-200 status code: {}".format(e.info)) + flash("Elasticsearch error: {}".format(e.error)) + abort(e.status_code) + + # just the dict() + hits = [h._d_ for h in resp] + for h in hits: # Handle surrogate strings that elasticsearch returns sometimes, # probably due to mangled data processing in some pipeline. # "Crimes against Unicode"; production workaround @@ -48,20 +50,16 @@ def do_search(index, request, limit=30, offset=0, deep_page_limit=2000): if type(h[key]) is str: h[key] = h[key].encode('utf8', 'ignore').decode('utf8') - return {"count_returned": len(results), - "count_found": content['hits']['total'], - "results": results, + return {"count_returned": len(hits), + "count_found": int(resp.hits.total), + "results": hits, "offset": offset, + "limit": limit, "deep_page_limit": deep_page_limit} def do_release_search(q, limit=30, fulltext_only=True, offset=0): - #print("Search hit: " + q) - if limit > 100: - # Sanity check - limit = 100 - # Convert raw DOIs to DOI queries if len(q.split()) == 1 and q.startswith("10.") and q.count("/") >= 1: q = 'doi:"{}"'.format(q) @@ -69,26 +67,25 @@ def do_release_search(q, limit=30, fulltext_only=True, offset=0): if fulltext_only: q += " in_web:true" - search_request = { - "query": { - "query_string": { - "query": q, - "default_operator": "AND", - "analyze_wildcard": True, - "lenient": True, - "fields": ["biblio"], - }, - }, - } + search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ + .query( + 'query_string', + query=q, + default_operator="AND", + analyze_wildcard=True, + lenient=True, + fields=["biblio"], + ) + + resp = generic_search_execute(search, offset=offset) - resp = do_search(app.config['ELASTICSEARCH_RELEASE_INDEX'], search_request, offset=offset) for h in resp['results']: + print(h) # Ensure 'contrib_names' is a list, not a single string if type(h['contrib_names']) is not list: h['contrib_names'] = [h['contrib_names'], ] h['contrib_names'] = [name.encode('utf8', 'ignore').decode('utf8') for name in h['contrib_names']] resp["query"] = { "q": q } - resp["limit"] = limit return resp @@ -98,21 +95,18 @@ def do_container_search(q, limit=30, offset=0): if len(q.split()) == 1 and len(q) == 9 and q[0:4].isdigit() and q[4] == '-': q = 'issnl:"{}"'.format(q) - search_request = { - "query": { - "query_string": { - "query": q, - "default_operator": "AND", - "analyze_wildcard": True, - "lenient": True, - "fields": ["biblio"], - }, - }, - } - - resp = do_search(app.config['ELASTICSEARCH_CONTAINER_INDEX'], search_request, limit=limit, offset=offset) + search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ + .query( + 'query_string', + query=q, + default_operator="AND", + analyze_wildcard=True, + lenient=True, + fields=["biblio"], + ) + + resp = generic_search_execute(search, offset=offset) resp["query"] = { "q": q } - resp["limit"] = limit return resp def get_elastic_entity_stats(): @@ -127,85 +121,81 @@ def get_elastic_entity_stats(): stats = {} - # 2. releases - # - total count - # - total citation records - # - total (paper, chapter, proceeding) - # - " with fulltext on web - # - " open access - # - " not in KBART, in IA - # - # Can do the above with two queries: - # - all releases, aggregate count and sum(ref_count) - # - in-scope works, aggregate count by (fulltext, OA, kbart/ia) - - # 2a. release totals - query = { - "size": 0, - "aggs": { - "release_ref_count": { "sum": { "field": "ref_count" } } - } - } - resp = requests.get( - "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), - json=query, - params=dict(request_cache="true")) - # TODO: abort() - resp.raise_for_status() - resp = resp.json() + # release totals + search = Search( + using=app.es_client, + index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ + .extra(request_cache=True) + search.aggs.bucket( + 'release_ref_count', + 'sum', + field='ref_count', + ) + search = search[:0] + + # NOTE: not catching exceptions + resp = search.execute() stats['release'] = { - "total": resp['hits']['total'], - "refs_total": int(resp['aggregations']['release_ref_count']['value']), + "total": int(resp.hits.total), + "refs_total": int(resp.aggregations.release_ref_count.value), } - # 2b. paper counts - query = { - "size": 0, - "query": { - "terms": { "release_type": [ - # "chapter", "thesis", - "article-journal", "paper-conference", - ] } }, - "aggs": { "paper_like": { "filters": { "filters": { - "in_web": { "term": { "in_web": "true" } }, - "is_oa": { "term": { "is_oa": "true" } }, - "in_kbart": { "term": { "in_kbart": "true" } }, - "in_web_not_kbart": { "bool": { "filter": [ - { "term": { "in_web": "true" } }, - { "term": { "in_kbart": "false" } } - ]}} - }}}} - } - resp = requests.get( - "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), - json=query, - params=dict(request_cache="true")) - # TODO: abort() - resp.raise_for_status() - resp = resp.json() - buckets = resp['aggregations']['paper_like']['buckets'] + # paper counts + search = Search( + using=app.es_client, + index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ + .query( + 'terms', + release_type=[ + "article-journal", + "paper-conference", + # "chapter", + # "thesis", + ], + ) \ + .extra(request_cache=True) + search.aggs.bucket( + 'paper_like', + 'filters', + filters={ + "in_web": { "term": { "in_web": "true" } }, + "is_oa": { "term": { "is_oa": "true" } }, + "in_kbart": { "term": { "in_kbart": "true" } }, + "in_web_not_kbart": { "bool": { "filter": [ + { "term": { "in_web": "true" } }, + { "term": { "in_kbart": "false" } }, + ]}}, + } + ) + search = search[:0] + + # NOTE: not catching exceptions + resp = search.execute() + buckets = resp.aggregations.paper_like.buckets stats['papers'] = { - 'total': resp['hits']['total'], - 'in_web': buckets['in_web']['doc_count'], - 'is_oa': buckets['is_oa']['doc_count'], - 'in_kbart': buckets['in_kbart']['doc_count'], - 'in_web_not_kbart': buckets['in_web_not_kbart']['doc_count'], + 'total': resp.hits.total, + 'in_web': buckets.in_web.doc_count, + 'is_oa': buckets.is_oa.doc_count, + 'in_kbart': buckets.in_kbart.doc_count, + 'in_web_not_kbart': buckets.in_web_not_kbart.doc_count, } - # 3. containers - # => total count - query = { - "size": 0, - } - resp = requests.get( - "{}/fatcat_container/_search".format(app.config['ELASTICSEARCH_BACKEND']), - json=query, - params=dict(request_cache="true")) - # TODO: abort() - resp.raise_for_status() - resp = resp.json() + # container counts + search = Search( + using=app.es_client, + index=app.config['ELASTICSEARCH_CONTAINER_INDEX']) \ + .extra(request_cache=True) + search.aggs.bucket( + 'release_ref_count', + 'sum', + field='ref_count', + ) + search = search[:0] + + # NOTE: not catching exceptions + resp = search.execute() stats['container'] = { - "total": resp['hits']['total'], + "total": resp.hits.total, } return stats @@ -259,31 +249,22 @@ def get_elastic_container_random_releases(ident, limit=5): assert limit > 0 and limit <= 100 - query = { - "size": int(limit), - "sort": [ - { "in_web": {"order": "desc"} }, - { "release_date": {"order": "desc"} }, - ], - "query": { - "bool": { - "must": [ - { "term": { "container_id": ident } }, - { "range": { "release_year": { "lte": datetime.datetime.today().year } } }, - ], - }, - }, - } - resp = requests.get( - "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), - json=query, - params=dict(request_cache="true")) - # TODO: abort() - #print(resp.json()) - resp.raise_for_status() - resp = resp.json() - #print(resp) - hits = [h['_source'] for h in resp['hits']['hits']] + search = Search(using=app.es_client, index=app.conf.ELASTICSEARCH_RELEASE_INDEX) \ + .query('bool', + must=[ + Q('term', container_id=ident), + Q('range', release_year={ "lte": datetime.datetime.today().year }), + ] + ) \ + .sort('-in_web', '-release_date') \ + .extra(request_cache=True) + + search = search[:int(limit)] + + resp = search.execute() + + hits = [dict(h.source) for h in resp] + for h in hits: # Handle surrogate strings that elasticsearch returns sometimes, # probably due to mangled data processing in some pipeline. -- cgit v1.2.3