From 8b132461c9e9f85e61c520f2f576144a6f6e06ac Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 5 Sep 2019 18:41:30 -0700 Subject: first iteration of mergers --- python/fatcat_merge.py | 112 +++++++++++++++++++++++++++ python/fatcat_tools/mergers/__init__.py | 3 + python/fatcat_tools/mergers/common.py | 130 ++++++++++++++++++++++++++++++++ python/fatcat_tools/mergers/releases.py | 110 +++++++++++++++++++++++++++ 4 files changed, 355 insertions(+) create mode 100755 python/fatcat_merge.py create mode 100644 python/fatcat_tools/mergers/__init__.py create mode 100644 python/fatcat_tools/mergers/common.py create mode 100644 python/fatcat_tools/mergers/releases.py (limited to 'python') diff --git a/python/fatcat_merge.py b/python/fatcat_merge.py new file mode 100755 index 00000000..7b0ae63b --- /dev/null +++ b/python/fatcat_merge.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 + +""" +Tools for merging entities in various ways. + + group-releases: pull all release entities under a single work + => merges work entities + merge-releases: merge release entities together + => groups files/filesets/webcaptures + merge-containers: merge container entities + merge-files: merge file entities + +Input format is usually JSON lines with keys: + + idents (required): array of string identifiers + primary (optional): single string identifier + +""" + +import os, sys, argparse +from fatcat_tools import authenticated_api +from fatcat_tools.mergers import * +from fatcat_tools.importers import JsonLinePusher + + +def run_group_releases(args): + rg = ReleaseGrouper(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(rg, args.json_file).run() + +def run_merge_releases(args): + rm = ReleaseMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(rg, args.json_file).run() + +def run_merge_containers(args): + cm = ReleaseMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(cm, args.json_file).run() + +def run_merge_files(args): + fm = FileMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(fm, args.json_file).run() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--host-url', + default="http://localhost:9411/v0", + help="connect to this host/port") + parser.add_argument('--batch-size', + help="size of batch to send", + default=50, type=int) + parser.add_argument('--editgroup-description-override', + help="editgroup description override", + default=None, type=str) + parser.add_argument('--dry-run', + action='store_true', + help="don't actually commit merges, just count what would have been") + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_group_releases = subparsers.add_parser('group-releases') + sub_group_releases.set_defaults(func=run_group_releases) + sub_group_releases.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_releases = subparsers.add_parser('merge-releases') + sub_merge_releases.set_defaults(func=run_merge_releases) + sub_merge_releases.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_files = subparsers.add_parser('merge-files') + sub_merge_files.set_defaults(func=run_merge_files) + sub_merge_files.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_containers = subparsers.add_parser('merge-containers') + sub_merge_containers.set_defaults(func=run_merge_containers) + sub_merge_containers.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override \ + and os.environ.get('FATCAT_EDITGROUP_DESCRIPTION'): + args.editgroup_description_override = os.environ.get('FATCAT_EDITGROUP_DESCRIPTION') + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var)) + args.func(args) + +if __name__ == '__main__': + main() diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py new file mode 100644 index 00000000..c38a397d --- /dev/null +++ b/python/fatcat_tools/mergers/__init__.py @@ -0,0 +1,3 @@ + +from .common import EntityMerger +from .releases import ReleaseMerger, ReleaseGrouper diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py new file mode 100644 index 00000000..62a29c42 --- /dev/null +++ b/python/fatcat_tools/mergers/common.py @@ -0,0 +1,130 @@ + +""" +Tools for merging entities in various ways. + + group-releases: pull all release entities under a single work + => merges work entities + merge-releases: merge release entities together + => groups files/filesets/webcaptures + merge-containers: merge container entities + merge-files: merge file entities + +Input format is JSON lines with keys: + + idents (required): array of string identifiers + primary (optional): single string identifier + +""" + +import subprocess +from collections import Counter + +import fatcat_api_client +from fatcat_api_client.rest import ApiException + + +class EntityMerger: + """ + API for individual jobs: + + # record iterators sees + push_record(raw_record) + finish() + + # provided helpers + self.api + self.get_editgroup_id() + counts({'lines', 'skip', 'merged', 'updated'}) + + # implemented per-task + try_merge(idents, primary=None) -> int (entities updated) + + This class is pretty similar to EntityImporter, but isn't subclassed. + """ + + def __init__(self, api, **kwargs): + + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['git_rev'] = eg_extra.get('git_rev', + subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger') + + self.api = api + self.dry_run_mode = kwargs.get('dry_run_mode', True) + self.edit_batch_size = kwargs.get('edit_batch_size', 50) + self.editgroup_description = kwargs.get('editgroup_description') + self.editgroup_extra = eg_extra + self.reset() + + if self.dry_run_mode: + print("Running in dry-run mode!") + + def reset(self): + self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0}) + self._edit_count = 0 + self._editgroup_id = None + self._entity_queue = [] + self._idents_inflight = [] + + def push_record(self, line): + """ + Intended to be called by "pusher" class (which could be pulling from + JSON file, Kafka, whatever). + + Input is expected to be a dict-like object with key "idents", and + optionally "primary". + + Returns nothing. + """ + self.counts['lines'] += 1 + if (not raw_record): + self.counts['skip'] += 1 + return + primary = line.get('primary') + idents = list(set(line.get('idents'))) + if primary and primary not in idents: + idents.append(primary) + if not idents or len(idents) <= 1: + self.counts['skip'] += 1 + return + for i in idents: + if i in self._idents_inflight: + raise ValueError("Entity already part of in-process merge operation: {}".format(i)) + self._idents.inflight.append(i) + count = self.try_merge(idents, primary=primary) + if count: + self.counts['merged'] += 1 + self.counts['updated-total'] += count + self._edit_count += count + else: + self.counts['skip'] += 1 + if self._edit_count >= self.edit_batch_size: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + return + + def try_merge(self, idents, primary=None): + # implementations should fill this in + raise NotImplementedError + + def finish(self): + if self._edit_count > 0: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + + return self.counts + + def get_editgroup_id(self): + + if not self._editgroup_id: + eg = self.api.create_editgroup( + fatcat_api_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) + self._editgroup_id = eg.editgroup_id + + return self._editgroup_id diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py new file mode 100644 index 00000000..802cb8da --- /dev/null +++ b/python/fatcat_tools/mergers/releases.py @@ -0,0 +1,110 @@ + +from .common import EntityMerger +from fatcat_api_client.models import ReleaseEntity, WorkEntity + + +class ReleaseMerger(EntityMerger): + """ + Hard merges a set of release entities, redirecting all entities to a single + primary release. + + Will also redirect works (if appropriate), and re-point {files, filesets, + webcaptures} to the new merged release. + """ + + def __init__(self, api, **kwargs): + + eg_desc = kwargs.get('editgroup_description', + "Automated merge of release entities") + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger') + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def try_merge(self, idents, primary=None): + + updated_entities = 0 + releases = dict() + eg_id = self.get_editgroup_id() + + self.dry_run_mode + + for ident in idents: + releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures") + + # select the primary (if not already set) + if not primary: + primary = releases.keys()[0] + + primary_work_id = releases[primary].work_id + updated_work_ids = [] + redirected_release_ids = [] + + # execute all the release redirects + for release in releases.values(): + if release.ident == primary: + continue + + # file redirects + for e in release.files: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_file(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-files'] += 1 + + # fileset redirects + for e in release.filesets: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_fileset(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-filesets'] += 1 + + # webcapture redirects + for e in release.webcaptures: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_webcapture(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-webcaptures'] += 1 + + # release redirect itself + updated_work_ids.append(release.work_id) + redirected_release_ids.append(release.ident) + if not self.dry_run_mode: + api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary)) + updated_entities += 1 + self.counts['updated-releases'] + + + # lastly, clean up any merged work entities + redirected_release_ids = set(redirected_release_ids) + updated_work_ids = list(set(updated_work_ids)) + assert primary_work_id not in updated_work_ids + for work_id in updated_work_ids: + work_releases = api.get_work_releases(work_id) + rids = set([r.ident for r in work_releases]) + if rids.issubset(redirected_release_ids): + # all the releases for this work were updated/merged; we should + # redirect to primary work_id + # also check we don't have any in-flight edit conflicts + assert not work_id in self._idents_inflight + self._idents_inflight.append(work_id) + if not self.dry_run_mode: + api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) + updated_entities += 1 + self.counts['updated-works'] += 1 + + return updated_entities + +class ReleaseGrouper(EntityMerger): + pass -- cgit v1.2.3 From a00f4c63e7db5b021ded7c6caf6f1b889627568a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 16 Nov 2021 19:27:33 -0800 Subject: remove top-level fatcat_merge.py; going to call module __main__ going forward --- python/fatcat_merge.py | 112 ------------------------------------------------- 1 file changed, 112 deletions(-) delete mode 100755 python/fatcat_merge.py (limited to 'python') diff --git a/python/fatcat_merge.py b/python/fatcat_merge.py deleted file mode 100755 index 7b0ae63b..00000000 --- a/python/fatcat_merge.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python3 - -""" -Tools for merging entities in various ways. - - group-releases: pull all release entities under a single work - => merges work entities - merge-releases: merge release entities together - => groups files/filesets/webcaptures - merge-containers: merge container entities - merge-files: merge file entities - -Input format is usually JSON lines with keys: - - idents (required): array of string identifiers - primary (optional): single string identifier - -""" - -import os, sys, argparse -from fatcat_tools import authenticated_api -from fatcat_tools.mergers import * -from fatcat_tools.importers import JsonLinePusher - - -def run_group_releases(args): - rg = ReleaseGrouper(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(rg, args.json_file).run() - -def run_merge_releases(args): - rm = ReleaseMerger(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(rg, args.json_file).run() - -def run_merge_containers(args): - cm = ReleaseMerger(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(cm, args.json_file).run() - -def run_merge_files(args): - fm = FileMerger(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(fm, args.json_file).run() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--host-url', - default="http://localhost:9411/v0", - help="connect to this host/port") - parser.add_argument('--batch-size', - help="size of batch to send", - default=50, type=int) - parser.add_argument('--editgroup-description-override', - help="editgroup description override", - default=None, type=str) - parser.add_argument('--dry-run', - action='store_true', - help="don't actually commit merges, just count what would have been") - parser.set_defaults( - auth_var="FATCAT_AUTH_API_TOKEN", - ) - subparsers = parser.add_subparsers() - - sub_group_releases = subparsers.add_parser('group-releases') - sub_group_releases.set_defaults(func=run_group_releases) - sub_group_releases.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - sub_merge_releases = subparsers.add_parser('merge-releases') - sub_merge_releases.set_defaults(func=run_merge_releases) - sub_merge_releases.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - sub_merge_files = subparsers.add_parser('merge-files') - sub_merge_files.set_defaults(func=run_merge_files) - sub_merge_files.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - sub_merge_containers = subparsers.add_parser('merge-containers') - sub_merge_containers.set_defaults(func=run_merge_containers) - sub_merge_containers.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - args = parser.parse_args() - if not args.__dict__.get("func"): - print("tell me what to do!") - sys.exit(-1) - - # allow editgroup description override via env variable (but CLI arg takes - # precedence) - if not args.editgroup_description_override \ - and os.environ.get('FATCAT_EDITGROUP_DESCRIPTION'): - args.editgroup_description_override = os.environ.get('FATCAT_EDITGROUP_DESCRIPTION') - - args.api = authenticated_api( - args.host_url, - # token is an optional kwarg (can be empty string, None, etc) - token=os.environ.get(args.auth_var)) - args.func(args) - -if __name__ == '__main__': - main() -- cgit v1.2.3 From 717e4d71620093e16bac3ae8932c482ac8b12efa Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Nov 2021 12:28:19 -0800 Subject: mergers: fmt, lint, refactors These old merger code is from an old branch and needed significant cleanup --- python/fatcat_tools/mergers/__init__.py | 4 +- python/fatcat_tools/mergers/common.py | 129 +++++++++++++++---------- python/fatcat_tools/mergers/releases.py | 163 ++++++++++++++++++++++++-------- 3 files changed, 200 insertions(+), 96 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py index c38a397d..ccc4c02c 100644 --- a/python/fatcat_tools/mergers/__init__.py +++ b/python/fatcat_tools/mergers/__init__.py @@ -1,3 +1,3 @@ - from .common import EntityMerger -from .releases import ReleaseMerger, ReleaseGrouper +from .files import FileMerger +from .releases import ReleaseMerger diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py index 62a29c42..1aaf357d 100644 --- a/python/fatcat_tools/mergers/common.py +++ b/python/fatcat_tools/mergers/common.py @@ -1,29 +1,25 @@ - """ Tools for merging entities in various ways. - group-releases: pull all release entities under a single work - => merges work entities merge-releases: merge release entities together => groups files/filesets/webcaptures + => merges work entities if needed + merge-works: pull all release entities under a single work + => merges work entities merge-containers: merge container entities merge-files: merge file entities - -Input format is JSON lines with keys: - - idents (required): array of string identifiers - primary (optional): single string identifier - """ import subprocess from collections import Counter +from typing import Any, Dict, List, Optional -import fatcat_api_client -from fatcat_api_client.rest import ApiException +import fatcat_openapi_client +from fatcat_tools.importers import EntityImporter -class EntityMerger: + +class EntityMerger(EntityImporter): """ API for individual jobs: @@ -37,67 +33,85 @@ class EntityMerger: counts({'lines', 'skip', 'merged', 'updated'}) # implemented per-task - try_merge(idents, primary=None) -> int (entities updated) + try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None This class is pretty similar to EntityImporter, but isn't subclassed. """ - def __init__(self, api, **kwargs): + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['git_rev'] = eg_extra.get('git_rev', - subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger') + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["git_rev"] = eg_extra.get( + "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip() + ).decode("utf-8") + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger") self.api = api - self.dry_run_mode = kwargs.get('dry_run_mode', True) - self.edit_batch_size = kwargs.get('edit_batch_size', 50) - self.editgroup_description = kwargs.get('editgroup_description') + self.dry_run_mode = kwargs.get("dry_run_mode", True) + self.edit_batch_size = kwargs.get("edit_batch_size", 50) + self.editgroup_description = kwargs.get("editgroup_description") self.editgroup_extra = eg_extra self.reset() + self.entity_type_name = "common" if self.dry_run_mode: print("Running in dry-run mode!") - def reset(self): - self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0}) + def reset(self) -> None: + self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0}) self._edit_count = 0 - self._editgroup_id = None - self._entity_queue = [] - self._idents_inflight = [] + self._editgroup_id: Optional[str] = None + self._idents_inflight: List[str] = [] - def push_record(self, line): + def push_record(self, record: Dict[str, Any]) -> None: """ Intended to be called by "pusher" class (which could be pulling from JSON file, Kafka, whatever). - Input is expected to be a dict-like object with key "idents", and - optionally "primary". + Input is expected to be a dict-like object with keys: + + entity_type: str + primary_id: Optional[str] + duplicate_ids: [str] (order not preserved) + evidence: Optional[dict] + # can be anything, entity- or merger-specific + # some variables might be... + dupe_extid: str + dupe_extid_type: str Returns nothing. """ - self.counts['lines'] += 1 - if (not raw_record): - self.counts['skip'] += 1 + self.counts["lines"] += 1 + if not record: + self.counts["skip-blank-line"] += 1 + return + if record.get("entity_type") != self.entity_type_name: + self.counts["skip-entity-type"] += 1 return - primary = line.get('primary') - idents = list(set(line.get('idents'))) - if primary and primary not in idents: - idents.append(primary) - if not idents or len(idents) <= 1: - self.counts['skip'] += 1 + primary_id: Optional[str] = record.get("primary_id") + duplicate_ids: List[str] = list(set(record["duplicate_ids"])) + duplicate_ids = [di for di in duplicate_ids if di != primary_id] + if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id): + self.counts["skip-no-dupes"] += 1 return - for i in idents: + all_ids = duplicate_ids + if primary_id: + all_ids.append(primary_id) + for i in all_ids: if i in self._idents_inflight: - raise ValueError("Entity already part of in-process merge operation: {}".format(i)) - self._idents.inflight.append(i) - count = self.try_merge(idents, primary=primary) + raise ValueError( + "Entity already part of in-process merge operation: {}".format(i) + ) + self._idents_inflight.append(i) + count = self.try_merge( + duplicate_ids, primary_id=primary_id, evidence=record.get("evidence") + ) if count: - self.counts['merged'] += 1 - self.counts['updated-total'] += count + self.counts["merged"] += 1 + self.counts["updated-entities"] += count self._edit_count += count else: - self.counts['skip'] += 1 + self.counts["skip"] += 1 if self._edit_count >= self.edit_batch_size: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -105,11 +119,16 @@ class EntityMerger: self._idents_inflight = [] return - def try_merge(self, idents, primary=None): + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: # implementations should fill this in raise NotImplementedError - def finish(self): + def finish(self) -> Counter: if self._edit_count > 0: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -118,13 +137,19 @@ class EntityMerger: return self.counts - def get_editgroup_id(self): + def get_editgroup_id(self, _edits: int = 1) -> str: + """ + This version of get_editgroup_id() is similar to the EntityImporter + version, but does not update self._edit_count or submit editgroups. The + edits parameter is ignored. + """ if not self._editgroup_id: eg = self.api.create_editgroup( - fatcat_api_client.Editgroup( - description=self.editgroup_description, - extra=self.editgroup_extra)) + fatcat_openapi_client.Editgroup( + description=self.editgroup_description, extra=self.editgroup_extra + ) + ) self._editgroup_id = eg.editgroup_id - + assert self._editgroup_id return self._editgroup_id diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py index 802cb8da..fc970057 100644 --- a/python/fatcat_tools/mergers/releases.py +++ b/python/fatcat_tools/mergers/releases.py @@ -1,6 +1,15 @@ +import argparse +import os +import sys +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client +from fatcat_openapi_client.models import ReleaseEntity, WorkEntity + +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher from .common import EntityMerger -from fatcat_api_client.models import ReleaseEntity, WorkEntity class ReleaseMerger(EntityMerger): @@ -12,99 +21,169 @@ class ReleaseMerger(EntityMerger): webcaptures} to the new merged release. """ - def __init__(self, api, **kwargs): + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_desc = kwargs.get('editgroup_description', - "Automated merge of release entities") - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger') - super().__init__(api, - editgroup_description=eg_desc, - editgroup_extra=eg_extra, - **kwargs) + eg_desc = kwargs.get("editgroup_description", "Automated merge of release entities") + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger") + super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) + self.entity_type_name = "release" - def try_merge(self, idents, primary=None): + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + """ + XXX: review/refactor; this code is very old + """ updated_entities = 0 releases = dict() eg_id = self.get_editgroup_id() - self.dry_run_mode - - for ident in idents: - releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures") + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in all_ids: + releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures") - # select the primary (if not already set) - if not primary: - primary = releases.keys()[0] + if not primary_id: + # XXX: + primary_id = dupe_ids[0] + dupe_ids = [d for d in dupe_ids if d != primary_id] - primary_work_id = releases[primary].work_id + primary_work_id = releases[primary_id].work_id updated_work_ids = [] redirected_release_ids = [] # execute all the release redirects for release in releases.values(): - if release.ident == primary: + if release.ident == primary_id: continue # file redirects for e in release.files: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_file(eg_id, e.ident, e) + self.api.update_file(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-files'] += 1 + self.counts["updated-files"] += 1 # fileset redirects for e in release.filesets: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_fileset(eg_id, e.ident, e) + self.api.update_fileset(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-filesets'] += 1 + self.counts["updated-filesets"] += 1 # webcapture redirects for e in release.webcaptures: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_webcapture(eg_id, e.ident, e) + self.api.update_webcapture(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-webcaptures'] += 1 + self.counts["updated-webcaptures"] += 1 # release redirect itself updated_work_ids.append(release.work_id) redirected_release_ids.append(release.ident) if not self.dry_run_mode: - api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary)) + self.api.update_release( + eg_id, release.ident, ReleaseEntity(redirect=primary_id) + ) updated_entities += 1 - self.counts['updated-releases'] - + self.counts["updated-releases"] += 1 # lastly, clean up any merged work entities - redirected_release_ids = set(redirected_release_ids) + redirected_release_ids = list(set(redirected_release_ids)) updated_work_ids = list(set(updated_work_ids)) assert primary_work_id not in updated_work_ids for work_id in updated_work_ids: - work_releases = api.get_work_releases(work_id) + work_releases = self.api.get_work_releases(work_id) rids = set([r.ident for r in work_releases]) if rids.issubset(redirected_release_ids): # all the releases for this work were updated/merged; we should # redirect to primary work_id # also check we don't have any in-flight edit conflicts - assert not work_id in self._idents_inflight + assert work_id not in self._idents_inflight self._idents_inflight.append(work_id) if not self.dry_run_mode: - api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) + self.api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) updated_entities += 1 - self.counts['updated-works'] += 1 + self.counts["updated-works"] += 1 return updated_entities -class ReleaseGrouper(EntityMerger): - pass + +def run_merge_releases(args: argparse.Namespace) -> None: + em = ReleaseMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.releases [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_releases = subparsers.add_parser("merge-releases") + sub_merge_releases.set_defaults(func=run_merge_releases) + sub_merge_releases.add_argument( + "json_file", + help="source of merge lines to process (or stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() -- cgit v1.2.3 From f12bde00c34abf1d4a1604a76cac033b3c4c864b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Nov 2021 12:36:00 -0800 Subject: initial file merger, with tests --- python/fatcat_tools/mergers/files.py | 228 +++++++++++++++++++++++++++++++++++ python/tests/merge_files.py | 160 ++++++++++++++++++++++++ 2 files changed, 388 insertions(+) create mode 100644 python/fatcat_tools/mergers/files.py create mode 100644 python/tests/merge_files.py (limited to 'python') diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py new file mode 100644 index 00000000..4bc8bb81 --- /dev/null +++ b/python/fatcat_tools/mergers/files.py @@ -0,0 +1,228 @@ +import argparse +import os +import sys +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client +from fatcat_openapi_client.models import FileEntity + +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher + +from .common import EntityMerger + + +class FileMerger(EntityMerger): + """ + Combines file entities into a single primary. Merges any existing partial + metadata (such as release_ids and URLs). Can choose a primary if necessary. + + The primary is only updated if needed. + + TODO: relies on API server to detect "redirect of redirect" situation + """ + + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: + + eg_desc = kwargs.get("editgroup_description", "Automated merge of file entities") + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger") + super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) + self.entity_type_name = "file" + + def choose_primary_file(self, entities: List[FileEntity]) -> str: + """ + TODO: could incorporate number of redirected entities already pointing at an entity + """ + assert entities and len(entities) >= 2 + + # want to sort in descending order, so reverse=True + entities = sorted( + entities, + key=lambda a: ( + # has complete metadata? + bool(a.sha256 and a.md5 and a.sha1 and (a.size is not None)), + # has releases associated? + bool(a.release_ids), + # has URLs? + bool(a.urls), + # has extra metadata? + bool(a.extra), + # number of release_ids + len(a.release_ids or []), + ), + reverse=True, + ) + return entities[0].ident + + def merge_file_metadata_from(self, primary: FileEntity, other: FileEntity) -> bool: + """ + Compares a primary to an other. If there are helpful metadata fields in + the other, copy them to primary, in-place. + + This is intended to extract any useful metadata from "other" before it + gets redirected to "primary". + + Returns True if the primary was updated, False otherwise. + """ + updated = False + # NOTE: intentionally not including sha1 here + for k in ["size", "mimetype", "sha256", "md5"]: + if not getattr(primary, k) and getattr(other, k): + setattr(primary, k, getattr(other, k)) + updated = True + + if not primary.urls: + primary.urls = [] + if not primary.release_ids: + primary.release_ids = [] + + if other.extra: + if not primary.extra: + primary.extra = other.extra + updated = True + else: + for k in other.extra.keys(): + if k not in primary.extra: + primary.extra[k] = other.extra[k] + updated = True + + for u in other.urls or []: + if u not in primary.urls: + primary.urls.append(u) + updated = True + + for i in other.release_ids or []: + if i not in primary.release_ids: + primary.release_ids.append(i) + updated = True + + return updated + + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + + # currently requires for extid validation + if not evidence or not (evidence.get("extid_type") and evidence.get("extid")): + self.counts["skip-missing-evidence"] += 1 + return 0 + + updated_entities = 0 + entities: Dict[str, FileEntity] = dict() + eg_id = self.get_editgroup_id() + + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in all_ids: + try: + entities[ident] = self.api.get_file(ident) + except fatcat_openapi_client.ApiException as ae: + if ae.status == 404: + self.counts["skip-entity-not-found"] += 1 + return 0 + else: + raise + if entities[ident].state != "active": + self.counts["skip-not-active-entity"] += 1 + return 0 + if getattr(entities[ident].ext_ids, evidence["extid_type"]) != evidence["extid"]: + self.counts["skip-extid-mismatch"] += 1 + return 0 + + if not primary_id: + primary_id = self.choose_primary_file(list(entities.values())) + dupe_ids = [d for d in dupe_ids if d != primary_id] + + # ensure primary is not in dupes + assert primary_id not in dupe_ids + + primary = entities[primary_id] + primary_updated = False + for other_id in dupe_ids: + other = entities[other_id] + primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated + self.api.update_file( + eg_id, + other.ident, + FileEntity( + redirect=primary.ident, + edit_extra=evidence, + ), + ) + updated_entities += 1 + + if primary_updated: + self.api.update_file(eg_id, primary.ident, primary) + updated_entities += 1 + + return updated_entities + + +def run_merge_files(args: argparse.Namespace) -> None: + em = FileMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.files [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_files = subparsers.add_parser("merge-files") + sub_merge_files.set_defaults(func=run_merge_files) + sub_merge_files.add_argument( + "json_file", + help="source of merge lines to process (or stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/python/tests/merge_files.py b/python/tests/merge_files.py new file mode 100644 index 00000000..c33c6f6c --- /dev/null +++ b/python/tests/merge_files.py @@ -0,0 +1,160 @@ +from fatcat_openapi_client import FileEntity, FileUrl +from fixtures import api + +from fatcat_tools.mergers.files import FileMerger + + +def test_choose_primary_file(api) -> None: + + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + ) + fe_nourls = FileEntity( + ident="ccccsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + ) + fe_complete = FileEntity( + ident="ddddsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_pseudo_complete = FileEntity( + ident="eeeesb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + + assert fm.choose_primary_file([fe_partial, fe_norelease]) == "bbbbsb5apzfhbbxxc7rgu2yw6m" + assert ( + fm.choose_primary_file([fe_partial, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_complete, fe_nourls, fe_norelease]) + == "ddddsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_pseudo_complete, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + + +def test_merge_file_metadata_from(api) -> None: + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + ) + fe_nourls = FileEntity( + ident="ccccsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + ) + fe_complete = FileEntity( + ident="ddddsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="ddddddd315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_pseudo_complete = FileEntity( + ident="eeeesb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_another_release_id = FileEntity( + ident="fffffffapzfhbbxxc7rgu2yw6m", + release_ids=["qqqqqg7mxrayxfltget7fqcrjy"], + ) + fe_another_url = FileEntity( + ident="zzzzzzzapzfhbbxxc7rgu2yw6m", + urls=[ + FileUrl(rel="repository", url="http://someuni.edu/repo/file.pdf"), + ], + ) + fe_more_extra = FileEntity( + ident="fffffffapzfhbbxxc7rgu2yw6m", + release_ids=["qqqqqg7mxrayxfltget7fqcrjy"], + extra=dict(thang=456), + ) + + assert fm.merge_file_metadata_from(fe_nourls, fe_partial) is False + assert fm.merge_file_metadata_from(fe_complete, fe_pseudo_complete) is False + assert fm.merge_file_metadata_from(fe_complete, fe_complete) is False + assert fm.merge_file_metadata_from(fe_partial, fe_norelease) is True + assert fe_partial.md5 == fe_norelease.md5 + assert fe_partial.size == fe_norelease.size + assert fm.merge_file_metadata_from(fe_partial, fe_complete) is True + assert fe_partial.md5 != fe_complete.md5 + assert fe_partial.extra == fe_complete.extra + assert set([(u.rel, u.url) for u in fe_partial.urls or []]) == set( + [(u.rel, u.url) for u in fe_complete.urls or []] + ) + assert fe_partial.release_ids == fe_complete.release_ids + assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is True + assert fe_partial.release_ids == [ + "dlrxjg7mxrayxfltget7fqcrjy", + "qqqqqg7mxrayxfltget7fqcrjy", + ] + assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is False + assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is True + assert fe_partial.extra == dict(asdf=123, thang=456) + assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is False + assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is True + assert fe_partial.urls[-1].url == "http://someuni.edu/repo/file.pdf" + assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is False -- cgit v1.2.3 From d56183b407c030fe4e9ba25da7916dad6f51d71c Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 23 Nov 2021 18:11:58 -0800 Subject: mergers: remove entity mergers from __init__ (to work around warning) --- python/fatcat_tools/mergers/__init__.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py index ccc4c02c..0d7cd468 100644 --- a/python/fatcat_tools/mergers/__init__.py +++ b/python/fatcat_tools/mergers/__init__.py @@ -1,3 +1 @@ from .common import EntityMerger -from .files import FileMerger -from .releases import ReleaseMerger -- cgit v1.2.3 From f6c4bd65c104e3a728e94561be80242cf35cbea3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 23 Nov 2021 18:12:34 -0800 Subject: mergers: small tweaks --- python/fatcat_tools/mergers/common.py | 4 ++-- python/fatcat_tools/mergers/files.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py index 1aaf357d..05433748 100644 --- a/python/fatcat_tools/mergers/common.py +++ b/python/fatcat_tools/mergers/common.py @@ -76,8 +76,8 @@ class EntityMerger(EntityImporter): evidence: Optional[dict] # can be anything, entity- or merger-specific # some variables might be... - dupe_extid: str - dupe_extid_type: str + extid: str + extid_type: str Returns nothing. """ diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py index 4bc8bb81..32c7fcb6 100644 --- a/python/fatcat_tools/mergers/files.py +++ b/python/fatcat_tools/mergers/files.py @@ -106,7 +106,7 @@ class FileMerger(EntityMerger): evidence: Optional[Dict[str, Any]] = None, ) -> int: - # currently requires for extid validation + # currently required for extid validation if not evidence or not (evidence.get("extid_type") and evidence.get("extid")): self.counts["skip-missing-evidence"] += 1 return 0 -- cgit v1.2.3 From 112c41a1157862d2c8f758eac685b0b26c921797 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 23 Nov 2021 19:29:49 -0800 Subject: file merges: fixes from testing in QA --- python/fatcat_tools/mergers/files.py | 37 ++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py index 32c7fcb6..30b31330 100644 --- a/python/fatcat_tools/mergers/files.py +++ b/python/fatcat_tools/mergers/files.py @@ -24,9 +24,12 @@ class FileMerger(EntityMerger): def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_desc = kwargs.get("editgroup_description", "Automated merge of file entities") - eg_extra = kwargs.get("editgroup_extra", dict()) + eg_desc = ( + kwargs.pop("editgroup_description", None) or "Automated merge of file entities" + ) + eg_extra = kwargs.pop("editgroup_extra", dict()) eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger") + self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False) super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) self.entity_type_name = "file" @@ -130,7 +133,7 @@ class FileMerger(EntityMerger): if entities[ident].state != "active": self.counts["skip-not-active-entity"] += 1 return 0 - if getattr(entities[ident].ext_ids, evidence["extid_type"]) != evidence["extid"]: + if getattr(entities[ident], evidence["extid_type"]) != evidence["extid"]: self.counts["skip-extid-mismatch"] += 1 return 0 @@ -138,7 +141,6 @@ class FileMerger(EntityMerger): primary_id = self.choose_primary_file(list(entities.values())) dupe_ids = [d for d in dupe_ids if d != primary_id] - # ensure primary is not in dupes assert primary_id not in dupe_ids primary = entities[primary_id] @@ -146,25 +148,32 @@ class FileMerger(EntityMerger): for other_id in dupe_ids: other = entities[other_id] primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated - self.api.update_file( - eg_id, - other.ident, - FileEntity( - redirect=primary.ident, - edit_extra=evidence, - ), - ) + if not self.dry_run_mode: + self.api.update_file( + eg_id, + other.ident, + FileEntity( + redirect=primary.ident, + edit_extra=evidence, + ), + ) updated_entities += 1 if primary_updated: - self.api.update_file(eg_id, primary.ident, primary) + if not self.dry_run_mode: + self.api.update_file(eg_id, primary.ident, primary) updated_entities += 1 return updated_entities def run_merge_files(args: argparse.Namespace) -> None: - em = FileMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + em = FileMerger( + args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run, + editgroup_description=args.editgroup_description_override, + ) JsonLinePusher(em, args.json_file).run() -- cgit v1.2.3 From 8debf771c540b0bef7f4745195f8af87490917b0 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 23 Nov 2021 19:30:14 -0800 Subject: release merger: some progress, but also disable (not complete) --- python/fatcat_tools/mergers/releases.py | 84 ++++++++++++++++++++++++++++----- 1 file changed, 72 insertions(+), 12 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py index fc970057..1f995b00 100644 --- a/python/fatcat_tools/mergers/releases.py +++ b/python/fatcat_tools/mergers/releases.py @@ -15,32 +15,83 @@ from .common import EntityMerger class ReleaseMerger(EntityMerger): """ Hard merges a set of release entities, redirecting all entities to a single - primary release. + primary release. This is different from "grouping" multiple releases under + a single work. - Will also redirect works (if appropriate), and re-point {files, filesets, - webcaptures} to the new merged release. + A "primary" (which the other releases will redirect to) can be provided, or + one will be chosen from the set of duplicate releases based on the + completeness of metadata and other heuristic factors. + + Releases are some of the most complex entities to merge, because of + the complexity of bibliographic metadata and the number of related entities + which also need to be updated. + + File, Fileset, and Webcapture entities which currently point to a release + which gets redirected will be updated to point at the "primary" release. + + Any Work entities which will end up with no releases pointing at them after + the merging will get redirected to the work corresponding to the "primary" + release. + + NOTE: the "primary" release will currently (as implemented) *not* get + updated with metadata from all the redirected releases """ def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_desc = kwargs.get("editgroup_description", "Automated merge of release entities") + eg_desc = ( + kwargs.pop("editgroup_description", None) or "Automated merge of release entities" + ) eg_extra = kwargs.get("editgroup_extra", dict()) eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger") + self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False) super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) self.entity_type_name = "release" + def choose_primary_release( + self, entities: List[ReleaseEntity], existing_redirects: Dict[str, List[str]] + ) -> str: + assert entities and len(entities) >= 2 + + # want to sort in descending order, so reverse=True + entities = sorted( + entities, + key=lambda a: ( + # number of entities already redirected to this one + len(existing_redirects[a.ident]), + # number of file/fileset/webcapture entities (would need to update) + int(len(a.files or []) + len(a.filesets or []) + len(a.webcaptures or [])), + # has a strong identifier? + bool(a.ext_id.doi or a.ext_id.pmid or a.ext_id.arxiv_id), + # has any identifier? + bool(a.ext_id), + # has basic metadata? + bool(a.release_type), + bool(a.release_status), + bool(a.release_year), + bool(a.container_id), + # has refs, abstracts, extra stuff? + bool(a.refs), + bool(a.abstracts), + ), + reverse=True, + ) + return entities[0].ident + def try_merge( self, dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None, ) -> int: - """ - XXX: review/refactor; this code is very old - """ + + # TODO: this code is pretty old and has only been partially refactored. + # Needs more testing and review. + raise NotImplementedError updated_entities = 0 releases = dict() + existing_redirects: Dict[str, List[str]] = dict() eg_id = self.get_editgroup_id() all_ids = dupe_ids.copy() @@ -48,12 +99,16 @@ class ReleaseMerger(EntityMerger): all_ids.append(primary_id) for ident in all_ids: releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures") + existing_redirects[ident] = self.api.get_release_redirects(ident) if not primary_id: - # XXX: - primary_id = dupe_ids[0] + primary_id = self.choose_primary_release( + list(releases.values()), existing_redirects + ) dupe_ids = [d for d in dupe_ids if d != primary_id] + assert primary_id not in dupe_ids + primary_work_id = releases[primary_id].work_id updated_work_ids = [] redirected_release_ids = [] @@ -65,6 +120,7 @@ class ReleaseMerger(EntityMerger): # file redirects for e in release.files: + assert release.ident in e.release_ids e.release_ids.remove(release.ident) if primary_id not in e.release_ids: e.release_ids.append(primary_id) @@ -75,6 +131,7 @@ class ReleaseMerger(EntityMerger): # fileset redirects for e in release.filesets: + assert release.ident in e.release_ids e.release_ids.remove(release.ident) if primary_id not in e.release_ids: e.release_ids.append(primary_id) @@ -85,6 +142,7 @@ class ReleaseMerger(EntityMerger): # webcapture redirects for e in release.webcaptures: + assert release.ident in e.release_ids e.release_ids.remove(release.ident) if primary_id not in e.release_ids: e.release_ids.append(primary_id) @@ -93,12 +151,14 @@ class ReleaseMerger(EntityMerger): updated_entities += 1 self.counts["updated-webcaptures"] += 1 - # release redirect itself + # the release redirect itself updated_work_ids.append(release.work_id) redirected_release_ids.append(release.ident) if not self.dry_run_mode: self.api.update_release( - eg_id, release.ident, ReleaseEntity(redirect=primary_id) + eg_id, + release.ident, + ReleaseEntity(redirect=primary_id, edit_extra=evidence), ) updated_entities += 1 self.counts["updated-releases"] += 1 @@ -108,7 +168,7 @@ class ReleaseMerger(EntityMerger): updated_work_ids = list(set(updated_work_ids)) assert primary_work_id not in updated_work_ids for work_id in updated_work_ids: - work_releases = self.api.get_work_releases(work_id) + work_releases = self.api.get_work_releases(work_id, hide="abstracts,refs") rids = set([r.ident for r in work_releases]) if rids.issubset(redirected_release_ids): # all the releases for this work were updated/merged; we should -- cgit v1.2.3 From 9588a6a1aa01900b0dc376981635db8b609919f5 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 24 Nov 2021 14:29:08 -0800 Subject: file merger: add content_scope to list of merged fields --- python/fatcat_tools/mergers/files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'python') diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py index 30b31330..3efe85fd 100644 --- a/python/fatcat_tools/mergers/files.py +++ b/python/fatcat_tools/mergers/files.py @@ -70,7 +70,7 @@ class FileMerger(EntityMerger): """ updated = False # NOTE: intentionally not including sha1 here - for k in ["size", "mimetype", "sha256", "md5"]: + for k in ["size", "mimetype", "sha256", "md5", "content_scope"]: if not getattr(primary, k) and getattr(other, k): setattr(primary, k, getattr(other, k)) updated = True -- cgit v1.2.3 From 62bf6202d002e96ffd81d7e4634502be1886d5c3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 24 Nov 2021 16:26:06 -0800 Subject: mergers common: remove inaccurate comment Caught in review, thanks miku --- python/fatcat_tools/mergers/common.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py index 05433748..e25f8194 100644 --- a/python/fatcat_tools/mergers/common.py +++ b/python/fatcat_tools/mergers/common.py @@ -34,8 +34,6 @@ class EntityMerger(EntityImporter): # implemented per-task try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None - - This class is pretty similar to EntityImporter, but isn't subclassed. """ def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: -- cgit v1.2.3