summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-04-15 17:01:21 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-07-23 10:55:09 -0700
commita5a8811a605080f2cd9eb575c33a17f045c43674 (patch)
treef8d1b36ae178363f0b54a325faee1dbd42f70be8 /python
parent16157db3c47e0663a9cfaa60482204f88126e8f7 (diff)
downloadfatcat-a5a8811a605080f2cd9eb575c33a17f045c43674.tar.gz
fatcat-a5a8811a605080f2cd9eb575c33a17f045c43674.zip
initial inbound/outbound reference query helpers
Diffstat (limited to 'python')
-rw-r--r--python/fatcat_tools/references.py450
1 files changed, 450 insertions, 0 deletions
diff --git a/python/fatcat_tools/references.py b/python/fatcat_tools/references.py
new file mode 100644
index 00000000..c9730174
--- /dev/null
+++ b/python/fatcat_tools/references.py
@@ -0,0 +1,450 @@
+"""
+Helper routines for working with the fatcat citation graph, which is a separate
+index of reference links between works in the main catalog.
+
+See bulk citation and citation API proposals for design documentation.
+
+TODO:
+
+ surt_ify() helper (URL to SURT for queries)
+ CSL enrichment method (using only elasticsearch mget)
+ CSL enrichment for fatcat enrichment
+ access transform
+ microfilm access in access transform
+
+ all_outbound_refs(...) -> List[BiblioRef]
+ all_inbound_refs(...) -> List[BiblioRef]
+ same as get_outbound_refs()/get_inbound_refs(), but does a scroll (return list or iterator?)
+ (optional; maybe not public)
+"""
+
+import sys
+import json
+import datetime
+import argparse
+from enum import Enum
+from typing import Optional, List, Any, Dict
+
+from pydantic import BaseModel
+import elasticsearch
+from elasticsearch_dsl import Search, Q
+from fatcat_openapi_client import ReleaseEntity
+
+from fatcat_tools import public_api
+
+
+
+class BiblioRef(BaseModel):
+ """bibliographic reference"""
+ # ("release", source_release_ident, ref_index)
+ # ("wikipedia", source_wikipedia_article, ref_index)
+ _key: Optional[str]
+ update_ts: Optional[datetime.datetime]
+
+ # metadata about source of reference
+ source_release_ident: Optional[str]
+ source_work_ident: Optional[str]
+ # with lang prefix like "en:Superglue"
+ source_wikipedia_article: Optional[str]
+ # skipped: source_openlibrary_work
+ # skipped: source_url_surt
+ source_release_stage: Optional[str]
+ source_year: Optional[int]
+
+ # context of the reference itself
+ # 1-indexed, not 0-indexed
+ ref_index: Optional[int] # TODO: actually optional?
+ # eg, "Lee86", "BIB23"
+ ref_key: Optional[str]
+ # eg, page number
+ ref_locator: Optional[str]
+
+ # target of reference (identifiers)
+ target_release_ident: Optional[str]
+ target_work_ident: Optional[str]
+ target_openlibrary_work: Optional[str]
+ target_url_surt: Optional[str]
+ # would not be stored in elasticsearch, but would be auto-generated by all "get" methods from the SURT, so calling code does not need to do SURT transform
+ target_url: Optional[str]
+ # skipped: target_wikipedia_article
+
+ # crossref, pubmed, grobid, etc
+ match_provenance: str
+ # strong, weak, etc
+ match_status: Optional[str]
+ # TODO: "match_strength"?
+ # "doi", "isbn", "fuzzy title, author", etc
+ # maybe "fuzzy-title-author"?
+ match_reason: Optional[str]
+
+ # only if no release_ident link/match
+ target_unstructured: Optional[str]
+ target_csl: Optional[Dict[str, Any]]
+
+class AccessType(str, Enum):
+ """describes type of access URL"""
+
+ wayback = "wayback"
+ ia_file = "ia_file"
+ ia_microfilm = "ia_microfilm"
+ repository = "repository"
+
+class AccessOption(BaseModel):
+
+ access_type: AccessType
+
+ # note: for `target_url` refs, would do a CDX lookup and this URL would be
+ # a valid/HTTP-200 web.archive.org capture URL
+ access_url: str
+
+ # application/pdf, text/html, etc
+ # blank for landing pages
+ mimetype: Optional[str]
+
+ size_bytes: Optional[int]
+ thumbnail_url: Optional[str]
+
+class CslBiblioRef(BaseModel):
+ # an "enriched" version of BiblioRef with metadata about the source or
+ # target entity. would be "hydrated" via a lookup to, eg, the
+ # `fatcat_release` elasticsearch index (fast mget fetch with a single
+ # request), as opposed to fatcat API fetches
+ ref: BiblioRef
+ csl: Optional[Dict[str, Any]]
+ access: List[AccessOption]
+
+ class Config:
+ arbitrary_types_allowed = True
+
+class FatcatBiblioRef(BaseModel):
+ # enriched version of BiblioRef with complete ReleaseEntity object as
+ # fetched from the fatcat API. CSL-JSON metadata would be derived from
+ # the full release entity.
+ ref: BiblioRef
+ release: Optional[ReleaseEntity]
+ csl: Optional[Dict[str, Any]]
+ access: List[AccessOption]
+
+ class Config:
+ arbitrary_types_allowed = True
+
+class RefHits(BaseModel):
+ count_returned: int
+ count_total: int
+ offset: int
+ limit: int
+ query_time_ms: int
+ query_wall_time_ms: int
+ result_refs: List[BiblioRef]
+
+def _execute_ref_query(search: Any, limit: int, offset: Optional[int] = None) -> List[BiblioRef]:
+
+ limit = min((int(limit or 15), 200))
+ if not offset or offset < 0:
+ offset = 0
+
+ search = search.params(track_total_hits=True)
+ search = search[offset : (offset + limit)]
+
+ query_start = datetime.datetime.now()
+ try:
+ resp = search.execute()
+ except elasticsearch.exceptions.RequestError as e_raw:
+ # this is a "user" error
+ e: Any = e_raw
+ #logging.warn("elasticsearch 400: " + str(e.info))
+ if e.info.get("error", {}).get("root_cause", {}):
+ raise ValueError(str(e.info["error"]["root_cause"][0].get("reason"))) from e
+ else:
+ raise ValueError(str(e.info)) from e
+ except elasticsearch.exceptions.TransportError as e:
+ # all other errors
+ #logging.warn(f"elasticsearch non-200 status code: {e.info}")
+ raise IOError(str(e.info)) from e
+ query_delta = datetime.datetime.now() - query_start
+
+ result_refs = []
+ for h in resp.hits:
+ # might be a list because of consolidation
+ if isinstance(h._d_.get('source_work_ident'), list):
+ h._d_['source_work_ident'] = h._d_['source_work_ident'][0]
+ result_refs.append(BiblioRef.parse_obj(h._d_))
+
+ return RefHits(
+ count_returned=len(result_refs),
+ # ES 7.x style "total"
+ count_total=resp.hits.total.value,
+ offset=offset,
+ limit=limit,
+ query_time_ms=int(resp.took),
+ query_wall_time_ms=int(query_delta.total_seconds() * 1000),
+ result_refs=result_refs,
+ )
+
+
+def get_outbound_refs(
+ es_client: Any,
+ release_ident: Optional[str] = None,
+ work_ident: Optional[str] = None,
+ wikipedia_article: Optional[str] = None,
+ limit: int = 100,
+ offset: Optional[int] = None,
+ es_index: str = "fatcat_ref",
+) -> List[BiblioRef]:
+
+ search = Search(using=es_client, index=es_index)
+
+ if release_ident:
+ search = search.filter("term", source_release_ident=release_ident)
+ elif work_ident:
+ search = search.filter("term", source_work_ident=work_ident)
+ elif wikipedia_article:
+ search = search.filter("term", source_wikipedia_article=wikipedia_article)
+ else:
+ raise ValueError("require a lookup key")
+
+ # TODO: schema doesn't support either of these currently
+ #search = search.sort("ref_index")
+ #search = search.sort("ref_key")
+
+ # re-sort by index
+ hits = _execute_ref_query(search, limit=limit, offset=offset)
+ hits.result_refs = sorted(hits.result_refs, key=lambda r: r.ref_index or 0)
+ return hits
+
+def get_inbound_refs(
+ es_client: Any,
+ release_ident: Optional[str] = None,
+ work_ident: Optional[str] = None,
+ openlibrary_work: Optional[str] = None,
+ url_surt: Optional[str] = None,
+ url: Optional[str] = None,
+ consolidate_works: bool = True,
+ filter_stage: List[str] = [],
+ filter_type: List[str] = [],
+ limit: int = 25,
+ offset: Optional[int] = None,
+ es_index: str = "fatcat_ref",
+) -> List[BiblioRef]:
+ # TODO: filter_stage, filter_type
+
+ if url and not url_surt:
+ url = surt_ify(url)
+
+ search = Search(using=es_client, index=es_index)
+
+ if consolidate_works:
+ search = search.extra(
+ collapse={
+ "field": "source_work_ident",
+ "inner_hits": {"name": "source_more", "size": 0,},
+ }
+ )
+
+ if release_ident:
+ search = search.filter("term", target_release_ident=release_ident)
+ elif work_ident:
+ search = search.filter("term", target_work_ident=work_ident)
+ elif openlibrary_work:
+ search = search.filter("term", target_openlibrary_work=openlibrary_work)
+ elif url_surt:
+ search = search.filter("term", target_url_surt=url_surt)
+ else:
+ raise ValueError("require a lookup key")
+
+ # TODO: wrong type, not int? and maybe need to index differently?
+ #search = search.sort("source_year")
+
+ return _execute_ref_query(search, limit=limit, offset=offset)
+
+def count_inbound_refs(
+ es_client: Any,
+ release_ident: Optional[str] = None,
+ work_ident: Optional[str] = None,
+ openlibrary_work: Optional[str] = None,
+ url_surt: Optional[str] = None,
+ url: Optional[str] = None,
+ filter_stage: List[str] = [],
+ filter_type: List[str] = [],
+ es_index: str = "fatcat_ref",
+) -> int:
+ """
+ Same parameters as get_inbound_refs(), but returns just a count
+ """
+
+ if url and not url_surt:
+ url = surt_ify(url)
+
+ search = Search(using=es_client, index=es_index)
+
+ if release_ident:
+ search = search.filter("term", target_release_ident=release_ident)
+ elif work_ident:
+ search = search.filter("term", target_work_ident=work_ident)
+ elif openlibrary_work:
+ search = search.filter("term", target_openlibrary_work=openlibrary_work)
+ elif url_surt:
+ search = search.filter("term", target_url_surt=url_surt)
+ else:
+ raise ValueError("require a lookup key")
+
+ return search.count()
+
+def _release_access(release: ReleaseEntity) -> List[AccessOption]:
+ """
+ Extracts access options from a release.
+ """
+ options = []
+ for f in (release.files or []):
+ for u in (f.urls or []):
+ if '://web.archive.org/' in u.url:
+ return [AccessOption(
+ access_type="wayback",
+ access_url=u.url,
+ mimetype=f.mimetype,
+ size_bytes=f.size,
+ thumbnail_url=None
+ )]
+ elif '://archive.org/' in u.url:
+ return [AccessOption(
+ access_type="ia_file",
+ access_url=u.url,
+ mimetype=f.mimetype,
+ size_bytes=f.size,
+ thumbnail_url=None
+ )]
+ return options
+
+# run elasticsearch mget query for all ref idents and include "enriched" refs when possible
+# for outbound URL refs, would do wayback CDX fetches to find a direct wayback URL
+# TODO: for openlibrary, would this query openlibrary.org API? or some fatcat-specific index?
+#enrich_inbound_refs(refs: List[BiblioRef]) -> List[CslBiblioRef]
+#enrich_outbound_refs(refs: List[BiblioRef]) -> List[CslBiblioRef]
+
+# run fatcat API fetches for each ref and return "enriched" refs
+def enrich_inbound_refs_fatcat(refs: List[BiblioRef], fatcat_api_client: Any, hide: Optional[str] = "refs", expand: Optional[str] = "container,files,webcaptures,filesets") -> List[FatcatBiblioRef]:
+ enriched = []
+ for ref in refs:
+ if ref.source_release_ident:
+ release = fatcat_api_client.get_release(ref.source_release_ident, hide=hide, expand=expand)
+ enriched.append(FatcatBiblioRef(
+ ref=ref,
+ csl=None,
+ access=_release_access(release),
+ release=release,
+ ))
+ else:
+ enriched.append(FatcatBiblioRef(
+ ref=ref,
+ csl=None,
+ access=[],
+ release=None,
+ ))
+ return enriched
+
+def enrich_outbound_refs_fatcat(refs: List[BiblioRef], fatcat_api_client: Any, hide: Optional[str] = "refs", expand: Optional[str] = "container,files,webcaptures,filesets") -> List[FatcatBiblioRef]:
+ enriched = []
+ for ref in refs:
+ if ref.target_release_ident:
+ release = fatcat_api_client.get_release(ref.target_release_ident, hide=hide, expand=expand)
+ enriched.append(FatcatBiblioRef(
+ ref=ref,
+ csl=None,
+ access=_release_access(release),
+ release=release,
+ ))
+ else:
+ enriched.append(FatcatBiblioRef(
+ ref=ref,
+ csl=None,
+ access=[],
+ release=None,
+ ))
+ return enriched
+
+
+def run_ref_query(args) -> None:
+ release_ident = None
+ work_ident = None
+ if args.ident.startswith("release_"):
+ release_ident = args.ident.split('_')[1]
+ elif args.ident.startswith("work_"):
+ work_ident = args.ident.split('_')[1]
+ else:
+ release_ident = args.ident
+
+ print("## Outbound References")
+ hits = get_outbound_refs(release_ident=release_ident, work_ident=work_ident, es_client=args.es_client)
+ print(f"Total: {hits.count_total} Time: {hits.query_wall_time_ms}ms; {hits.query_time_ms}ms")
+
+ if args.enrich == "fatcat":
+ enriched = enrich_outbound_refs_fatcat(hits.result_refs, hide='refs,abstracts', fatcat_api_client=args.fatcat_api_client)
+ for ref in enriched:
+ if ref.release:
+ print(f"{ref.ref.ref_index or '-'}\trelease_{ref.release.ident}\t{ref.ref.match_provenance}/{ref.ref.match_status}\t{ref.release.release_year or '-'}\t{ref.release.title}\t{ref.release.ext_ids.pmid or ref.release.ext_ids.doi or '-'}")
+ else:
+ print(f"{ref.ref.ref_index or '-'}\trelease_{ref.target_release_ident}")
+ else:
+ for ref in hits.result_refs:
+ print(f"{ref.ref.ref_index or '-'}\trelease_{ref.target_release_ident}")
+
+ print()
+ print("## Inbound References")
+ hits = get_inbound_refs(release_ident=release_ident, work_ident=work_ident, es_client=args.es_client)
+
+ print(f"Total: {hits.count_total} Time: {hits.query_wall_time_ms}ms; {hits.query_time_ms}ms")
+
+ if args.enrich == "fatcat":
+ enriched = enrich_inbound_refs_fatcat(hits.result_refs, hide='refs,abstracts', fatcat_api_client=args.fatcat_api_client)
+ for ref in enriched:
+ if ref.release:
+ print(f"release_{ref.release.ident}\t{ref.ref.match_provenance}/{ref.ref.match_status}\t{ref.release.release_year or '-'}\t{ref.release.title}\t{ref.release.ext_ids.pmid or ref.release.ext_ids.doi or '-'}")
+ else:
+ print(f"release_{ref.target_release_ident}")
+ else:
+ for ref in hits.result_refs:
+ print(f"work_{ref.source_work_ident}\trelease_{ref.source_release_ident}")
+
+def main() -> None:
+ """
+ Run this utility like:
+
+ python -m fatcat_tools.references
+
+ Examples:
+
+ python -m fatcat_tools.references query release_pfrind3kh5hqhgqkueulk2tply
+ """
+
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ )
+ subparsers = parser.add_subparsers()
+
+ parser.add_argument("--fatcat-api-base", default="https://api.fatcat.wiki/v0")
+ parser.add_argument("--elasticsearch-base", default="https://search.fatcat.wiki")
+ parser.add_argument("--elasticsearch-ref-index", default="fatcat_ref")
+
+ sub = subparsers.add_parser(
+ "query",
+ help="takes a fatcat ident argument, prints both inbound and outbound references",
+ )
+ sub.set_defaults(func="run_ref_query")
+ sub.add_argument("ident", type=str)
+ sub.add_argument("--enrich", type=str)
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ parser.print_help(file=sys.stderr)
+ sys.exit(-1)
+
+ args.es_client = elasticsearch.Elasticsearch(args.elasticsearch_base)
+ args.fatcat_api_client = public_api(args.fatcat_api_base)
+
+ if args.func == "run_ref_query":
+ run_ref_query(args)
+ else:
+ raise NotImplementedError(args.func)
+
+if __name__ == "__main__":
+ main()