Diffstat (limited to 'python/fatcat_tools/mergers/releases.py')
-rw-r--r--  python/fatcat_tools/mergers/releases.py | 249 +++++++++++++++++++
 1 file changed, 249 insertions(+), 0 deletions(-)
diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py
new file mode 100644
index 00000000..1f995b00
--- /dev/null
+++ b/python/fatcat_tools/mergers/releases.py
@@ -0,0 +1,249 @@
+import argparse
+import os
+import sys
+from typing import Any, Dict, List, Optional
+
+import fatcat_openapi_client
+from fatcat_openapi_client.models import ReleaseEntity, WorkEntity
+
+from fatcat_tools import authenticated_api
+from fatcat_tools.importers import JsonLinePusher
+
+from .common import EntityMerger
+
+
+class ReleaseMerger(EntityMerger):
+ """
+    Hard merges a set of release entities, redirecting all duplicates to a single
+ primary release. This is different from "grouping" multiple releases under
+ a single work.
+
+ A "primary" (which the other releases will redirect to) can be provided, or
+ one will be chosen from the set of duplicate releases based on the
+ completeness of metadata and other heuristic factors.
+
+ Releases are some of the most complex entities to merge, because of
+ the complexity of bibliographic metadata and the number of related entities
+ which also need to be updated.
+
+ File, Fileset, and Webcapture entities which currently point to a release
+ which gets redirected will be updated to point at the "primary" release.
+
+ Any Work entities which will end up with no releases pointing at them after
+ the merging will get redirected to the work corresponding to the "primary"
+ release.
+
+    NOTE: as currently implemented, the "primary" release will *not* get
+    updated with metadata from the redirected releases.
+    """
+
+ def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
+
+ eg_desc = (
+ kwargs.pop("editgroup_description", None) or "Automated merge of release entities"
+ )
+        eg_extra = kwargs.pop("editgroup_extra", dict())
+ eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger")
+ self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False)
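+        # when dry_run_mode is set, try_merge() skips all API writes (reads still happen)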
+ super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
+ self.entity_type_name = "release"
+
+    def choose_primary_release(
+        self, entities: List[ReleaseEntity], existing_redirects: Dict[str, List[str]]
+    ) -> str:
+        """
+        Given two or more duplicate releases, pick the ident of the one which
+        should become the primary, preferring releases with more existing
+        redirects, more attached artifact entities, stronger identifiers, and
+        more complete metadata.
+        """
+        assert entities and len(entities) >= 2
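+        # worked example (hypothetical): given [rel_a, rel_b] where rel_a has
+        # a DOI, two attached files, and an existing redirect, and rel_b has
+        # no identifiers or artifacts, rel_a sorts first and rel_a.ident is
+        # returned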
+
+ # want to sort in descending order, so reverse=True
+ entities = sorted(
+ entities,
+ key=lambda a: (
+ # number of entities already redirected to this one
+ len(existing_redirects[a.ident]),
+ # number of file/fileset/webcapture entities (would need to update)
+ int(len(a.files or []) + len(a.filesets or []) + len(a.webcaptures or [])),
+                # has a strong identifier?
+                bool(a.ext_ids.doi or a.ext_ids.pmid or a.ext_ids.arxiv),
+                # has any identifier?
+                bool(a.ext_ids),
+ # has basic metadata?
+ bool(a.release_type),
+ bool(a.release_status),
+ bool(a.release_year),
+ bool(a.container_id),
+ # has refs, abstracts, extra stuff?
+ bool(a.refs),
+ bool(a.abstracts),
+ ),
+ reverse=True,
+ )
+ return entities[0].ident
+
+ def try_merge(
+ self,
+ dupe_ids: List[str],
+ primary_id: Optional[str] = None,
+ evidence: Optional[Dict[str, Any]] = None,
+ ) -> int:
+
+ # TODO: this code is pretty old and has only been partially refactored.
+ # Needs more testing and review.
+ raise NotImplementedError
+
+ updated_entities = 0
+        releases: Dict[str, ReleaseEntity] = dict()
+ existing_redirects: Dict[str, List[str]] = dict()
+ eg_id = self.get_editgroup_id()
+
+ all_ids = dupe_ids.copy()
+ if primary_id:
+ all_ids.append(primary_id)
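+        # fetch every release involved, expanded with its artifact entities,
+        # plus any releases already redirecting to it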
+ for ident in all_ids:
+ releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures")
+ existing_redirects[ident] = self.api.get_release_redirects(ident)
+
+ if not primary_id:
+ primary_id = self.choose_primary_release(
+ list(releases.values()), existing_redirects
+ )
+ dupe_ids = [d for d in dupe_ids if d != primary_id]
+
+ assert primary_id not in dupe_ids
+
+ primary_work_id = releases[primary_id].work_id
+ updated_work_ids = []
+ redirected_release_ids = []
+
+ # execute all the release redirects
+ for release in releases.values():
+ if release.ident == primary_id:
+ continue
+
+ # file redirects
+ for e in release.files:
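+                # repoint this file from the dupe release to the primary
+                # (append only if it doesn't already point at the primary)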
+ assert release.ident in e.release_ids
+ e.release_ids.remove(release.ident)
+ if primary_id not in e.release_ids:
+ e.release_ids.append(primary_id)
+ if not self.dry_run_mode:
+ self.api.update_file(eg_id, e.ident, e)
+ updated_entities += 1
+ self.counts["updated-files"] += 1
+
+ # fileset redirects
+ for e in release.filesets:
+ assert release.ident in e.release_ids
+ e.release_ids.remove(release.ident)
+ if primary_id not in e.release_ids:
+ e.release_ids.append(primary_id)
+ if not self.dry_run_mode:
+ self.api.update_fileset(eg_id, e.ident, e)
+ updated_entities += 1
+ self.counts["updated-filesets"] += 1
+
+ # webcapture redirects
+ for e in release.webcaptures:
+ assert release.ident in e.release_ids
+ e.release_ids.remove(release.ident)
+ if primary_id not in e.release_ids:
+ e.release_ids.append(primary_id)
+ if not self.dry_run_mode:
+ self.api.update_webcapture(eg_id, e.ident, e)
+ updated_entities += 1
+ self.counts["updated-webcaptures"] += 1
+
+ # the release redirect itself
+ updated_work_ids.append(release.work_id)
+ redirected_release_ids.append(release.ident)
+ if not self.dry_run_mode:
+ self.api.update_release(
+ eg_id,
+ release.ident,
+ ReleaseEntity(redirect=primary_id, edit_extra=evidence),
+ )
+ updated_entities += 1
+ self.counts["updated-releases"] += 1
+
+ # lastly, clean up any merged work entities
+ redirected_release_ids = list(set(redirected_release_ids))
+ updated_work_ids = list(set(updated_work_ids))
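+        # sanity check: the primary release was skipped in the loop above, so
+        # its work must not be in the cleanup set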
+ assert primary_work_id not in updated_work_ids
+ for work_id in updated_work_ids:
+ work_releases = self.api.get_work_releases(work_id, hide="abstracts,refs")
+ rids = set([r.ident for r in work_releases])
+ if rids.issubset(redirected_release_ids):
+                # all releases under this work were merged away, so redirect
+                # the work itself to the primary release's work; also check
+                # that we don't have any in-flight edit conflicts
+ assert work_id not in self._idents_inflight
+ self._idents_inflight.append(work_id)
+ if not self.dry_run_mode:
+ self.api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id))
+ updated_entities += 1
+ self.counts["updated-works"] += 1
+
+ return updated_entities
+
+
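+# Each line pushed by JsonLinePusher should be a JSON object describing one
+# merge. The field names sketched below are an assumption based on the
+# try_merge() signature; the exact schema is defined by EntityMerger in
+# .common:
+#
+#   {"primary_id": "<ident>", "dupe_ids": ["<ident>", ...], "evidence": {"source": "..."}}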
+def run_merge_releases(args: argparse.Namespace) -> None:
+ em = ReleaseMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run)
+ JsonLinePusher(em, args.json_file).run()
+
+
+def main() -> None:
+    """
+    Invoke like:
+
+        python3 -m fatcat_tools.mergers.releases [options] merge-releases <json_file>
+    """
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--host-url", default="http://localhost:9411/v0", help="connect to this host/port"
+ )
+ parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int)
+ parser.add_argument(
+ "--editgroup-description-override",
+ help="editgroup description override",
+ default=None,
+ type=str,
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+        help="don't actually commit merges, just count what would have been merged",
+ )
+ parser.set_defaults(
+ auth_var="FATCAT_AUTH_API_TOKEN",
+ )
+ subparsers = parser.add_subparsers()
+
+ sub_merge_releases = subparsers.add_parser("merge-releases")
+ sub_merge_releases.set_defaults(func=run_merge_releases)
+ sub_merge_releases.add_argument(
+ "json_file",
+        help="source of merge lines to process (or '-' for stdin)",
+ default=sys.stdin,
+ type=argparse.FileType("r"),
+ )
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+
+ # allow editgroup description override via env variable (but CLI arg takes
+ # precedence)
+ if not args.editgroup_description_override and os.environ.get(
+ "FATCAT_EDITGROUP_DESCRIPTION"
+ ):
+ args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION")
+
+ args.api = authenticated_api(
+ args.host_url,
+ # token is an optional kwarg (can be empty string, None, etc)
+ token=os.environ.get(args.auth_var),
+ )
+ args.func(args)
+
+
+if __name__ == "__main__":
+ main()