diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-04-15 17:01:21 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-07-23 10:55:09 -0700 |
commit | a5a8811a605080f2cd9eb575c33a17f045c43674 (patch) | |
tree | f8d1b36ae178363f0b54a325faee1dbd42f70be8 /python | |
parent | 16157db3c47e0663a9cfaa60482204f88126e8f7 (diff) | |
download | fatcat-a5a8811a605080f2cd9eb575c33a17f045c43674.tar.gz fatcat-a5a8811a605080f2cd9eb575c33a17f045c43674.zip |
initial inbound/outbound reference query helpers
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/references.py | 450 |
1 files changed, 450 insertions, 0 deletions
diff --git a/python/fatcat_tools/references.py b/python/fatcat_tools/references.py new file mode 100644 index 00000000..c9730174 --- /dev/null +++ b/python/fatcat_tools/references.py @@ -0,0 +1,450 @@ +""" +Helper routines for working with the fatcat citation graph, which is a separate +index of reference links between works in the main catalog. + +See bulk citation and citation API proposals for design documentation. + +TODO: + + surt_ify() helper (URL to SURT for queries) + CSL enrichment method (using only elasticsearch mget) + CSL enrichment for fatcat enrichment + access transform + microfilm access in access transform + + all_outbound_refs(...) -> List[BiblioRef] + all_inbound_refs(...) -> List[BiblioRef] + same as get_outbound_refs()/get_inbound_refs(), but does a scroll (return list or iterator?) + (optional; maybe not public) +""" + +import sys +import json +import datetime +import argparse +from enum import Enum +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel +import elasticsearch +from elasticsearch_dsl import Search, Q +from fatcat_openapi_client import ReleaseEntity + +from fatcat_tools import public_api + + + +class BiblioRef(BaseModel): + """bibliographic reference""" + # ("release", source_release_ident, ref_index) + # ("wikipedia", source_wikipedia_article, ref_index) + _key: Optional[str] + update_ts: Optional[datetime.datetime] + + # metadata about source of reference + source_release_ident: Optional[str] + source_work_ident: Optional[str] + # with lang prefix like "en:Superglue" + source_wikipedia_article: Optional[str] + # skipped: source_openlibrary_work + # skipped: source_url_surt + source_release_stage: Optional[str] + source_year: Optional[int] + + # context of the reference itself + # 1-indexed, not 0-indexed + ref_index: Optional[int] # TODO: actually optional? + # eg, "Lee86", "BIB23" + ref_key: Optional[str] + # eg, page number + ref_locator: Optional[str] + + # target of reference (identifiers) + target_release_ident: Optional[str] + target_work_ident: Optional[str] + target_openlibrary_work: Optional[str] + target_url_surt: Optional[str] + # would not be stored in elasticsearch, but would be auto-generated by all "get" methods from the SURT, so calling code does not need to do SURT transform + target_url: Optional[str] + # skipped: target_wikipedia_article + + # crossref, pubmed, grobid, etc + match_provenance: str + # strong, weak, etc + match_status: Optional[str] + # TODO: "match_strength"? + # "doi", "isbn", "fuzzy title, author", etc + # maybe "fuzzy-title-author"? + match_reason: Optional[str] + + # only if no release_ident link/match + target_unstructured: Optional[str] + target_csl: Optional[Dict[str, Any]] + +class AccessType(str, Enum): + """describes type of access URL""" + + wayback = "wayback" + ia_file = "ia_file" + ia_microfilm = "ia_microfilm" + repository = "repository" + +class AccessOption(BaseModel): + + access_type: AccessType + + # note: for `target_url` refs, would do a CDX lookup and this URL would be + # a valid/HTTP-200 web.archive.org capture URL + access_url: str + + # application/pdf, text/html, etc + # blank for landing pages + mimetype: Optional[str] + + size_bytes: Optional[int] + thumbnail_url: Optional[str] + +class CslBiblioRef(BaseModel): + # an "enriched" version of BiblioRef with metadata about the source or + # target entity. would be "hydrated" via a lookup to, eg, the + # `fatcat_release` elasticsearch index (fast mget fetch with a single + # request), as opposed to fatcat API fetches + ref: BiblioRef + csl: Optional[Dict[str, Any]] + access: List[AccessOption] + + class Config: + arbitrary_types_allowed = True + +class FatcatBiblioRef(BaseModel): + # enriched version of BiblioRef with complete ReleaseEntity object as + # fetched from the fatcat API. CSL-JSON metadata would be derived from + # the full release entity. + ref: BiblioRef + release: Optional[ReleaseEntity] + csl: Optional[Dict[str, Any]] + access: List[AccessOption] + + class Config: + arbitrary_types_allowed = True + +class RefHits(BaseModel): + count_returned: int + count_total: int + offset: int + limit: int + query_time_ms: int + query_wall_time_ms: int + result_refs: List[BiblioRef] + +def _execute_ref_query(search: Any, limit: int, offset: Optional[int] = None) -> List[BiblioRef]: + + limit = min((int(limit or 15), 200)) + if not offset or offset < 0: + offset = 0 + + search = search.params(track_total_hits=True) + search = search[offset : (offset + limit)] + + query_start = datetime.datetime.now() + try: + resp = search.execute() + except elasticsearch.exceptions.RequestError as e_raw: + # this is a "user" error + e: Any = e_raw + #logging.warn("elasticsearch 400: " + str(e.info)) + if e.info.get("error", {}).get("root_cause", {}): + raise ValueError(str(e.info["error"]["root_cause"][0].get("reason"))) from e + else: + raise ValueError(str(e.info)) from e + except elasticsearch.exceptions.TransportError as e: + # all other errors + #logging.warn(f"elasticsearch non-200 status code: {e.info}") + raise IOError(str(e.info)) from e + query_delta = datetime.datetime.now() - query_start + + result_refs = [] + for h in resp.hits: + # might be a list because of consolidation + if isinstance(h._d_.get('source_work_ident'), list): + h._d_['source_work_ident'] = h._d_['source_work_ident'][0] + result_refs.append(BiblioRef.parse_obj(h._d_)) + + return RefHits( + count_returned=len(result_refs), + # ES 7.x style "total" + count_total=resp.hits.total.value, + offset=offset, + limit=limit, + query_time_ms=int(resp.took), + query_wall_time_ms=int(query_delta.total_seconds() * 1000), + result_refs=result_refs, + ) + + +def get_outbound_refs( + es_client: Any, + release_ident: Optional[str] = None, + work_ident: Optional[str] = None, + wikipedia_article: Optional[str] = None, + limit: int = 100, + offset: Optional[int] = None, + es_index: str = "fatcat_ref", +) -> List[BiblioRef]: + + search = Search(using=es_client, index=es_index) + + if release_ident: + search = search.filter("term", source_release_ident=release_ident) + elif work_ident: + search = search.filter("term", source_work_ident=work_ident) + elif wikipedia_article: + search = search.filter("term", source_wikipedia_article=wikipedia_article) + else: + raise ValueError("require a lookup key") + + # TODO: schema doesn't support either of these currently + #search = search.sort("ref_index") + #search = search.sort("ref_key") + + # re-sort by index + hits = _execute_ref_query(search, limit=limit, offset=offset) + hits.result_refs = sorted(hits.result_refs, key=lambda r: r.ref_index or 0) + return hits + +def get_inbound_refs( + es_client: Any, + release_ident: Optional[str] = None, + work_ident: Optional[str] = None, + openlibrary_work: Optional[str] = None, + url_surt: Optional[str] = None, + url: Optional[str] = None, + consolidate_works: bool = True, + filter_stage: List[str] = [], + filter_type: List[str] = [], + limit: int = 25, + offset: Optional[int] = None, + es_index: str = "fatcat_ref", +) -> List[BiblioRef]: + # TODO: filter_stage, filter_type + + if url and not url_surt: + url = surt_ify(url) + + search = Search(using=es_client, index=es_index) + + if consolidate_works: + search = search.extra( + collapse={ + "field": "source_work_ident", + "inner_hits": {"name": "source_more", "size": 0,}, + } + ) + + if release_ident: + search = search.filter("term", target_release_ident=release_ident) + elif work_ident: + search = search.filter("term", target_work_ident=work_ident) + elif openlibrary_work: + search = search.filter("term", target_openlibrary_work=openlibrary_work) + elif url_surt: + search = search.filter("term", target_url_surt=url_surt) + else: + raise ValueError("require a lookup key") + + # TODO: wrong type, not int? and maybe need to index differently? + #search = search.sort("source_year") + + return _execute_ref_query(search, limit=limit, offset=offset) + +def count_inbound_refs( + es_client: Any, + release_ident: Optional[str] = None, + work_ident: Optional[str] = None, + openlibrary_work: Optional[str] = None, + url_surt: Optional[str] = None, + url: Optional[str] = None, + filter_stage: List[str] = [], + filter_type: List[str] = [], + es_index: str = "fatcat_ref", +) -> int: + """ + Same parameters as get_inbound_refs(), but returns just a count + """ + + if url and not url_surt: + url = surt_ify(url) + + search = Search(using=es_client, index=es_index) + + if release_ident: + search = search.filter("term", target_release_ident=release_ident) + elif work_ident: + search = search.filter("term", target_work_ident=work_ident) + elif openlibrary_work: + search = search.filter("term", target_openlibrary_work=openlibrary_work) + elif url_surt: + search = search.filter("term", target_url_surt=url_surt) + else: + raise ValueError("require a lookup key") + + return search.count() + +def _release_access(release: ReleaseEntity) -> List[AccessOption]: + """ + Extracts access options from a release. + """ + options = [] + for f in (release.files or []): + for u in (f.urls or []): + if '://web.archive.org/' in u.url: + return [AccessOption( + access_type="wayback", + access_url=u.url, + mimetype=f.mimetype, + size_bytes=f.size, + thumbnail_url=None + )] + elif '://archive.org/' in u.url: + return [AccessOption( + access_type="ia_file", + access_url=u.url, + mimetype=f.mimetype, + size_bytes=f.size, + thumbnail_url=None + )] + return options + +# run elasticsearch mget query for all ref idents and include "enriched" refs when possible +# for outbound URL refs, would do wayback CDX fetches to find a direct wayback URL +# TODO: for openlibrary, would this query openlibrary.org API? or some fatcat-specific index? +#enrich_inbound_refs(refs: List[BiblioRef]) -> List[CslBiblioRef] +#enrich_outbound_refs(refs: List[BiblioRef]) -> List[CslBiblioRef] + +# run fatcat API fetches for each ref and return "enriched" refs +def enrich_inbound_refs_fatcat(refs: List[BiblioRef], fatcat_api_client: Any, hide: Optional[str] = "refs", expand: Optional[str] = "container,files,webcaptures,filesets") -> List[FatcatBiblioRef]: + enriched = [] + for ref in refs: + if ref.source_release_ident: + release = fatcat_api_client.get_release(ref.source_release_ident, hide=hide, expand=expand) + enriched.append(FatcatBiblioRef( + ref=ref, + csl=None, + access=_release_access(release), + release=release, + )) + else: + enriched.append(FatcatBiblioRef( + ref=ref, + csl=None, + access=[], + release=None, + )) + return enriched + +def enrich_outbound_refs_fatcat(refs: List[BiblioRef], fatcat_api_client: Any, hide: Optional[str] = "refs", expand: Optional[str] = "container,files,webcaptures,filesets") -> List[FatcatBiblioRef]: + enriched = [] + for ref in refs: + if ref.target_release_ident: + release = fatcat_api_client.get_release(ref.target_release_ident, hide=hide, expand=expand) + enriched.append(FatcatBiblioRef( + ref=ref, + csl=None, + access=_release_access(release), + release=release, + )) + else: + enriched.append(FatcatBiblioRef( + ref=ref, + csl=None, + access=[], + release=None, + )) + return enriched + + +def run_ref_query(args) -> None: + release_ident = None + work_ident = None + if args.ident.startswith("release_"): + release_ident = args.ident.split('_')[1] + elif args.ident.startswith("work_"): + work_ident = args.ident.split('_')[1] + else: + release_ident = args.ident + + print("## Outbound References") + hits = get_outbound_refs(release_ident=release_ident, work_ident=work_ident, es_client=args.es_client) + print(f"Total: {hits.count_total} Time: {hits.query_wall_time_ms}ms; {hits.query_time_ms}ms") + + if args.enrich == "fatcat": + enriched = enrich_outbound_refs_fatcat(hits.result_refs, hide='refs,abstracts', fatcat_api_client=args.fatcat_api_client) + for ref in enriched: + if ref.release: + print(f"{ref.ref.ref_index or '-'}\trelease_{ref.release.ident}\t{ref.ref.match_provenance}/{ref.ref.match_status}\t{ref.release.release_year or '-'}\t{ref.release.title}\t{ref.release.ext_ids.pmid or ref.release.ext_ids.doi or '-'}") + else: + print(f"{ref.ref.ref_index or '-'}\trelease_{ref.target_release_ident}") + else: + for ref in hits.result_refs: + print(f"{ref.ref.ref_index or '-'}\trelease_{ref.target_release_ident}") + + print() + print("## Inbound References") + hits = get_inbound_refs(release_ident=release_ident, work_ident=work_ident, es_client=args.es_client) + + print(f"Total: {hits.count_total} Time: {hits.query_wall_time_ms}ms; {hits.query_time_ms}ms") + + if args.enrich == "fatcat": + enriched = enrich_inbound_refs_fatcat(hits.result_refs, hide='refs,abstracts', fatcat_api_client=args.fatcat_api_client) + for ref in enriched: + if ref.release: + print(f"release_{ref.release.ident}\t{ref.ref.match_provenance}/{ref.ref.match_status}\t{ref.release.release_year or '-'}\t{ref.release.title}\t{ref.release.ext_ids.pmid or ref.release.ext_ids.doi or '-'}") + else: + print(f"release_{ref.target_release_ident}") + else: + for ref in hits.result_refs: + print(f"work_{ref.source_work_ident}\trelease_{ref.source_release_ident}") + +def main() -> None: + """ + Run this utility like: + + python -m fatcat_tools.references + + Examples: + + python -m fatcat_tools.references query release_pfrind3kh5hqhgqkueulk2tply + """ + + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + subparsers = parser.add_subparsers() + + parser.add_argument("--fatcat-api-base", default="https://api.fatcat.wiki/v0") + parser.add_argument("--elasticsearch-base", default="https://search.fatcat.wiki") + parser.add_argument("--elasticsearch-ref-index", default="fatcat_ref") + + sub = subparsers.add_parser( + "query", + help="takes a fatcat ident argument, prints both inbound and outbound references", + ) + sub.set_defaults(func="run_ref_query") + sub.add_argument("ident", type=str) + sub.add_argument("--enrich", type=str) + + args = parser.parse_args() + if not args.__dict__.get("func"): + parser.print_help(file=sys.stderr) + sys.exit(-1) + + args.es_client = elasticsearch.Elasticsearch(args.elasticsearch_base) + args.fatcat_api_client = public_api(args.fatcat_api_base) + + if args.func == "run_ref_query": + run_ref_query(args) + else: + raise NotImplementedError(args.func) + +if __name__ == "__main__": + main() |