diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2020-03-25 13:02:53 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2020-07-24 10:07:48 -0700 | 
| commit | 693a6f71b1afef686b6783ba3afb1a67bb14b62b (patch) | |
| tree | 9be06e4d5c32d7a1ff415788185b51960cf74f2e /python | |
| parent | 8b00843af1366cf019c896057706ace4afff27ba (diff) | |
| download | fatcat-693a6f71b1afef686b6783ba3afb1a67bb14b62b.tar.gz fatcat-693a6f71b1afef686b6783ba3afb1a67bb14b62b.zip | |
WIP: refactoring search to use elasticsearch-dsl
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat_web/__init__.py | 5 | ||||
| -rw-r--r-- | python/fatcat_web/search.py | 285 | 
2 files changed, 137 insertions, 153 deletions
| diff --git a/python/fatcat_web/__init__.py b/python/fatcat_web/__init__.py index 562ffeb2..487de58a 100644 --- a/python/fatcat_web/__init__.py +++ b/python/fatcat_web/__init__.py @@ -11,6 +11,7 @@ from authlib.flask.client import OAuth  from loginpass import create_flask_blueprint, Gitlab, GitHub, ORCiD  from raven.contrib.flask import Sentry  import fatcat_openapi_client +import elasticsearch  from fatcat_web.web_config import Config @@ -71,7 +72,9 @@ mwoauth = MWOAuth(  mwoauth.handshaker.user_agent = "fatcat.wiki;python_web_interface"  app.register_blueprint(mwoauth.bp, url_prefix='/auth/wikipedia') -from fatcat_web import routes, editing_routes, auth, cors, forms  # noqa: E402 +app.es_client = elasticsearch.Elasticsearch(Config.ELASTICSEARCH_BACKEND) + +from fatcat_web import routes, editing_routes, auth, cors, forms  # TODO: blocking on ORCID support in loginpass  if Config.ORCID_CLIENT_ID: diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py index 4a87c735..87d07e55 100644 --- a/python/fatcat_web/search.py +++ b/python/fatcat_web/search.py @@ -11,7 +11,10 @@ import requests  from flask import abort, flash  from fatcat_web import app -def do_search(index, request, limit=30, offset=0, deep_page_limit=2000): +import elasticsearch +from elasticsearch_dsl import Search, Q + +def generic_search_execute(search, limit=30, offset=0, deep_page_limit=2000):      # Sanity checks      if limit > 100: @@ -22,25 +25,24 @@ def do_search(index, request, limit=30, offset=0, deep_page_limit=2000):          # Avoid deep paging problem.          offset = deep_page_limit -    request["size"] = int(limit) -    request["from"] = int(offset) -    # print(request) -    resp = requests.get("%s/%s/_search" % -            (app.config['ELASTICSEARCH_BACKEND'], index), -        json=request) - -    if resp.status_code == 400: -        print("elasticsearch 400: " + str(resp.content)) -        flash("Search query failed to parse; you might need to use quotes.<p><code>{}</code>".format(resp.content)) -        abort(resp.status_code) -    elif resp.status_code != 200: -        print("elasticsearch non-200 status code: " + str(resp.status_code)) -        print(resp.content) -        abort(resp.status_code) - -    content = resp.json() -    results = [h['_source'] for h in content['hits']['hits']] -    for h in results: +    search = search[int(offset):int(offset)+int(limit)] + +    try: +        resp = search.execute() +    except elasticsearch.exceptions.RequestError as e: +        # this is a "user" error +        print("elasticsearch 400: " + str(e.info)) +        flash("Search query failed to parse; you might need to use quotes.<p><code>{}: {}</code>".format(e.error, e.info['error']['root_cause'][0]['reason'])) +        abort(e.status_code) +    except elasticsearch.exceptions.TransportError as e: +        # all other errors +        print("elasticsearch non-200 status code: {}".format(e.info)) +        flash("Elasticsearch error: {}".format(e.error)) +        abort(e.status_code) + +    # just the dict() +    hits = [h._d_ for h in resp] +    for h in hits:          # Handle surrogate strings that elasticsearch returns sometimes,          # probably due to mangled data processing in some pipeline.          # "Crimes against Unicode"; production workaround @@ -48,20 +50,16 @@ def do_search(index, request, limit=30, offset=0, deep_page_limit=2000):              if type(h[key]) is str:                  h[key] = h[key].encode('utf8', 'ignore').decode('utf8') -    return {"count_returned": len(results), -            "count_found": content['hits']['total'], -            "results": results, +    return {"count_returned": len(hits), +            "count_found": int(resp.hits.total), +            "results": hits,              "offset": offset, +            "limit": limit,              "deep_page_limit": deep_page_limit}  def do_release_search(q, limit=30, fulltext_only=True, offset=0): -    #print("Search hit: " + q) -    if limit > 100: -        # Sanity check -        limit = 100 -      # Convert raw DOIs to DOI queries      if len(q.split()) == 1 and q.startswith("10.") and q.count("/") >= 1:          q = 'doi:"{}"'.format(q) @@ -69,26 +67,25 @@ def do_release_search(q, limit=30, fulltext_only=True, offset=0):      if fulltext_only:          q += " in_web:true" -    search_request = { -        "query": { -            "query_string": { -                "query": q, -                "default_operator": "AND", -                "analyze_wildcard": True, -                "lenient": True, -                "fields": ["biblio"], -            }, -        }, -    } +    search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ +        .query( +            'query_string', +            query=q, +            default_operator="AND", +            analyze_wildcard=True, +            lenient=True, +            fields=["biblio"], +        ) + +    resp = generic_search_execute(search, offset=offset) -    resp = do_search(app.config['ELASTICSEARCH_RELEASE_INDEX'], search_request, offset=offset)      for h in resp['results']: +        print(h)          # Ensure 'contrib_names' is a list, not a single string          if type(h['contrib_names']) is not list:              h['contrib_names'] = [h['contrib_names'], ]          h['contrib_names'] = [name.encode('utf8', 'ignore').decode('utf8') for name in h['contrib_names']]      resp["query"] = { "q": q } -    resp["limit"] = limit      return resp @@ -98,21 +95,18 @@ def do_container_search(q, limit=30, offset=0):      if len(q.split()) == 1 and len(q) == 9 and q[0:4].isdigit() and q[4] == '-':          q = 'issnl:"{}"'.format(q) -    search_request = { -        "query": { -            "query_string": { -                "query": q, -                "default_operator": "AND", -                "analyze_wildcard": True, -                "lenient": True, -                "fields": ["biblio"], -            }, -        }, -    } - -    resp = do_search(app.config['ELASTICSEARCH_CONTAINER_INDEX'], search_request, limit=limit, offset=offset) +    search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ +        .query( +            'query_string', +            query=q, +            default_operator="AND", +            analyze_wildcard=True, +            lenient=True, +            fields=["biblio"], +        ) + +    resp = generic_search_execute(search, offset=offset)      resp["query"] = { "q": q } -    resp["limit"] = limit      return resp  def get_elastic_entity_stats(): @@ -127,85 +121,81 @@ def get_elastic_entity_stats():      stats = {} -    # 2. releases -    #  - total count -    #  - total citation records -    #  - total (paper, chapter, proceeding) -    #  - " with fulltext on web -    #  - " open access -    #  - " not in KBART, in IA -    # -    # Can do the above with two queries: -    #  - all releases, aggregate count and sum(ref_count) -    #  - in-scope works, aggregate count by (fulltext, OA, kbart/ia) - -    # 2a. release totals -    query = { -        "size": 0, -        "aggs": { -            "release_ref_count": { "sum": { "field": "ref_count" } } -        } -    } -    resp = requests.get( -        "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), -        json=query, -        params=dict(request_cache="true")) -    # TODO: abort() -    resp.raise_for_status() -    resp = resp.json() +    # release totals +    search = Search( +        using=app.es_client, +        index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ +        .extra(request_cache=True) +    search.aggs.bucket( +        'release_ref_count', +        'sum', +        field='ref_count', +    ) +    search = search[:0] + +    # NOTE: not catching exceptions +    resp = search.execute()      stats['release'] = { -        "total": resp['hits']['total'], -        "refs_total": int(resp['aggregations']['release_ref_count']['value']), +        "total": int(resp.hits.total), +        "refs_total": int(resp.aggregations.release_ref_count.value),      } -    # 2b. paper counts -    query = { -        "size": 0, -        "query": { -            "terms": { "release_type": [ -                # "chapter", "thesis", -                "article-journal", "paper-conference", -            ] } }, -        "aggs": { "paper_like": { "filters": { "filters": { -                "in_web": { "term": { "in_web": "true" } }, -                "is_oa": { "term": { "is_oa": "true" } }, -                "in_kbart": { "term": { "in_kbart": "true" } }, -                "in_web_not_kbart": { "bool": { "filter": [ -                        { "term": { "in_web": "true" } }, -                        { "term": { "in_kbart": "false" } } -                ]}} -        }}}} -    } -    resp = requests.get( -        "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), -        json=query, -        params=dict(request_cache="true")) -    # TODO: abort() -    resp.raise_for_status() -    resp = resp.json() -    buckets = resp['aggregations']['paper_like']['buckets'] +    # paper counts +    search = Search( +        using=app.es_client, +        index=app.config['ELASTICSEARCH_RELEASE_INDEX']) \ +        .query( +            'terms', +            release_type=[ +                "article-journal", +                "paper-conference", +                # "chapter", +                # "thesis", +            ], +        ) \ +        .extra(request_cache=True) +    search.aggs.bucket( +        'paper_like', +        'filters', +        filters={ +            "in_web": { "term": { "in_web": "true" } }, +            "is_oa": { "term": { "is_oa": "true" } }, +            "in_kbart": { "term": { "in_kbart": "true" } }, +            "in_web_not_kbart": { "bool": { "filter": [ +                { "term": { "in_web": "true" } }, +                { "term": { "in_kbart": "false" } }, +            ]}}, +        } +    ) +    search = search[:0] + +    # NOTE: not catching exceptions +    resp = search.execute() +    buckets = resp.aggregations.paper_like.buckets      stats['papers'] = { -        'total': resp['hits']['total'], -        'in_web': buckets['in_web']['doc_count'], -        'is_oa': buckets['is_oa']['doc_count'], -        'in_kbart': buckets['in_kbart']['doc_count'], -        'in_web_not_kbart': buckets['in_web_not_kbart']['doc_count'], +        'total': resp.hits.total, +        'in_web': buckets.in_web.doc_count, +        'is_oa': buckets.is_oa.doc_count, +        'in_kbart': buckets.in_kbart.doc_count, +        'in_web_not_kbart': buckets.in_web_not_kbart.doc_count,      } -    # 3. containers -    #   => total count -    query = { -        "size": 0, -    } -    resp = requests.get( -        "{}/fatcat_container/_search".format(app.config['ELASTICSEARCH_BACKEND']), -        json=query, -        params=dict(request_cache="true")) -    # TODO: abort() -    resp.raise_for_status() -    resp = resp.json() +    # container counts +    search = Search( +        using=app.es_client, +        index=app.config['ELASTICSEARCH_CONTAINER_INDEX']) \ +        .extra(request_cache=True) +    search.aggs.bucket( +        'release_ref_count', +        'sum', +        field='ref_count', +    ) +    search = search[:0] + +    # NOTE: not catching exceptions +    resp = search.execute()      stats['container'] = { -        "total": resp['hits']['total'], +        "total": resp.hits.total,      }      return stats @@ -259,31 +249,22 @@ def get_elastic_container_random_releases(ident, limit=5):      assert limit > 0 and limit <= 100 -    query = { -        "size": int(limit), -        "sort": [ -            { "in_web": {"order": "desc"} }, -            { "release_date": {"order": "desc"} }, -        ], -        "query": { -            "bool": { -                "must": [ -                    { "term": { "container_id": ident } }, -                    { "range": { "release_year": { "lte": datetime.datetime.today().year } } }, -                ], -            }, -        }, -    } -    resp = requests.get( -        "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']), -        json=query, -        params=dict(request_cache="true")) -    # TODO: abort() -    #print(resp.json()) -    resp.raise_for_status() -    resp = resp.json() -    #print(resp) -    hits = [h['_source'] for h in resp['hits']['hits']] +    search = Search(using=app.es_client, index=app.conf.ELASTICSEARCH_RELEASE_INDEX) \ +        .query('bool', +            must=[ +                Q('term', container_id=ident), +                Q('range', release_year={ "lte": datetime.datetime.today().year }), +            ] +        ) \ +        .sort('-in_web', '-release_date') \ +        .extra(request_cache=True) + +    search = search[:int(limit)] + +    resp = search.execute() + +    hits = [dict(h.source) for h in resp] +      for h in hits:          # Handle surrogate strings that elasticsearch returns sometimes,          # probably due to mangled data processing in some pipeline. | 
