Diffstat (limited to 'python/fatcat_web/search.py')
 -rw-r--r--  python/fatcat_web/search.py  619
 1 file changed, 349 insertions, 270 deletions
diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py
index 4a87c735..55caa9c5 100644
--- a/python/fatcat_web/search.py
+++ b/python/fatcat_web/search.py
@@ -2,118 +2,257 @@
"""
Helpers for doing elasticsearch queries (used in the web interface; not part of
the formal API)
-
-TODO: ELASTICSEARCH_*_INDEX should probably be factored out and just hard-coded
"""
+import sys
import datetime
-import requests
-from flask import abort, flash
+from dataclasses import dataclass
+from typing import List, Optional, Any
+
+import elasticsearch
+from elasticsearch_dsl import Search, Q
+import elasticsearch_dsl.response
+
from fatcat_web import app
-def do_search(index, request, limit=30, offset=0, deep_page_limit=2000):
+@dataclass
+class ReleaseQuery:
+ q: Optional[str] = None
+ limit: Optional[int] = None
+ offset: Optional[int] = None
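+    # when true, restrict results to releases with fulltext captured in the
+    # web archive (the in_ia flag)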
+ fulltext_only: bool = False
+ container_id: Optional[str] = None
+
+ @classmethod
+ def from_args(cls, args) -> 'ReleaseQuery':
+
+ query_str = args.get('q') or '*'
+
+ container_id = args.get('container_id')
+ # TODO: as filter, not in query string
+ if container_id:
+ query_str += ' container_id:"{}"'.format(container_id)
+
+ # TODO: where are container_issnl queries actually used?
+ issnl = args.get('container_issnl')
+ if issnl and query_str:
+ query_str += ' container_issnl:"{}"'.format(issnl)
+
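+        # offset arrives as a string in the URL query args; non-numeric
+        # values fall back to 0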
+ offset = args.get('offset', '0')
+ offset = max(0, int(offset)) if offset.isnumeric() else 0
+
+ return ReleaseQuery(
+ q=query_str,
+ offset=offset,
+ fulltext_only=bool(args.get('fulltext_only')),
+ container_id=container_id,
+ )
+
+@dataclass
+class GenericQuery:
+ q: Optional[str] = None
+ limit: Optional[int] = None
+ offset: Optional[int] = None
+
+ @classmethod
+ def from_args(cls, args) -> 'GenericQuery':
+ query_str = args.get('q')
+ if not query_str:
+ query_str = '*'
+ offset = args.get('offset', '0')
+ offset = max(0, int(offset)) if offset.isnumeric() else 0
+
+ return GenericQuery(
+ q=query_str,
+ offset=offset,
+ )
+
+@dataclass
+class SearchHits:
+ count_returned: int
+ count_found: int
+ offset: int
+ limit: int
+ deep_page_limit: int
+ query_time_ms: int
+ results: List[Any]
+
+
+def results_to_dict(response: elasticsearch_dsl.response.Response) -> List[dict]:
+ """
+    Takes a response and returns all the hits as JSON objects.
- # Sanity checks
- if limit > 100:
- limit = 100
- if offset < 0:
- offset = 0
- if offset > deep_page_limit:
- # Avoid deep paging problem.
- offset = deep_page_limit
+    Also handles surrogate strings that elasticsearch sometimes returns,
+    probably due to mangled data processing in some pipeline ("crimes
+    against Unicode"); this is a production workaround.
+ """
+
+ results = []
+ for h in response:
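+        # ._d_ is the raw dict behind an elasticsearch_dsl hit (an AttrDict)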
+ r = h._d_
+ # print(h.meta._d_)
+ results.append(r)
- request["size"] = int(limit)
- request["from"] = int(offset)
- # print(request)
- resp = requests.get("%s/%s/_search" %
- (app.config['ELASTICSEARCH_BACKEND'], index),
- json=request)
-
- if resp.status_code == 400:
- print("elasticsearch 400: " + str(resp.content))
- flash("Search query failed to parse; you might need to use quotes.<p><code>{}</code>".format(resp.content))
- abort(resp.status_code)
- elif resp.status_code != 200:
- print("elasticsearch non-200 status code: " + str(resp.status_code))
- print(resp.content)
- abort(resp.status_code)
-
- content = resp.json()
- results = [h['_source'] for h in content['hits']['hits']]
for h in results:
- # Handle surrogate strings that elasticsearch returns sometimes,
- # probably due to mangled data processing in some pipeline.
- # "Crimes against Unicode"; production workaround
for key in h:
if type(h[key]) is str:
- h[key] = h[key].encode('utf8', 'ignore').decode('utf8')
+ h[key] = h[key].encode("utf8", "ignore").decode("utf8")
+ return results
- return {"count_returned": len(results),
- "count_found": content['hits']['total'],
- "results": results,
- "offset": offset,
- "deep_page_limit": deep_page_limit}
+def wrap_es_execution(search: Search) -> Any:
+ """
+    Executes a Search object, and converts various ES error types into
+    something we can pretty-print to the user.
+ """
+ try:
+ resp = search.execute()
+ except elasticsearch.exceptions.RequestError as e:
+ # this is a "user" error
+ print("elasticsearch 400: " + str(e.info), file=sys.stderr)
+ if e.info.get("error", {}).get("root_cause", {}):
+ raise ValueError(str(e.info["error"]["root_cause"][0].get("reason")))
+ else:
+ raise ValueError(str(e.info))
+ except elasticsearch.exceptions.TransportError as e:
+ # all other errors
+ print("elasticsearch non-200 status code: {}".format(e.info), file=sys.stderr)
+ raise IOError(str(e.info))
+ return resp
+def do_container_search(
+ query: GenericQuery, deep_page_limit: int = 2000
+) -> SearchHits:
-def do_release_search(q, limit=30, fulltext_only=True, offset=0):
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_CONTAINER_INDEX'])
- #print("Search hit: " + q)
- if limit > 100:
- # Sanity check
- limit = 100
+ search = search.query(
+ "query_string",
+ query=query.q,
+ default_operator="AND",
+ analyze_wildcard=True,
+ allow_leading_wildcard=False,
+ lenient=True,
+ fields=["biblio"],
+ )
- # Convert raw DOIs to DOI queries
- if len(q.split()) == 1 and q.startswith("10.") and q.count("/") >= 1:
- q = 'doi:"{}"'.format(q)
+ # Sanity checks
+ limit = min((int(query.limit or 25), 100))
+ offset = max((int(query.offset or 0), 0))
+ if offset > deep_page_limit:
+ # Avoid deep paging problem.
+ offset = deep_page_limit
- if fulltext_only:
- q += " in_web:true"
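+    # slicing the Search object sets the request's 'from' and 'size' parameters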
+ search = search[offset : (offset + limit)]
+
+ resp = wrap_es_execution(search)
+ results = results_to_dict(resp)
+
+ return SearchHits(
+ count_returned=len(results),
+ count_found=int(resp.hits.total),
+ offset=offset,
+ limit=limit,
+ deep_page_limit=deep_page_limit,
+ query_time_ms=int(resp.took),
+ results=results,
+ )
+
+def do_release_search(
+ query: ReleaseQuery, deep_page_limit: int = 2000
+) -> SearchHits:
+
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])
+
+ # availability filters
+ if query.fulltext_only:
+ search = search.filter("term", in_ia=True)
+
+ # Below, we combine several queries to improve scoring.
+
+    # this query uses the fancy built-in query string parser
+ basic_biblio = Q(
+ "query_string",
+ query=query.q,
+ default_operator="AND",
+ analyze_wildcard=True,
+ allow_leading_wildcard=False,
+ lenient=True,
+ fields=[
+ "title^2",
+ "biblio",
+ ],
+ )
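+    # releases with a fulltext copy in the web archive (in_ia) should rank higher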
+ has_fulltext = Q("term", in_ia=True)
+ poor_metadata = Q(
+ "bool",
+ should=[
+ # if these fields aren't set, metadata is poor. The more that do
+ # not exist, the stronger the signal.
+ Q("bool", must_not=Q("exists", field="title")),
+ Q("bool", must_not=Q("exists", field="release_year")),
+ Q("bool", must_not=Q("exists", field="release_type")),
+ Q("bool", must_not=Q("exists", field="release_stage")),
+ ],
+ )
- search_request = {
- "query": {
- "query_string": {
- "query": q,
- "default_operator": "AND",
- "analyze_wildcard": True,
- "lenient": True,
- "fields": ["biblio"],
- },
- },
- }
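+    # "boosting" combines the above: hits must match basic_biblio; fulltext
+    # availability raises the score, and hits matching poor_metadata have
+    # their score multiplied by negative_boost (0.5)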
+ search = search.query(
+ "boosting",
+ positive=Q("bool", must=basic_biblio, should=[has_fulltext],),
+ negative=poor_metadata,
+ negative_boost=0.5,
+ )
+
+ # Sanity checks
+ limit = min((int(query.limit or 25), 100))
+ offset = max((int(query.offset or 0), 0))
+ if offset > deep_page_limit:
+ # Avoid deep paging problem.
+ offset = deep_page_limit
- resp = do_search(app.config['ELASTICSEARCH_RELEASE_INDEX'], search_request, offset=offset)
- for h in resp['results']:
+ search = search[offset : (offset + limit)]
+
+ resp = wrap_es_execution(search)
+ results = results_to_dict(resp)
+
+ for h in results:
# Ensure 'contrib_names' is a list, not a single string
if type(h['contrib_names']) is not list:
h['contrib_names'] = [h['contrib_names'], ]
h['contrib_names'] = [name.encode('utf8', 'ignore').decode('utf8') for name in h['contrib_names']]
- resp["query"] = { "q": q }
- resp["limit"] = limit
- return resp
+ return SearchHits(
+ count_returned=len(results),
+ count_found=int(resp.hits.total),
+ offset=offset,
+ limit=limit,
+ deep_page_limit=deep_page_limit,
+ query_time_ms=int(resp.took),
+ results=results,
+ )
-def do_container_search(q, limit=30, offset=0):
+def get_elastic_container_random_releases(ident, limit=5):
+ """
+    Returns a list of recent releases from the given container, preferring
+    those with fulltext available.
+ """
- # Convert raw ISSN-L to ISSN-L query
- if len(q.split()) == 1 and len(q) == 9 and q[0:4].isdigit() and q[4] == '-':
- q = 'issnl:"{}"'.format(q)
+ assert limit > 0 and limit <= 100
- search_request = {
- "query": {
- "query_string": {
- "query": q,
- "default_operator": "AND",
- "analyze_wildcard": True,
- "lenient": True,
- "fields": ["biblio"],
- },
- },
- }
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])
+ search = search.query(
+ 'bool',
+ must=[
+ Q('term', container_id=ident),
+ Q('range', release_year={ "lte": datetime.datetime.today().year }),
+ ]
+ )
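+    # prefer releases with fulltext available, newest first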
+ search = search.sort('-in_web', '-release_date')
+ search = search[:int(limit)]
- resp = do_search(app.config['ELASTICSEARCH_CONTAINER_INDEX'], search_request, limit=limit, offset=offset)
- resp["query"] = { "q": q }
- resp["limit"] = limit
- return resp
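+    # hint to elasticsearch that this query can be cached (shard request cache)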
+ search = search.params(request_cache=True)
+ resp = wrap_es_execution(search)
+ results = results_to_dict(resp)
+
+ return results
def get_elastic_entity_stats():
"""
@@ -127,85 +266,73 @@ def get_elastic_entity_stats():
stats = {}
- # 2. releases
- # - total count
- # - total citation records
- # - total (paper, chapter, proceeding)
- # - " with fulltext on web
- # - " open access
- # - " not in KBART, in IA
- #
- # Can do the above with two queries:
- # - all releases, aggregate count and sum(ref_count)
- # - in-scope works, aggregate count by (fulltext, OA, kbart/ia)
-
- # 2a. release totals
- query = {
- "size": 0,
- "aggs": {
- "release_ref_count": { "sum": { "field": "ref_count" } }
- }
- }
- resp = requests.get(
- "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']),
- json=query,
- params=dict(request_cache="true"))
- # TODO: abort()
- resp.raise_for_status()
- resp = resp.json()
+ # release totals
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])
+ search.aggs.bucket(
+ 'release_ref_count',
+ 'sum',
+ field='ref_count',
+ )
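+    # size=0: we only want the aggregation result, not any actual hits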
+ search = search[:0] # pylint: disable=unsubscriptable-object
+
+ search = search.params(request_cache=True)
+ resp = wrap_es_execution(search)
+
stats['release'] = {
- "total": resp['hits']['total'],
- "refs_total": int(resp['aggregations']['release_ref_count']['value']),
+ "total": int(resp.hits.total),
+ "refs_total": int(resp.aggregations.release_ref_count.value),
}
- # 2b. paper counts
- query = {
- "size": 0,
- "query": {
- "terms": { "release_type": [
- # "chapter", "thesis",
- "article-journal", "paper-conference",
- ] } },
- "aggs": { "paper_like": { "filters": { "filters": {
- "in_web": { "term": { "in_web": "true" } },
- "is_oa": { "term": { "is_oa": "true" } },
- "in_kbart": { "term": { "in_kbart": "true" } },
- "in_web_not_kbart": { "bool": { "filter": [
- { "term": { "in_web": "true" } },
- { "term": { "in_kbart": "false" } }
- ]}}
- }}}}
- }
- resp = requests.get(
- "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']),
- json=query,
- params=dict(request_cache="true"))
- # TODO: abort()
- resp.raise_for_status()
- resp = resp.json()
- buckets = resp['aggregations']['paper_like']['buckets']
+ # paper counts
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])
+ search = search.query(
+ 'terms',
+ release_type=[
+ "article-journal",
+ "paper-conference",
+ # "chapter",
+ # "thesis",
+ ],
+ )
+ search.aggs.bucket(
+ 'paper_like',
+ 'filters',
+ filters={
+ "in_web": { "term": { "in_web": "true" } },
+ "is_oa": { "term": { "is_oa": "true" } },
+ "in_kbart": { "term": { "in_kbart": "true" } },
+ "in_web_not_kbart": { "bool": { "filter": [
+ { "term": { "in_web": "true" } },
+ { "term": { "in_kbart": "false" } },
+ ]}},
+ }
+ )
+ search = search[:0]
+
+ search = search.params(request_cache=True)
+ resp = wrap_es_execution(search)
+ buckets = resp.aggregations.paper_like.buckets
stats['papers'] = {
- 'total': resp['hits']['total'],
- 'in_web': buckets['in_web']['doc_count'],
- 'is_oa': buckets['is_oa']['doc_count'],
- 'in_kbart': buckets['in_kbart']['doc_count'],
- 'in_web_not_kbart': buckets['in_web_not_kbart']['doc_count'],
+ 'total': resp.hits.total,
+ 'in_web': buckets.in_web.doc_count,
+ 'is_oa': buckets.is_oa.doc_count,
+ 'in_kbart': buckets.in_kbart.doc_count,
+ 'in_web_not_kbart': buckets.in_web_not_kbart.doc_count,
}
- # 3. containers
- # => total count
- query = {
- "size": 0,
- }
- resp = requests.get(
- "{}/fatcat_container/_search".format(app.config['ELASTICSEARCH_BACKEND']),
- json=query,
- params=dict(request_cache="true"))
- # TODO: abort()
- resp.raise_for_status()
- resp = resp.json()
+ # container counts
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_CONTAINER_INDEX'])
+ search.aggs.bucket(
+ 'release_ref_count',
+ 'sum',
+ field='ref_count',
+ )
+ search = search[:0] # pylint: disable=unsubscriptable-object
+
+ search = search.params(request_cache=True)
+ resp = wrap_es_execution(search)
stats['container'] = {
- "total": resp['hits']['total'],
+ "total": resp.hits.total,
}
return stats
@@ -221,30 +348,36 @@ def get_elastic_container_stats(ident, issnl=None):
preserved
"""
- query = {
- "size": 0,
- "query": {
- "term": { "container_id": ident }
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])
+ search = search.query(
+ 'term',
+ container_id=ident,
+ )
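+    # count this container's releases by preservation/access status flags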
+ search.aggs.bucket(
+ 'container_stats',
+ 'filters',
+ filters={
+ "in_web": {
+ "term": { "in_web": True },
+ },
+ "in_kbart": {
+ "term": { "in_kbart": True },
+ },
+ "is_preserved": {
+ "term": { "is_preserved": True },
+ },
},
- "aggs": { "container_stats": { "filters": { "filters": {
- "in_web": { "term": { "in_web": "true" } },
- "in_kbart": { "term": { "in_kbart": "true" } },
- "is_preserved": { "term": { "is_preserved": "true" } },
- }}}}
- }
- resp = requests.get(
- "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']),
- json=query,
- params=dict(request_cache="true"))
- # TODO: abort()
- #print(resp.json())
- resp.raise_for_status()
- resp = resp.json()
- buckets = resp['aggregations']['container_stats']['buckets']
+ )
+ search = search[:0]
+
+ search = search.params(request_cache=True)
+ resp = wrap_es_execution(search)
+
+ buckets = resp.aggregations.container_stats.buckets
stats = {
'ident': ident,
'issnl': issnl,
- 'total': resp['hits']['total'],
+ 'total': resp.hits.total,
'in_web': buckets['in_web']['doc_count'],
'in_kbart': buckets['in_kbart']['doc_count'],
'is_preserved': buckets['is_preserved']['doc_count'],
@@ -252,48 +385,6 @@ def get_elastic_container_stats(ident, issnl=None):
return stats
-def get_elastic_container_random_releases(ident, limit=5):
- """
- Returns a list of releases from the container.
- """
-
- assert limit > 0 and limit <= 100
-
- query = {
- "size": int(limit),
- "sort": [
- { "in_web": {"order": "desc"} },
- { "release_date": {"order": "desc"} },
- ],
- "query": {
- "bool": {
- "must": [
- { "term": { "container_id": ident } },
- { "range": { "release_year": { "lte": datetime.datetime.today().year } } },
- ],
- },
- },
- }
- resp = requests.get(
- "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']),
- json=query,
- params=dict(request_cache="true"))
- # TODO: abort()
- #print(resp.json())
- resp.raise_for_status()
- resp = resp.json()
- #print(resp)
- hits = [h['_source'] for h in resp['hits']['hits']]
- for h in hits:
- # Handle surrogate strings that elasticsearch returns sometimes,
- # probably due to mangled data processing in some pipeline.
- # "Crimes against Unicode"; production workaround
- for key in h:
- if type(h[key]) is str:
- h[key] = h[key].encode('utf8', 'ignore').decode('utf8')
-
- return hits
-
def get_elastic_container_histogram(ident):
"""
Fetches a stacked histogram
@@ -304,58 +395,46 @@ def get_elastic_container_histogram(ident):
(year, in_ia, count)
"""
- query = {
- "aggs": {
- "year_in_ia": {
- "composite": {
- "size": 1000,
- "sources": [
- {"year": {
- "histogram": {
- "field": "release_year",
- "interval": 1,
- }}},
- {"in_ia": {
- "terms": {
- "field": "in_ia",
- }}},
- ],
+ search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])
+ search = search.query(
+ 'bool',
+ must=[
+ Q("range", release_year={
+ "gte": datetime.datetime.today().year - 499,
+ "lte": datetime.datetime.today().year,
+ }),
+ ],
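+        # non-scoring filter context: restrict to the given container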
+ filter=[
+ Q("bool", minimum_should_match=1, should=[
+ Q("match", container_id=ident),
+ ]),
+ ],
+ )
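+    # composite aggregation: one bucket per (release_year, in_ia) combination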
+ search.aggs.bucket(
+ 'year_in_ia',
+ 'composite',
+ size=1000,
+ sources=[
+ {"year": {
+ "histogram": {
+ "field": "release_year",
+ "interval": 1,
},
- },
- },
- "size": 0,
- "query": {
- "bool": {
- "must": [{
- "range": {
- "release_year": {
- "gte": datetime.datetime.today().year - 499,
- "lte": datetime.datetime.today().year,
- }
- }
- }],
- "filter": [{
- "bool": {
- "should": [{
- "match": {
- "container_id": ident
- }
- }],
- "minimum_should_match": 1,
- },
- }],
- }
- }
- }
- resp = requests.get(
- "{}/fatcat_release/_search".format(app.config['ELASTICSEARCH_BACKEND']),
- json=query,
- params=dict(request_cache="true"))
- resp.raise_for_status()
- # TODO: abort()
- resp = resp.json()
- #print(resp)
+ }},
+ {"in_ia": {
+ "terms": {
+ "field": "in_ia",
+ },
+ }},
+ ],
+ )
+ search = search[:0]
+
+    search = search.params(request_cache=True)
+ resp = wrap_es_execution(search)
+
+ buckets = resp.aggregations.year_in_ia.buckets
vals = [(h['key']['year'], h['key']['in_ia'], h['doc_count'])
- for h in resp['aggregations']['year_in_ia']['buckets']]
+ for h in buckets]
vals = sorted(vals)
return vals