From 8b132461c9e9f85e61c520f2f576144a6f6e06ac Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 5 Sep 2019 18:41:30 -0700 Subject: first iteration of mergers --- python/fatcat_merge.py | 112 +++++++++++++++++++++++++++ python/fatcat_tools/mergers/__init__.py | 3 + python/fatcat_tools/mergers/common.py | 130 ++++++++++++++++++++++++++++++++ python/fatcat_tools/mergers/releases.py | 110 +++++++++++++++++++++++++++ 4 files changed, 355 insertions(+) create mode 100755 python/fatcat_merge.py create mode 100644 python/fatcat_tools/mergers/__init__.py create mode 100644 python/fatcat_tools/mergers/common.py create mode 100644 python/fatcat_tools/mergers/releases.py diff --git a/python/fatcat_merge.py b/python/fatcat_merge.py new file mode 100755 index 00000000..7b0ae63b --- /dev/null +++ b/python/fatcat_merge.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 + +""" +Tools for merging entities in various ways. + + group-releases: pull all release entities under a single work + => merges work entities + merge-releases: merge release entities together + => groups files/filesets/webcaptures + merge-containers: merge container entities + merge-files: merge file entities + +Input format is usually JSON lines with keys: + + idents (required): array of string identifiers + primary (optional): single string identifier + +""" + +import os, sys, argparse +from fatcat_tools import authenticated_api +from fatcat_tools.mergers import * +from fatcat_tools.importers import JsonLinePusher + + +def run_group_releases(args): + rg = ReleaseGrouper(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(rg, args.json_file).run() + +def run_merge_releases(args): + rm = ReleaseMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(rg, args.json_file).run() + +def run_merge_containers(args): + cm = ReleaseMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(cm, args.json_file).run() + +def run_merge_files(args): + fm = FileMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(fm, args.json_file).run() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--host-url', + default="http://localhost:9411/v0", + help="connect to this host/port") + parser.add_argument('--batch-size', + help="size of batch to send", + default=50, type=int) + parser.add_argument('--editgroup-description-override', + help="editgroup description override", + default=None, type=str) + parser.add_argument('--dry-run', + action='store_true', + help="don't actually commit merges, just count what would have been") + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_group_releases = subparsers.add_parser('group-releases') + sub_group_releases.set_defaults(func=run_group_releases) + sub_group_releases.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_releases = subparsers.add_parser('merge-releases') + sub_merge_releases.set_defaults(func=run_merge_releases) + sub_merge_releases.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_files = subparsers.add_parser('merge-files') + sub_merge_files.set_defaults(func=run_merge_files) + sub_merge_files.add_argument('json_file', + help="source of merge lines to process (or stdin)", + 
default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_containers = subparsers.add_parser('merge-containers') + sub_merge_containers.set_defaults(func=run_merge_containers) + sub_merge_containers.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override \ + and os.environ.get('FATCAT_EDITGROUP_DESCRIPTION'): + args.editgroup_description_override = os.environ.get('FATCAT_EDITGROUP_DESCRIPTION') + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var)) + args.func(args) + +if __name__ == '__main__': + main() diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py new file mode 100644 index 00000000..c38a397d --- /dev/null +++ b/python/fatcat_tools/mergers/__init__.py @@ -0,0 +1,3 @@ + +from .common import EntityMerger +from .releases import ReleaseMerger, ReleaseGrouper diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py new file mode 100644 index 00000000..62a29c42 --- /dev/null +++ b/python/fatcat_tools/mergers/common.py @@ -0,0 +1,130 @@ + +""" +Tools for merging entities in various ways. + + group-releases: pull all release entities under a single work + => merges work entities + merge-releases: merge release entities together + => groups files/filesets/webcaptures + merge-containers: merge container entities + merge-files: merge file entities + +Input format is JSON lines with keys: + + idents (required): array of string identifiers + primary (optional): single string identifier + +""" + +import subprocess +from collections import Counter + +import fatcat_api_client +from fatcat_api_client.rest import ApiException + + +class EntityMerger: + """ + API for individual jobs: + + # record iterators sees + push_record(raw_record) + finish() + + # provided helpers + self.api + self.get_editgroup_id() + counts({'lines', 'skip', 'merged', 'updated'}) + + # implemented per-task + try_merge(idents, primary=None) -> int (entities updated) + + This class is pretty similar to EntityImporter, but isn't subclassed. + """ + + def __init__(self, api, **kwargs): + + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['git_rev'] = eg_extra.get('git_rev', + subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger') + + self.api = api + self.dry_run_mode = kwargs.get('dry_run_mode', True) + self.edit_batch_size = kwargs.get('edit_batch_size', 50) + self.editgroup_description = kwargs.get('editgroup_description') + self.editgroup_extra = eg_extra + self.reset() + + if self.dry_run_mode: + print("Running in dry-run mode!") + + def reset(self): + self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0}) + self._edit_count = 0 + self._editgroup_id = None + self._entity_queue = [] + self._idents_inflight = [] + + def push_record(self, line): + """ + Intended to be called by "pusher" class (which could be pulling from + JSON file, Kafka, whatever). + + Input is expected to be a dict-like object with key "idents", and + optionally "primary". + + Returns nothing. 
+ """ + self.counts['lines'] += 1 + if (not raw_record): + self.counts['skip'] += 1 + return + primary = line.get('primary') + idents = list(set(line.get('idents'))) + if primary and primary not in idents: + idents.append(primary) + if not idents or len(idents) <= 1: + self.counts['skip'] += 1 + return + for i in idents: + if i in self._idents_inflight: + raise ValueError("Entity already part of in-process merge operation: {}".format(i)) + self._idents.inflight.append(i) + count = self.try_merge(idents, primary=primary) + if count: + self.counts['merged'] += 1 + self.counts['updated-total'] += count + self._edit_count += count + else: + self.counts['skip'] += 1 + if self._edit_count >= self.edit_batch_size: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + return + + def try_merge(self, idents, primary=None): + # implementations should fill this in + raise NotImplementedError + + def finish(self): + if self._edit_count > 0: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + + return self.counts + + def get_editgroup_id(self): + + if not self._editgroup_id: + eg = self.api.create_editgroup( + fatcat_api_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) + self._editgroup_id = eg.editgroup_id + + return self._editgroup_id diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py new file mode 100644 index 00000000..802cb8da --- /dev/null +++ b/python/fatcat_tools/mergers/releases.py @@ -0,0 +1,110 @@ + +from .common import EntityMerger +from fatcat_api_client.models import ReleaseEntity, WorkEntity + + +class ReleaseMerger(EntityMerger): + """ + Hard merges a set of release entities, redirecting all entities to a single + primary release. + + Will also redirect works (if appropriate), and re-point {files, filesets, + webcaptures} to the new merged release. 
+ """ + + def __init__(self, api, **kwargs): + + eg_desc = kwargs.get('editgroup_description', + "Automated merge of release entities") + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger') + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def try_merge(self, idents, primary=None): + + updated_entities = 0 + releases = dict() + eg_id = self.get_editgroup_id() + + self.dry_run_mode + + for ident in idents: + releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures") + + # select the primary (if not already set) + if not primary: + primary = releases.keys()[0] + + primary_work_id = releases[primary].work_id + updated_work_ids = [] + redirected_release_ids = [] + + # execute all the release redirects + for release in releases.values(): + if release.ident == primary: + continue + + # file redirects + for e in release.files: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_file(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-files'] += 1 + + # fileset redirects + for e in release.filesets: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_fileset(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-filesets'] += 1 + + # webcapture redirects + for e in release.webcaptures: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_webcapture(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-webcaptures'] += 1 + + # release redirect itself + updated_work_ids.append(release.work_id) + redirected_release_ids.append(release.ident) + if not self.dry_run_mode: + api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary)) + updated_entities += 1 + self.counts['updated-releases'] + + + # lastly, clean up any merged work entities + redirected_release_ids = set(redirected_release_ids) + updated_work_ids = list(set(updated_work_ids)) + assert primary_work_id not in updated_work_ids + for work_id in updated_work_ids: + work_releases = api.get_work_releases(work_id) + rids = set([r.ident for r in work_releases]) + if rids.issubset(redirected_release_ids): + # all the releases for this work were updated/merged; we should + # redirect to primary work_id + # also check we don't have any in-flight edit conflicts + assert not work_id in self._idents_inflight + self._idents_inflight.append(work_id) + if not self.dry_run_mode: + api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) + updated_entities += 1 + self.counts['updated-works'] += 1 + + return updated_entities + +class ReleaseGrouper(EntityMerger): + pass -- cgit v1.2.3 From a00f4c63e7db5b021ded7c6caf6f1b889627568a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 16 Nov 2021 19:27:33 -0800 Subject: remove top-level fatcat_merge.py; going to call module __main__ going forward --- python/fatcat_merge.py | 112 ------------------------------------------------- 1 file changed, 112 deletions(-) delete mode 100755 python/fatcat_merge.py diff --git a/python/fatcat_merge.py b/python/fatcat_merge.py deleted file mode 100755 index 7b0ae63b..00000000 --- a/python/fatcat_merge.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python3 - -""" -Tools for merging entities in 
various ways. - - group-releases: pull all release entities under a single work - => merges work entities - merge-releases: merge release entities together - => groups files/filesets/webcaptures - merge-containers: merge container entities - merge-files: merge file entities - -Input format is usually JSON lines with keys: - - idents (required): array of string identifiers - primary (optional): single string identifier - -""" - -import os, sys, argparse -from fatcat_tools import authenticated_api -from fatcat_tools.mergers import * -from fatcat_tools.importers import JsonLinePusher - - -def run_group_releases(args): - rg = ReleaseGrouper(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(rg, args.json_file).run() - -def run_merge_releases(args): - rm = ReleaseMerger(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(rg, args.json_file).run() - -def run_merge_containers(args): - cm = ReleaseMerger(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(cm, args.json_file).run() - -def run_merge_files(args): - fm = FileMerger(args.api, - edit_batch_size=args.batch_size, - dry_run_mode=args.dry_run) - JsonLinePusher(fm, args.json_file).run() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--host-url', - default="http://localhost:9411/v0", - help="connect to this host/port") - parser.add_argument('--batch-size', - help="size of batch to send", - default=50, type=int) - parser.add_argument('--editgroup-description-override', - help="editgroup description override", - default=None, type=str) - parser.add_argument('--dry-run', - action='store_true', - help="don't actually commit merges, just count what would have been") - parser.set_defaults( - auth_var="FATCAT_AUTH_API_TOKEN", - ) - subparsers = parser.add_subparsers() - - sub_group_releases = subparsers.add_parser('group-releases') - sub_group_releases.set_defaults(func=run_group_releases) - sub_group_releases.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - sub_merge_releases = subparsers.add_parser('merge-releases') - sub_merge_releases.set_defaults(func=run_merge_releases) - sub_merge_releases.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - sub_merge_files = subparsers.add_parser('merge-files') - sub_merge_files.set_defaults(func=run_merge_files) - sub_merge_files.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - sub_merge_containers = subparsers.add_parser('merge-containers') - sub_merge_containers.set_defaults(func=run_merge_containers) - sub_merge_containers.add_argument('json_file', - help="source of merge lines to process (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - args = parser.parse_args() - if not args.__dict__.get("func"): - print("tell me what to do!") - sys.exit(-1) - - # allow editgroup description override via env variable (but CLI arg takes - # precedence) - if not args.editgroup_description_override \ - and os.environ.get('FATCAT_EDITGROUP_DESCRIPTION'): - args.editgroup_description_override = os.environ.get('FATCAT_EDITGROUP_DESCRIPTION') - - args.api = authenticated_api( - args.host_url, - # token is an optional kwarg (can be empty string, None, etc) - token=os.environ.get(args.auth_var)) - args.func(args) - -if 
__name__ == '__main__':
-    main()
--
cgit v1.2.3


From 717e4d71620093e16bac3ae8932c482ac8b12efa Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 17 Nov 2021 12:28:19 -0800
Subject: mergers: fmt, lint, refactors

This old merger code is from an old branch and needed significant cleanup
---
 python/fatcat_tools/mergers/__init__.py |   4 +-
 python/fatcat_tools/mergers/common.py   | 129 +++++++++++++++----------
 python/fatcat_tools/mergers/releases.py | 163 ++++++++++++++++++++++++--------
 3 files changed, 200 insertions(+), 96 deletions(-)

diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py
index c38a397d..ccc4c02c 100644
--- a/python/fatcat_tools/mergers/__init__.py
+++ b/python/fatcat_tools/mergers/__init__.py
@@ -1,3 +1,3 @@
-
 from .common import EntityMerger
-from .releases import ReleaseMerger, ReleaseGrouper
+from .files import FileMerger
+from .releases import ReleaseMerger
diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
index 62a29c42..1aaf357d 100644
--- a/python/fatcat_tools/mergers/common.py
+++ b/python/fatcat_tools/mergers/common.py
@@ -1,29 +1,25 @@
-
 """
 Tools for merging entities in various ways.
 
-    group-releases: pull all release entities under a single work
-        => merges work entities
     merge-releases: merge release entities together
         => groups files/filesets/webcaptures
+        => merges work entities if needed
+    merge-works: pull all release entities under a single work
+        => merges work entities
     merge-containers: merge container entities
     merge-files: merge file entities
-
-Input format is JSON lines with keys:
-
-    idents (required): array of string identifiers
-    primary (optional): single string identifier
-
 """
 
 import subprocess
 from collections import Counter
+from typing import Any, Dict, List, Optional
 
-import fatcat_api_client
-from fatcat_api_client.rest import ApiException
+import fatcat_openapi_client
 
+from fatcat_tools.importers import EntityImporter
 
-class EntityMerger:
+
+class EntityMerger(EntityImporter):
     """
     API for individual jobs:
 
@@ -37,67 +33,85 @@ class EntityMerger:
         counts({'lines', 'skip', 'merged', 'updated'})
 
         # implemented per-task
-        try_merge(idents, primary=None) -> int (entities updated)
+        try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None
 
     This class is pretty similar to EntityImporter, but isn't subclassed.
""" - def __init__(self, api, **kwargs): + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['git_rev'] = eg_extra.get('git_rev', - subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger') + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["git_rev"] = eg_extra.get( + "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip() + ).decode("utf-8") + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger") self.api = api - self.dry_run_mode = kwargs.get('dry_run_mode', True) - self.edit_batch_size = kwargs.get('edit_batch_size', 50) - self.editgroup_description = kwargs.get('editgroup_description') + self.dry_run_mode = kwargs.get("dry_run_mode", True) + self.edit_batch_size = kwargs.get("edit_batch_size", 50) + self.editgroup_description = kwargs.get("editgroup_description") self.editgroup_extra = eg_extra self.reset() + self.entity_type_name = "common" if self.dry_run_mode: print("Running in dry-run mode!") - def reset(self): - self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0}) + def reset(self) -> None: + self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0}) self._edit_count = 0 - self._editgroup_id = None - self._entity_queue = [] - self._idents_inflight = [] + self._editgroup_id: Optional[str] = None + self._idents_inflight: List[str] = [] - def push_record(self, line): + def push_record(self, record: Dict[str, Any]) -> None: """ Intended to be called by "pusher" class (which could be pulling from JSON file, Kafka, whatever). - Input is expected to be a dict-like object with key "idents", and - optionally "primary". + Input is expected to be a dict-like object with keys: + + entity_type: str + primary_id: Optional[str] + duplicate_ids: [str] (order not preserved) + evidence: Optional[dict] + # can be anything, entity- or merger-specific + # some variables might be... + dupe_extid: str + dupe_extid_type: str Returns nothing. 
""" - self.counts['lines'] += 1 - if (not raw_record): - self.counts['skip'] += 1 + self.counts["lines"] += 1 + if not record: + self.counts["skip-blank-line"] += 1 + return + if record.get("entity_type") != self.entity_type_name: + self.counts["skip-entity-type"] += 1 return - primary = line.get('primary') - idents = list(set(line.get('idents'))) - if primary and primary not in idents: - idents.append(primary) - if not idents or len(idents) <= 1: - self.counts['skip'] += 1 + primary_id: Optional[str] = record.get("primary_id") + duplicate_ids: List[str] = list(set(record["duplicate_ids"])) + duplicate_ids = [di for di in duplicate_ids if di != primary_id] + if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id): + self.counts["skip-no-dupes"] += 1 return - for i in idents: + all_ids = duplicate_ids + if primary_id: + all_ids.append(primary_id) + for i in all_ids: if i in self._idents_inflight: - raise ValueError("Entity already part of in-process merge operation: {}".format(i)) - self._idents.inflight.append(i) - count = self.try_merge(idents, primary=primary) + raise ValueError( + "Entity already part of in-process merge operation: {}".format(i) + ) + self._idents_inflight.append(i) + count = self.try_merge( + duplicate_ids, primary_id=primary_id, evidence=record.get("evidence") + ) if count: - self.counts['merged'] += 1 - self.counts['updated-total'] += count + self.counts["merged"] += 1 + self.counts["updated-entities"] += count self._edit_count += count else: - self.counts['skip'] += 1 + self.counts["skip"] += 1 if self._edit_count >= self.edit_batch_size: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -105,11 +119,16 @@ class EntityMerger: self._idents_inflight = [] return - def try_merge(self, idents, primary=None): + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: # implementations should fill this in raise NotImplementedError - def finish(self): + def finish(self) -> Counter: if self._edit_count > 0: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -118,13 +137,19 @@ class EntityMerger: return self.counts - def get_editgroup_id(self): + def get_editgroup_id(self, _edits: int = 1) -> str: + """ + This version of get_editgroup_id() is similar to the EntityImporter + version, but does not update self._edit_count or submit editgroups. The + edits parameter is ignored. 
+ """ if not self._editgroup_id: eg = self.api.create_editgroup( - fatcat_api_client.Editgroup( - description=self.editgroup_description, - extra=self.editgroup_extra)) + fatcat_openapi_client.Editgroup( + description=self.editgroup_description, extra=self.editgroup_extra + ) + ) self._editgroup_id = eg.editgroup_id - + assert self._editgroup_id return self._editgroup_id diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py index 802cb8da..fc970057 100644 --- a/python/fatcat_tools/mergers/releases.py +++ b/python/fatcat_tools/mergers/releases.py @@ -1,6 +1,15 @@ +import argparse +import os +import sys +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client +from fatcat_openapi_client.models import ReleaseEntity, WorkEntity + +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher from .common import EntityMerger -from fatcat_api_client.models import ReleaseEntity, WorkEntity class ReleaseMerger(EntityMerger): @@ -12,99 +21,169 @@ class ReleaseMerger(EntityMerger): webcaptures} to the new merged release. """ - def __init__(self, api, **kwargs): + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_desc = kwargs.get('editgroup_description', - "Automated merge of release entities") - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger') - super().__init__(api, - editgroup_description=eg_desc, - editgroup_extra=eg_extra, - **kwargs) + eg_desc = kwargs.get("editgroup_description", "Automated merge of release entities") + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger") + super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) + self.entity_type_name = "release" - def try_merge(self, idents, primary=None): + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + """ + XXX: review/refactor; this code is very old + """ updated_entities = 0 releases = dict() eg_id = self.get_editgroup_id() - self.dry_run_mode - - for ident in idents: - releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures") + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in all_ids: + releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures") - # select the primary (if not already set) - if not primary: - primary = releases.keys()[0] + if not primary_id: + # XXX: + primary_id = dupe_ids[0] + dupe_ids = [d for d in dupe_ids if d != primary_id] - primary_work_id = releases[primary].work_id + primary_work_id = releases[primary_id].work_id updated_work_ids = [] redirected_release_ids = [] # execute all the release redirects for release in releases.values(): - if release.ident == primary: + if release.ident == primary_id: continue # file redirects for e in release.files: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_file(eg_id, e.ident, e) + self.api.update_file(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-files'] += 1 + self.counts["updated-files"] += 1 # fileset redirects for e in release.filesets: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - 
e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_fileset(eg_id, e.ident, e) + self.api.update_fileset(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-filesets'] += 1 + self.counts["updated-filesets"] += 1 # webcapture redirects for e in release.webcaptures: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_webcapture(eg_id, e.ident, e) + self.api.update_webcapture(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-webcaptures'] += 1 + self.counts["updated-webcaptures"] += 1 # release redirect itself updated_work_ids.append(release.work_id) redirected_release_ids.append(release.ident) if not self.dry_run_mode: - api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary)) + self.api.update_release( + eg_id, release.ident, ReleaseEntity(redirect=primary_id) + ) updated_entities += 1 - self.counts['updated-releases'] - + self.counts["updated-releases"] += 1 # lastly, clean up any merged work entities - redirected_release_ids = set(redirected_release_ids) + redirected_release_ids = list(set(redirected_release_ids)) updated_work_ids = list(set(updated_work_ids)) assert primary_work_id not in updated_work_ids for work_id in updated_work_ids: - work_releases = api.get_work_releases(work_id) + work_releases = self.api.get_work_releases(work_id) rids = set([r.ident for r in work_releases]) if rids.issubset(redirected_release_ids): # all the releases for this work were updated/merged; we should # redirect to primary work_id # also check we don't have any in-flight edit conflicts - assert not work_id in self._idents_inflight + assert work_id not in self._idents_inflight self._idents_inflight.append(work_id) if not self.dry_run_mode: - api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) + self.api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) updated_entities += 1 - self.counts['updated-works'] += 1 + self.counts["updated-works"] += 1 return updated_entities -class ReleaseGrouper(EntityMerger): - pass + +def run_merge_releases(args: argparse.Namespace) -> None: + em = ReleaseMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.releases [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_releases = subparsers.add_parser("merge-releases") + sub_merge_releases.set_defaults(func=run_merge_releases) + sub_merge_releases.add_argument( + "json_file", + help="source of merge lines to process (or stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to 
do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() -- cgit v1.2.3 From f12bde00c34abf1d4a1604a76cac033b3c4c864b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Nov 2021 12:36:00 -0800 Subject: initial file merger, with tests --- python/fatcat_tools/mergers/files.py | 228 +++++++++++++++++++++++++++++++++++ python/tests/merge_files.py | 160 ++++++++++++++++++++++++ 2 files changed, 388 insertions(+) create mode 100644 python/fatcat_tools/mergers/files.py create mode 100644 python/tests/merge_files.py diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py new file mode 100644 index 00000000..4bc8bb81 --- /dev/null +++ b/python/fatcat_tools/mergers/files.py @@ -0,0 +1,228 @@ +import argparse +import os +import sys +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client +from fatcat_openapi_client.models import FileEntity + +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher + +from .common import EntityMerger + + +class FileMerger(EntityMerger): + """ + Combines file entities into a single primary. Merges any existing partial + metadata (such as release_ids and URLs). Can choose a primary if necessary. + + The primary is only updated if needed. + + TODO: relies on API server to detect "redirect of redirect" situation + """ + + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: + + eg_desc = kwargs.get("editgroup_description", "Automated merge of file entities") + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger") + super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) + self.entity_type_name = "file" + + def choose_primary_file(self, entities: List[FileEntity]) -> str: + """ + TODO: could incorporate number of redirected entities already pointing at an entity + """ + assert entities and len(entities) >= 2 + + # want to sort in descending order, so reverse=True + entities = sorted( + entities, + key=lambda a: ( + # has complete metadata? + bool(a.sha256 and a.md5 and a.sha1 and (a.size is not None)), + # has releases associated? + bool(a.release_ids), + # has URLs? + bool(a.urls), + # has extra metadata? + bool(a.extra), + # number of release_ids + len(a.release_ids or []), + ), + reverse=True, + ) + return entities[0].ident + + def merge_file_metadata_from(self, primary: FileEntity, other: FileEntity) -> bool: + """ + Compares a primary to an other. If there are helpful metadata fields in + the other, copy them to primary, in-place. + + This is intended to extract any useful metadata from "other" before it + gets redirected to "primary". + + Returns True if the primary was updated, False otherwise. 
+ """ + updated = False + # NOTE: intentionally not including sha1 here + for k in ["size", "mimetype", "sha256", "md5"]: + if not getattr(primary, k) and getattr(other, k): + setattr(primary, k, getattr(other, k)) + updated = True + + if not primary.urls: + primary.urls = [] + if not primary.release_ids: + primary.release_ids = [] + + if other.extra: + if not primary.extra: + primary.extra = other.extra + updated = True + else: + for k in other.extra.keys(): + if k not in primary.extra: + primary.extra[k] = other.extra[k] + updated = True + + for u in other.urls or []: + if u not in primary.urls: + primary.urls.append(u) + updated = True + + for i in other.release_ids or []: + if i not in primary.release_ids: + primary.release_ids.append(i) + updated = True + + return updated + + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + + # currently requires for extid validation + if not evidence or not (evidence.get("extid_type") and evidence.get("extid")): + self.counts["skip-missing-evidence"] += 1 + return 0 + + updated_entities = 0 + entities: Dict[str, FileEntity] = dict() + eg_id = self.get_editgroup_id() + + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in all_ids: + try: + entities[ident] = self.api.get_file(ident) + except fatcat_openapi_client.ApiException as ae: + if ae.status == 404: + self.counts["skip-entity-not-found"] += 1 + return 0 + else: + raise + if entities[ident].state != "active": + self.counts["skip-not-active-entity"] += 1 + return 0 + if getattr(entities[ident].ext_ids, evidence["extid_type"]) != evidence["extid"]: + self.counts["skip-extid-mismatch"] += 1 + return 0 + + if not primary_id: + primary_id = self.choose_primary_file(list(entities.values())) + dupe_ids = [d for d in dupe_ids if d != primary_id] + + # ensure primary is not in dupes + assert primary_id not in dupe_ids + + primary = entities[primary_id] + primary_updated = False + for other_id in dupe_ids: + other = entities[other_id] + primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated + self.api.update_file( + eg_id, + other.ident, + FileEntity( + redirect=primary.ident, + edit_extra=evidence, + ), + ) + updated_entities += 1 + + if primary_updated: + self.api.update_file(eg_id, primary.ident, primary) + updated_entities += 1 + + return updated_entities + + +def run_merge_files(args: argparse.Namespace) -> None: + em = FileMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.files [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_files = subparsers.add_parser("merge-files") + sub_merge_files.set_defaults(func=run_merge_files) + sub_merge_files.add_argument( + "json_file", + help="source of merge lines to process (or 
stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/python/tests/merge_files.py b/python/tests/merge_files.py new file mode 100644 index 00000000..c33c6f6c --- /dev/null +++ b/python/tests/merge_files.py @@ -0,0 +1,160 @@ +from fatcat_openapi_client import FileEntity, FileUrl +from fixtures import api + +from fatcat_tools.mergers.files import FileMerger + + +def test_choose_primary_file(api) -> None: + + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + ) + fe_nourls = FileEntity( + ident="ccccsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + ) + fe_complete = FileEntity( + ident="ddddsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_pseudo_complete = FileEntity( + ident="eeeesb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + + assert fm.choose_primary_file([fe_partial, fe_norelease]) == "bbbbsb5apzfhbbxxc7rgu2yw6m" + assert ( + fm.choose_primary_file([fe_partial, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_complete, fe_nourls, fe_norelease]) + == "ddddsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_pseudo_complete, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + + +def test_merge_file_metadata_from(api) -> None: + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + 
size=60719, + mimetype="application/pdf", + ) + fe_nourls = FileEntity( + ident="ccccsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + ) + fe_complete = FileEntity( + ident="ddddsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="ddddddd315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_pseudo_complete = FileEntity( + ident="eeeesb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_another_release_id = FileEntity( + ident="fffffffapzfhbbxxc7rgu2yw6m", + release_ids=["qqqqqg7mxrayxfltget7fqcrjy"], + ) + fe_another_url = FileEntity( + ident="zzzzzzzapzfhbbxxc7rgu2yw6m", + urls=[ + FileUrl(rel="repository", url="http://someuni.edu/repo/file.pdf"), + ], + ) + fe_more_extra = FileEntity( + ident="fffffffapzfhbbxxc7rgu2yw6m", + release_ids=["qqqqqg7mxrayxfltget7fqcrjy"], + extra=dict(thang=456), + ) + + assert fm.merge_file_metadata_from(fe_nourls, fe_partial) is False + assert fm.merge_file_metadata_from(fe_complete, fe_pseudo_complete) is False + assert fm.merge_file_metadata_from(fe_complete, fe_complete) is False + assert fm.merge_file_metadata_from(fe_partial, fe_norelease) is True + assert fe_partial.md5 == fe_norelease.md5 + assert fe_partial.size == fe_norelease.size + assert fm.merge_file_metadata_from(fe_partial, fe_complete) is True + assert fe_partial.md5 != fe_complete.md5 + assert fe_partial.extra == fe_complete.extra + assert set([(u.rel, u.url) for u in fe_partial.urls or []]) == set( + [(u.rel, u.url) for u in fe_complete.urls or []] + ) + assert fe_partial.release_ids == fe_complete.release_ids + assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is True + assert fe_partial.release_ids == [ + "dlrxjg7mxrayxfltget7fqcrjy", + "qqqqqg7mxrayxfltget7fqcrjy", + ] + assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is False + assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is True + assert fe_partial.extra == dict(asdf=123, thang=456) + assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is False + assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is True + assert fe_partial.urls[-1].url == "http://someuni.edu/repo/file.pdf" + assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is False -- cgit v1.2.3 From e613e4ceb607f01287c44c8f7f526a776abed986 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Nov 2021 13:36:57 -0800 Subject: add proposal for entity mergers --- proposals/2021-11-17_entity_mergers.md | 110 +++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 proposals/2021-11-17_entity_mergers.md diff --git a/proposals/2021-11-17_entity_mergers.md b/proposals/2021-11-17_entity_mergers.md new file mode 100644 index 00000000..6c41ca58 --- 
/dev/null
+++ b/proposals/2021-11-17_entity_mergers.md
@@ -0,0 +1,110 @@
+
+status: implemented
+
+Entity Mergers
+===============
+
+One category of catalog metadata cleanup is merging multiple duplicate
+entries into a single record. The fatcat catalog allows this by turning the
+duplicate entities into "redirect records" which point at the single merged
+record.
+
+This proposal briefly describes the process for doing bulk merges.
+
+
+## External Identifier Duplicates
+
+The easiest category of entity duplicates to discover is cases where multiple
+entities have the same external (persistent) identifier. For example, releases
+with the exact same DOI, containers with the same ISSN-L, or creators with the
+same ORCiD. Files with the same SHA-1 hash are a similar issue. The catalog does
+not block the creation of such entities, though it is assumed that editors and
+bots will do their best to prevent creating duplicates, and that this is
+checked and monitored via review bots (auto-annotation) and bulk quality
+checks.
+
+In these cases, it is simple enough to use the external identifier dumps (part
+of the fatcat bulk exports), find duplicates by identifier, and create merge
+requests.
+
+
+## Merge Requests JSON Schema
+
+Proposed JSON schema for bulk entity merging:
+
+    entity_type: str, required. eg: "file"
+    primary_id: str, optional, entity ident
+    duplicate_ids: [str], required, entity idents
+    evidence: dict, optional, merger/entity specific
+        # evidence fields for external identifier dupes
+        extid: str, the identifier value
+        extid_type: str, eg "doi" or "sha1"
+
+The merge request generation process might indicate which of the entities
+should end up as the "primary", or it might leave that determination to the
+merger itself. `primary_id` should not be set arbitrarily or randomly if there
+is not a good reason for a specific entity to be the "primary" which others
+redirect to.
+
+The `primary_id` should not be included in `duplicate_ids`, but the merger code
+will remove it if included accidentally.
+
+The `evidence` fields are flexible. By default they will all be included as
+top-level "edit extra" metadata on each individual entity redirected, but not
+on the primary entity (if it gets updated).
+
+
+## Merge Process and Semantics
+
+The assumption is that all the entities indicated in `duplicate_ids` will be
+redirected to the `primary_id`. Any metadata included in the duplicates which
+is not included in the primary will be copied into the primary, but existing
+primary metadata fields will not be "clobbered" (overwritten) by duplicate
+metadata. This includes top-level fields of the `extra` metadata dict, if
+appropriate. If there is no unique metadata in the redirected entities, the
+primary does not need to be updated and will not be.
+
+
+## Work/Release Grouping and Merging
+
+Work and Release entities are something of a special case.
+
+Merging two release entities will result in all artifact entities (files,
+filesets, webcaptures) which previously pointed at the duplicate entity being
+updated to point to the primary entity. If the work entities associated with the
+duplicate releases have no other releases associated with them, they also will
+be merged (redirected) to the primary release's work entity.
+
+"Grouping" releases is the same as merging their works. In this situation, the
+number of distinct release entities stays the same, but the duplicates are
+updated to be under the same work as the primary.
This is initially implemented
+by merging the work entities, and then updating *all* the releases under each
+merged work towards the primary work identifier. No artifact entities need to
+be updated in this scenario.
+
+A currently planned option would be to pull a single release out of a group of
+releases under a work, and point it to a new work. This would be a form of
+"regrouping". For now this can only be achieved by updating the release
+entities individually, not in a bulk/automated manner.
+
+
+## Container Merging
+
+Because many releases point to containers, it is not practical to update all
+the releases at the same time as merging the containers. In the long run it is
+good for the health of the catalog to have all the releases updated to point at
+the primary container, but these updates can be delayed.
+
+To keep statistics and functionality working before release updates happen,
+downstream users of release entities should "expand" container sub-entities and
+use the "redirect" ident of the container entity instead of "ident", if the
+"redirect" is set. For example, when linking in web interfaces, or when doing a
+schema transform in the fatcat and scholar.archive.org search index.
+
+
+## Background Reading
+
+"The Lens MetaRecord and LensID: An open identifier system for aggregated
+metadata and versioning of knowledge artefacts"
+https://osf.io/preprints/lissa/t56yh/
+
--
cgit v1.2.3


From d56183b407c030fe4e9ba25da7916dad6f51d71c Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 23 Nov 2021 18:11:58 -0800
Subject: mergers: remove entity mergers from __init__ (to work around warning)

---
 python/fatcat_tools/mergers/__init__.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py
index ccc4c02c..0d7cd468 100644
--- a/python/fatcat_tools/mergers/__init__.py
+++ b/python/fatcat_tools/mergers/__init__.py
@@ -1,3 +1 @@
 from .common import EntityMerger
-from .files import FileMerger
-from .releases import ReleaseMerger
--
cgit v1.2.3


From f6c4bd65c104e3a728e94561be80242cf35cbea3 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 23 Nov 2021 18:12:34 -0800
Subject: mergers: small tweaks

---
 python/fatcat_tools/mergers/common.py | 4 ++--
 python/fatcat_tools/mergers/files.py  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
index 1aaf357d..05433748 100644
--- a/python/fatcat_tools/mergers/common.py
+++ b/python/fatcat_tools/mergers/common.py
@@ -76,8 +76,8 @@ class EntityMerger(EntityImporter):
             evidence: Optional[dict]
                 # can be anything, entity- or merger-specific
                 # some variables might be...
-                dupe_extid: str
-                dupe_extid_type: str
+                extid: str
+                extid_type: str
 
         Returns nothing.
""" diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py index 4bc8bb81..32c7fcb6 100644 --- a/python/fatcat_tools/mergers/files.py +++ b/python/fatcat_tools/mergers/files.py @@ -106,7 +106,7 @@ class FileMerger(EntityMerger): evidence: Optional[Dict[str, Any]] = None, ) -> int: - # currently requires for extid validation + # currently required for extid validation if not evidence or not (evidence.get("extid_type") and evidence.get("extid")): self.counts["skip-missing-evidence"] += 1 return 0 -- cgit v1.2.3 From 8080eef139b5dcf6201e4f27076a879d0df20096 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 23 Nov 2021 18:58:37 -0800 Subject: file de-dupe: notes on prep and QA testing --- notes/cleanups/file_sha1_dedupe.md | 64 +++++++++++++++++++++++++ notes/cleanups/scripts/file_dupe_to_json.py | 72 +++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 notes/cleanups/file_sha1_dedupe.md create mode 100755 notes/cleanups/scripts/file_dupe_to_json.py diff --git a/notes/cleanups/file_sha1_dedupe.md b/notes/cleanups/file_sha1_dedupe.md new file mode 100644 index 00000000..0829bc79 --- /dev/null +++ b/notes/cleanups/file_sha1_dedupe.md @@ -0,0 +1,64 @@ + + +## Prep + +Using `check_hashes.sh`: + + zcat $HASH_FILE \ + | awk '{print $3 "\t" $1}' \ + | rg -v '^\t' \ + | sort -S 4G \ + | uniq -D -w 40 \ + > sha1_ident.dupes.tsv + + wc -l sha1_ident.dupes.tsv + # 6,350 + + cut -f1 sha1_ident.dupes.tsv | uniq | wc -l + # 2,039 + +Want to create JSON for each group, like: + + entity_type: "file" + primary_id: str or None + duplicate_ids: [str] + evidence: + extid: str + extid_type: "sha1" + +Run transform script: + + cat sha1_ident.dupes.tsv | ./file_dupe_to_json.py | pv -l > file_sha1_dupes.json + # 2.04k 0:00:00 [9.16k/s] + + +## QA Testing + + export FATCAT_AUTH_API_TOKEN=[...] + + head -n25 /srv/fatcat/datasets/file_sha1_dupes.json \ + | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" --dry-run merge-files - + +Hit some small bugs running in QA; test coverage isn't great, but I think hits +the important parts. + + head -n25 /srv/fatcat/datasets/file_sha1_dupes.json \ + | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" --dry-run merge-files - + # Running in dry-run mode! + # Counter({'updated-entities': 60, 'lines': 25, 'merged': 25, 'skip': 0, 'updated-total': 0}) + +Dry-run mode didn't actually work, and edits actually happened (!). + +Edits do look good. + +Try again, not dry-run, to ensure that case is handled: + + head -n25 /srv/fatcat/datasets/file_sha1_dupes.json | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" merge-files - + # Counter({'lines': 25, 'skip': 25, 'skip-not-active-entity': 25, 'merged': 0, 'updated-total': 0}) + +And then run 500 through for more testing: + + head -n500 /srv/fatcat/datasets/file_sha1_dupes.json | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" merge-files - + # Counter({'updated-entities': 1341, 'lines': 500, 'merged': 474, 'skip': 26, 'skip-not-active-entity': 25, 'skip-entity-not-found': 1, 'updated-total': 0}) + +The majority of merges seem to be cases where there are multiple articles in the same PDF. 
diff --git a/notes/cleanups/scripts/file_dupe_to_json.py b/notes/cleanups/scripts/file_dupe_to_json.py new file mode 100755 index 00000000..2064dc1c --- /dev/null +++ b/notes/cleanups/scripts/file_dupe_to_json.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 + +""" +This script can be used to transform duplicate file entity hash export rows +into JSON objects which can be passed to the file entity merger. + +The input is expected to be a TSV with two columns: a hash value in the first +column, and a fatcat file entity ident (in UUID format, not "fatcat ident" +encoded) in the second column. The rows are assumed to be sorted by hash value +(the first column), and duplicate values (same hash, differing UUID) are +contiguous. + +File hashes aren't really "external identifiers" (ext_id), but we treat them as +such here. + +Script is pretty simple, should be possible to copy and reuse for release, +container, creator entity duplicates. +""" + +import json, sys +from typing import Optional +import base64, uuid + +EXTID_TYPE = "sha1" + +def uuid2fcid(s: str) -> str: + """ + Converts a uuid.UUID object to a fatcat identifier (base32 encoded string) + """ + raw = uuid.UUID(s).bytes + return base64.b32encode(raw)[:26].lower().decode("utf-8") + +def print_group(extid, dupe_ids): + if len(dupe_ids) < 2: + return + group = dict( + entity_type="file", + primary_id=None, + duplicate_ids=dupe_ids, + evidence=dict( + extid=extid, + extid_type=EXTID_TYPE, + ), + ) + print(json.dumps(group, sort_keys=True)) + +def run(): + last_extid = None + dupe_ids = [] + for l in sys.stdin: + l = l.strip() + if not l: + continue + (row_extid, row_uuid) = l.split("\t")[0:2] + if EXTID_TYPE == "sha1": + assert len(row_extid) == 40 + else: + raise Exception(f"extid type not supported yet: {EXTID_TYPE}") + row_id = uuid2fcid(row_uuid) + if row_extid == last_extid: + dupe_ids.append(row_id) + continue + elif dupe_ids: + print_group(last_extid, dupe_ids) + last_extid = row_extid + dupe_ids = [row_id] + if last_extid and dupe_ids: + print_group(last_extid, dupe_ids) + + +if __name__=="__main__": + run() -- cgit v1.2.3 From 112c41a1157862d2c8f758eac685b0b26c921797 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 23 Nov 2021 19:29:49 -0800 Subject: file merges: fixes from testing in QA --- python/fatcat_tools/mergers/files.py | 37 ++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py index 32c7fcb6..30b31330 100644 --- a/python/fatcat_tools/mergers/files.py +++ b/python/fatcat_tools/mergers/files.py @@ -24,9 +24,12 @@ class FileMerger(EntityMerger): def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_desc = kwargs.get("editgroup_description", "Automated merge of file entities") - eg_extra = kwargs.get("editgroup_extra", dict()) + eg_desc = ( + kwargs.pop("editgroup_description", None) or "Automated merge of file entities" + ) + eg_extra = kwargs.pop("editgroup_extra", dict()) eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger") + self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False) super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) self.entity_type_name = "file" @@ -130,7 +133,7 @@ class FileMerger(EntityMerger): if entities[ident].state != "active": self.counts["skip-not-active-entity"] += 1 return 0 - if getattr(entities[ident].ext_ids, evidence["extid_type"]) != evidence["extid"]: + if getattr(entities[ident], 
evidence["extid_type"]) != evidence["extid"]: self.counts["skip-extid-mismatch"] += 1 return 0 @@ -138,7 +141,6 @@ class FileMerger(EntityMerger): primary_id = self.choose_primary_file(list(entities.values())) dupe_ids = [d for d in dupe_ids if d != primary_id] - # ensure primary is not in dupes assert primary_id not in dupe_ids primary = entities[primary_id] @@ -146,25 +148,32 @@ class FileMerger(EntityMerger): for other_id in dupe_ids: other = entities[other_id] primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated - self.api.update_file( - eg_id, - other.ident, - FileEntity( - redirect=primary.ident, - edit_extra=evidence, - ), - ) + if not self.dry_run_mode: + self.api.update_file( + eg_id, + other.ident, + FileEntity( + redirect=primary.ident, + edit_extra=evidence, + ), + ) updated_entities += 1 if primary_updated: - self.api.update_file(eg_id, primary.ident, primary) + if not self.dry_run_mode: + self.api.update_file(eg_id, primary.ident, primary) updated_entities += 1 return updated_entities def run_merge_files(args: argparse.Namespace) -> None: - em = FileMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + em = FileMerger( + args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run, + editgroup_description=args.editgroup_description_override, + ) JsonLinePusher(em, args.json_file).run() -- cgit v1.2.3 From 8debf771c540b0bef7f4745195f8af87490917b0 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 23 Nov 2021 19:30:14 -0800 Subject: release merger: some progress, but also disable (not complete) --- python/fatcat_tools/mergers/releases.py | 84 ++++++++++++++++++++++++++++----- 1 file changed, 72 insertions(+), 12 deletions(-) diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py index fc970057..1f995b00 100644 --- a/python/fatcat_tools/mergers/releases.py +++ b/python/fatcat_tools/mergers/releases.py @@ -15,32 +15,83 @@ from .common import EntityMerger class ReleaseMerger(EntityMerger): """ Hard merges a set of release entities, redirecting all entities to a single - primary release. + primary release. This is different from "grouping" multiple releases under + a single work. - Will also redirect works (if appropriate), and re-point {files, filesets, - webcaptures} to the new merged release. + A "primary" (which the other releases will redirect to) can be provided, or + one will be chosen from the set of duplicate releases based on the + completeness of metadata and other heuristic factors. + + Releases are some of the most complex entities to merge, because of + the complexity of bibliographic metadata and the number of related entities + which also need to be updated. + + File, Fileset, and Webcapture entities which currently point to a release + which gets redirected will be updated to point at the "primary" release. + + Any Work entities which will end up with no releases pointing at them after + the merging will get redirected to the work corresponding to the "primary" + release. 
+
+    NOTE: the "primary" release will currently (as implemented) *not* get
+    updated with metadata from all the redirected releases.
     """
 
     def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
-        eg_desc = kwargs.get("editgroup_description", "Automated merge of release entities")
+        eg_desc = (
+            kwargs.pop("editgroup_description", None) or "Automated merge of release entities"
+        )
         eg_extra = kwargs.get("editgroup_extra", dict())
         eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger")
+        self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False)
         super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
         self.entity_type_name = "release"
 
+    def choose_primary_release(
+        self, entities: List[ReleaseEntity], existing_redirects: Dict[str, List[str]]
+    ) -> str:
+        assert entities and len(entities) >= 2
+
+        # want to sort in descending order, so reverse=True
+        entities = sorted(
+            entities,
+            key=lambda a: (
+                # number of entities already redirected to this one
+                len(existing_redirects[a.ident]),
+                # number of file/fileset/webcapture entities (would need to update)
+                int(len(a.files or []) + len(a.filesets or []) + len(a.webcaptures or [])),
+                # has a strong identifier?
+                bool(a.ext_ids.doi or a.ext_ids.pmid or a.ext_ids.arxiv),
+                # has any identifier?
+                bool(a.ext_ids),
+                # has basic metadata?
+                bool(a.release_type),
+                bool(a.release_status),
+                bool(a.release_year),
+                bool(a.container_id),
+                # has refs, abstracts, extra stuff?
+                bool(a.refs),
+                bool(a.abstracts),
+            ),
+            reverse=True,
+        )
+        return entities[0].ident
+
     def try_merge(
         self,
         dupe_ids: List[str],
         primary_id: Optional[str] = None,
         evidence: Optional[Dict[str, Any]] = None,
     ) -> int:
-        """
-        XXX: review/refactor; this code is very old
-        """
+
+        # TODO: this code is pretty old and has only been partially refactored.
+        # Needs more testing and review.
+        raise NotImplementedError
 
         updated_entities = 0
         releases = dict()
+        existing_redirects: Dict[str, List[str]] = dict()
         eg_id = self.get_editgroup_id()
 
         all_ids = dupe_ids.copy()
@@ -48,12 +99,16 @@
             all_ids.append(primary_id)
         for ident in all_ids:
             releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures")
+            existing_redirects[ident] = self.api.get_release_redirects(ident)
 
         if not primary_id:
-            # XXX:
-            primary_id = dupe_ids[0]
+            primary_id = self.choose_primary_release(
+                list(releases.values()), existing_redirects
+            )
         dupe_ids = [d for d in dupe_ids if d != primary_id]
+        assert primary_id not in dupe_ids
+
         primary_work_id = releases[primary_id].work_id
         updated_work_ids = []
         redirected_release_ids = []
@@ -65,6 +120,7 @@
 
             # file redirects
             for e in release.files:
+                assert release.ident in e.release_ids
                 e.release_ids.remove(release.ident)
                 if primary_id not in e.release_ids:
                     e.release_ids.append(primary_id)
@@ -75,6 +131,7 @@
 
             # fileset redirects
             for e in release.filesets:
+                assert release.ident in e.release_ids
                 e.release_ids.remove(release.ident)
                 if primary_id not in e.release_ids:
                     e.release_ids.append(primary_id)
@@ -85,6 +142,7 @@
 
             # webcapture redirects
            for e in release.webcaptures:
+                assert release.ident in e.release_ids
                 e.release_ids.remove(release.ident)
                 if primary_id not in e.release_ids:
                     e.release_ids.append(primary_id)
                 updated_entities += 1
                 self.counts["updated-webcaptures"] += 1
 
-            # release redirect itself
+            # the release redirect itself
             updated_work_ids.append(release.work_id)
             redirected_release_ids.append(release.ident)
             if not self.dry_run_mode:
                 self.api.update_release(
-                    eg_id, release.ident, ReleaseEntity(redirect=primary_id)
+                    eg_id,
+                    release.ident,
+                    ReleaseEntity(redirect=primary_id, edit_extra=evidence),
                 )
             updated_entities += 1
             self.counts["updated-releases"] += 1
@@ -108,7 +168,7 @@
         updated_work_ids = list(set(updated_work_ids))
         assert primary_work_id not in updated_work_ids
         for work_id in updated_work_ids:
-            work_releases = self.api.get_work_releases(work_id)
+            work_releases = self.api.get_work_releases(work_id, hide="abstracts,refs")
             rids = set([r.ident for r in work_releases])
             if rids.issubset(redirected_release_ids):
                 # all the releases for this work were updated/merged; we should
--
cgit v1.2.3


From 9588a6a1aa01900b0dc376981635db8b609919f5 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 24 Nov 2021 14:29:08 -0800
Subject: file merger: add content_scope to list of merged fields

---
 python/fatcat_tools/mergers/files.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py
index 30b31330..3efe85fd 100644
--- a/python/fatcat_tools/mergers/files.py
+++ b/python/fatcat_tools/mergers/files.py
@@ -70,7 +70,7 @@ class FileMerger(EntityMerger):
         """
         updated = False
         # NOTE: intentionally not including sha1 here
-        for k in ["size", "mimetype", "sha256", "md5"]:
+        for k in ["size", "mimetype", "sha256", "md5", "content_scope"]:
             if not getattr(primary, k) and getattr(other, k):
                 setattr(primary, k, getattr(other, k))
                 updated = True
--
cgit v1.2.3


From a367c366852eaaeeecd4c6142ebac187df7dcf71 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 24 Nov 2021 15:19:26 -0800
Subject: merger proposal typos

---
 proposals/2021-11-17_entity_mergers.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/proposals/2021-11-17_entity_mergers.md b/proposals/2021-11-17_entity_mergers.md
index 6c41ca58..d196d549 100644
--- a/proposals/2021-11-17_entity_mergers.md
+++ b/proposals/2021-11-17_entity_mergers.md
@@ -6,7 +6,7 @@ Entity Mergers
 
 One category of catalog metadata cleanup is merging multiple duplicate
 entries into a single record. The fatcat catalog allows this via turning the
-duplicate entities into "redirect reccords" which point at the single merged
+duplicate entities into "redirect records" which point at the single merged
 record.
 
 This proposal briefly describes the process for doing bulk merges.
@@ -78,7 +78,7 @@ be merged (redirected) to the primary release's work entity.
 
 "Grouping" releases is the same as merging their works. In this situation, the
 number of distinct release entities stays the same, but the duplicates are
 updated to be under the same work as the primary. This is initially implemented
-by merging the work entites, and then updating *all* the releases under each
+by merging the work entities, and then updating *all* the releases under each
 merged work towards the primary work identifier. No artifact entities need to
 be updated in this scenario.
--
cgit v1.2.3


From 62bf6202d002e96ffd81d7e4634502be1886d5c3 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 24 Nov 2021 16:26:06 -0800
Subject: mergers common: remove inaccurate comment

Caught in review, thanks miku

---
 python/fatcat_tools/mergers/common.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
index 05433748..e25f8194 100644
--- a/python/fatcat_tools/mergers/common.py
+++ b/python/fatcat_tools/mergers/common.py
@@ -34,8 +34,6 @@ class EntityMerger(EntityImporter):
 
     # implemented per-task
     try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None
-
-    This class is pretty similar to EntityImporter, but isn't subclassed.
--
cgit v1.2.3
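
One closing note on the ident encoding used by `file_dupe_to_json.py` earlier
in this series: fatcat idents are the first 26 characters of the lowercased
base32 encoding of the entity UUID bytes. A quick standalone sanity check of
that encoding (the nil UUID is used purely as an example input, not a real
entity):

    python3 -c 'import base64, uuid; print(base64.b32encode(uuid.UUID("00000000-0000-0000-0000-000000000000").bytes)[:26].lower().decode("utf-8"))'
    # aaaaaaaaaaaaaaaaaaaaaaaaaa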