diff options
author | bnewbold <bnewbold@archive.org> | 2021-11-25 00:36:34 +0000 |
---|---|---|
committer | bnewbold <bnewbold@archive.org> | 2021-11-25 00:36:34 +0000 |
commit | 5bc5eeed5e3ba54c2129c4233b881291c5fa7449 (patch) | |
tree | 88392dace6857836cab80f3ea6d0980c20a24376 | |
parent | b14fca89f41b5ba2b85bf033844da211fa5c3c8b (diff) | |
parent | 62bf6202d002e96ffd81d7e4634502be1886d5c3 (diff) | |
download | fatcat-5bc5eeed5e3ba54c2129c4233b881291c5fa7449.tar.gz fatcat-5bc5eeed5e3ba54c2129c4233b881291c5fa7449.zip |
Merge branch 'bnewbold-mergers' into 'master'
entity mergers framework
See merge request webgroup/fatcat!133
-rw-r--r-- | notes/cleanups/file_sha1_dedupe.md | 64 | ||||
-rwxr-xr-x | notes/cleanups/scripts/file_dupe_to_json.py | 72 | ||||
-rw-r--r-- | proposals/2021-11-17_entity_mergers.md | 110 | ||||
-rw-r--r-- | python/fatcat_tools/mergers/__init__.py | 1 | ||||
-rw-r--r-- | python/fatcat_tools/mergers/common.py | 153 | ||||
-rw-r--r-- | python/fatcat_tools/mergers/files.py | 237 | ||||
-rw-r--r-- | python/fatcat_tools/mergers/releases.py | 249 | ||||
-rw-r--r-- | python/tests/merge_files.py | 160 |
8 files changed, 1046 insertions, 0 deletions
diff --git a/notes/cleanups/file_sha1_dedupe.md b/notes/cleanups/file_sha1_dedupe.md new file mode 100644 index 00000000..0829bc79 --- /dev/null +++ b/notes/cleanups/file_sha1_dedupe.md @@ -0,0 +1,64 @@ + + +## Prep + +Using `check_hashes.sh`: + + zcat $HASH_FILE \ + | awk '{print $3 "\t" $1}' \ + | rg -v '^\t' \ + | sort -S 4G \ + | uniq -D -w 40 \ + > sha1_ident.dupes.tsv + + wc -l sha1_ident.dupes.tsv + # 6,350 + + cut -f1 sha1_ident.dupes.tsv | uniq | wc -l + # 2,039 + +Want to create JSON for each group, like: + + entity_type: "file" + primary_id: str or None + duplicate_ids: [str] + evidence: + extid: str + extid_type: "sha1" + +Run transform script: + + cat sha1_ident.dupes.tsv | ./file_dupe_to_json.py | pv -l > file_sha1_dupes.json + # 2.04k 0:00:00 [9.16k/s] + + +## QA Testing + + export FATCAT_AUTH_API_TOKEN=[...] + + head -n25 /srv/fatcat/datasets/file_sha1_dupes.json \ + | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" --dry-run merge-files - + +Hit some small bugs running in QA; test coverage isn't great, but I think hits +the important parts. + + head -n25 /srv/fatcat/datasets/file_sha1_dupes.json \ + | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" --dry-run merge-files - + # Running in dry-run mode! + # Counter({'updated-entities': 60, 'lines': 25, 'merged': 25, 'skip': 0, 'updated-total': 0}) + +Dry-run mode didn't actually work, and edits actually happened (!). + +Edits do look good. 
+ +Try again, not dry-run, to ensure that case is handled: + + head -n25 /srv/fatcat/datasets/file_sha1_dupes.json | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" merge-files - + # Counter({'lines': 25, 'skip': 25, 'skip-not-active-entity': 25, 'merged': 0, 'updated-total': 0}) + +And then run 500 through for more testing: + + head -n500 /srv/fatcat/datasets/file_sha1_dupes.json | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" merge-files - + # Counter({'updated-entities': 1341, 'lines': 500, 'merged': 474, 'skip': 26, 'skip-not-active-entity': 25, 'skip-entity-not-found': 1, 'updated-total': 0}) + +The majority of merges seem to be cases where there are multiple articles in the same PDF. diff --git a/notes/cleanups/scripts/file_dupe_to_json.py b/notes/cleanups/scripts/file_dupe_to_json.py new file mode 100755 index 00000000..2064dc1c --- /dev/null +++ b/notes/cleanups/scripts/file_dupe_to_json.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 + +""" +This script can be used to transform duplicate file entity hash export rows +into JSON objects which can be passed to the file entity merger. + +The input is expected to be a TSV with two columns: a hash value in the first +column, and a fatcat file entity ident (in UUID format, not "fatcat ident" +encoded) in the second column. The rows are assumed to be sorted by hash value +(the first column), and duplicate values (same hash, differing UUID) are +contiguous. + +File hashes aren't really "external identifiers" (ext_id), but we treat them as +such here. + +Script is pretty simple, should be possible to copy and reuse for release, +container, creator entity duplicates. 
+""" + +import json, sys +from typing import Optional +import base64, uuid + +EXTID_TYPE = "sha1" + +def uuid2fcid(s: str) -> str: + """ + Converts a uuid.UUID object to a fatcat identifier (base32 encoded string) + """ + raw = uuid.UUID(s).bytes + return base64.b32encode(raw)[:26].lower().decode("utf-8") + +def print_group(extid, dupe_ids): + if len(dupe_ids) < 2: + return + group = dict( + entity_type="file", + primary_id=None, + duplicate_ids=dupe_ids, + evidence=dict( + extid=extid, + extid_type=EXTID_TYPE, + ), + ) + print(json.dumps(group, sort_keys=True)) + +def run(): + last_extid = None + dupe_ids = [] + for l in sys.stdin: + l = l.strip() + if not l: + continue + (row_extid, row_uuid) = l.split("\t")[0:2] + if EXTID_TYPE == "sha1": + assert len(row_extid) == 40 + else: + raise Exception(f"extid type not supported yet: {EXTID_TYPE}") + row_id = uuid2fcid(row_uuid) + if row_extid == last_extid: + dupe_ids.append(row_id) + continue + elif dupe_ids: + print_group(last_extid, dupe_ids) + last_extid = row_extid + dupe_ids = [row_id] + if last_extid and dupe_ids: + print_group(last_extid, dupe_ids) + + +if __name__=="__main__": + run() diff --git a/proposals/2021-11-17_entity_mergers.md b/proposals/2021-11-17_entity_mergers.md new file mode 100644 index 00000000..d196d549 --- /dev/null +++ b/proposals/2021-11-17_entity_mergers.md @@ -0,0 +1,110 @@ + +status: implemented + +Entity Mergers +=============== + +One category of catalog metadata cleanup is merging multiple duplicate +entries into a single record. The fatcat catalog allows this by turning the +duplicate entities into "redirect records" which point at the single merged +record. + +This proposal briefly describes the process for doing bulk merges. + + +## External Identifier Duplicates + +The easiest category of entity duplicates to discover is cases where multiple +entities have the same external (persistent) identifier.
For example, releases +with the exact same DOI, containers with the same ISSN-L, or creators with the +same ORCiD. Files with the same SHA-1 hash are a similar issue. The catalog does +not block the creation of such entities, though it is assumed that editors and +bots will do their best to prevent creating duplicates, and that this is +checked and monitored via review bots (auto-annotation) and bulk quality +checks. + +In these cases, it is simple enough to use the external identifier dumps (part +of the fatcat bulk exports), find duplicates by identifier, and create merge +requests. + + +## Merge Requests JSON Schema + +Proposed JSON schema for bulk entity merging: + + entity_type: str, required. eg: "file" + primary_id: str, optional, entity ident + duplicate_ids: [str], required, entity idents + evidence: dict, optional, merger/entity specific + # evidence fields for external identifier dupes + extid: str, the identifier value + extid_type: str, eg "doi" or "sha1" + +The merge request generation process might indicate which of the entities +should end up as the "primary", or it might leave that determination to the +merger itself. `primary_id` should not be set arbitrarily or randomly if there +is not a good reason for a specific entity to be the "primary" which others +redirect to. + +The `primary_id` should not be included in `duplicate_ids`, but the merger code +will remove it if included accidentally. + +The `evidence` fields are flexible. By default they will all be included as +top-level "edit extra" metadata on each individual entity redirected, but not +on the primary entity (if it gets updated). + + +## Merge Process and Semantics + +The assumption is that all the entities indicated in `duplicate_ids` will be +redirected to the `primary_id`. Any metadata included in the duplicates which +is not included in the primary will be copied in to the primary, but existing +primary metadata fields will not be "clobbered" (overwritten) by duplicate +metadata.
This includes top-level fields of the `extra` metadata dict, if +appropriate. If there is no unique metadata in the redirected entities, the +primary does not need to be updated and will not be. + + +## Work/Release Grouping and Merging + +Work and Release entities are something of a special case. + +Merging two release entities will result in all artifact entities (files, +filesets, webcaptures) which previously pointed at the duplicate entity being +updated to point to the primary entity. If the work entities associated with the +duplicate releases have no other releases associated with them, they also will +be merged (redirected) to the primary release's work entity. + +"Grouping" releases is the same as merging their works. In this situation, the +number of distinct release entities stays the same, but the duplicates are +updated to be under the same work as the primary. This is initially implemented +by merging the work entities, and then updating *all* the releases under each +merged work towards the primary work identifier. No artifact entities need to +be updated in this scenario. + +A currently planned option would be to pull a single release out of a group of +releases under a work, and point it to a new work. This would be a form of +"regrouping". For now this can only be achieved by updating the release +entities individually, not in a bulk/automated manner. + + +## Container Merging + +Because many releases point to containers, it is not practical to update all +the releases at the same time as merging the containers. In the long run it is +good for the health of the catalog to have all the releases updated to point at +the primary container, but these updates can be delayed. + +To keep statistics and functionality working before release updates happen, +downstream users of release entities should "expand" container sub-entities and +use the "redirect" ident of the container entity instead of "ident", if the +"redirect" is set.
For example, when linking in web interfaces, or when doing a +schema transform in the fatcat and scholar.archive.org search index. + + +## Background Reading + +"The Lens MetaRecord and LensID: An open identifier system for aggregated +metadata and versioning of knowledge artefacts" +https://osf.io/preprints/lissa/t56yh/ + diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py new file mode 100644 index 00000000..0d7cd468 --- /dev/null +++ b/python/fatcat_tools/mergers/__init__.py @@ -0,0 +1 @@ +from .common import EntityMerger diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py new file mode 100644 index 00000000..e25f8194 --- /dev/null +++ b/python/fatcat_tools/mergers/common.py @@ -0,0 +1,153 @@ +""" +Tools for merging entities in various ways. + + merge-releases: merge release entities together + => groups files/filesets/webcaptures + => merges work entities if needed + merge-works: pull all release entities under a single work + => merges work entities + merge-containers: merge container entities + merge-files: merge file entities +""" + +import subprocess +from collections import Counter +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client + +from fatcat_tools.importers import EntityImporter + + +class EntityMerger(EntityImporter): + """ + API for individual jobs: + + # record iterators sees + push_record(raw_record) + finish() + + # provided helpers + self.api + self.get_editgroup_id() + counts({'lines', 'skip', 'merged', 'updated'}) + + # implemented per-task + try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None + """ + + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: + + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["git_rev"] = eg_extra.get( + "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip() + ).decode("utf-8") + 
eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger") + + self.api = api + self.dry_run_mode = kwargs.get("dry_run_mode", True) + self.edit_batch_size = kwargs.get("edit_batch_size", 50) + self.editgroup_description = kwargs.get("editgroup_description") + self.editgroup_extra = eg_extra + self.reset() + self.entity_type_name = "common" + + if self.dry_run_mode: + print("Running in dry-run mode!") + + def reset(self) -> None: + self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0}) + self._edit_count = 0 + self._editgroup_id: Optional[str] = None + self._idents_inflight: List[str] = [] + + def push_record(self, record: Dict[str, Any]) -> None: + """ + Intended to be called by "pusher" class (which could be pulling from + JSON file, Kafka, whatever). + + Input is expected to be a dict-like object with keys: + + entity_type: str + primary_id: Optional[str] + duplicate_ids: [str] (order not preserved) + evidence: Optional[dict] + # can be anything, entity- or merger-specific + # some variables might be... + extid: str + extid_type: str + + Returns nothing. 
+ """ + self.counts["lines"] += 1 + if not record: + self.counts["skip-blank-line"] += 1 + return + if record.get("entity_type") != self.entity_type_name: + self.counts["skip-entity-type"] += 1 + return + primary_id: Optional[str] = record.get("primary_id") + duplicate_ids: List[str] = list(set(record["duplicate_ids"])) + duplicate_ids = [di for di in duplicate_ids if di != primary_id] + if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id): + self.counts["skip-no-dupes"] += 1 + return + all_ids = duplicate_ids + if primary_id: + all_ids.append(primary_id) + for i in all_ids: + if i in self._idents_inflight: + raise ValueError( + "Entity already part of in-process merge operation: {}".format(i) + ) + self._idents_inflight.append(i) + count = self.try_merge( + duplicate_ids, primary_id=primary_id, evidence=record.get("evidence") + ) + if count: + self.counts["merged"] += 1 + self.counts["updated-entities"] += count + self._edit_count += count + else: + self.counts["skip"] += 1 + if self._edit_count >= self.edit_batch_size: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + return + + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + # implementations should fill this in + raise NotImplementedError + + def finish(self) -> Counter: + if self._edit_count > 0: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + + return self.counts + + def get_editgroup_id(self, _edits: int = 1) -> str: + """ + This version of get_editgroup_id() is similar to the EntityImporter + version, but does not update self._edit_count or submit editgroups. The + edits parameter is ignored. 
+ """ + + if not self._editgroup_id: + eg = self.api.create_editgroup( + fatcat_openapi_client.Editgroup( + description=self.editgroup_description, extra=self.editgroup_extra + ) + ) + self._editgroup_id = eg.editgroup_id + assert self._editgroup_id + return self._editgroup_id diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py new file mode 100644 index 00000000..3efe85fd --- /dev/null +++ b/python/fatcat_tools/mergers/files.py @@ -0,0 +1,237 @@ +import argparse +import os +import sys +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client +from fatcat_openapi_client.models import FileEntity + +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher + +from .common import EntityMerger + + +class FileMerger(EntityMerger): + """ + Combines file entities into a single primary. Merges any existing partial + metadata (such as release_ids and URLs). Can choose a primary if necessary. + + The primary is only updated if needed. + + TODO: relies on API server to detect "redirect of redirect" situation + """ + + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: + + eg_desc = ( + kwargs.pop("editgroup_description", None) or "Automated merge of file entities" + ) + eg_extra = kwargs.pop("editgroup_extra", dict()) + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger") + self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False) + super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) + self.entity_type_name = "file" + + def choose_primary_file(self, entities: List[FileEntity]) -> str: + """ + TODO: could incorporate number of redirected entities already pointing at an entity + """ + assert entities and len(entities) >= 2 + + # want to sort in descending order, so reverse=True + entities = sorted( + entities, + key=lambda a: ( + # has complete metadata? 
+ bool(a.sha256 and a.md5 and a.sha1 and (a.size is not None)), + # has releases associated? + bool(a.release_ids), + # has URLs? + bool(a.urls), + # has extra metadata? + bool(a.extra), + # number of release_ids + len(a.release_ids or []), + ), + reverse=True, + ) + return entities[0].ident + + def merge_file_metadata_from(self, primary: FileEntity, other: FileEntity) -> bool: + """ + Compares a primary to an other. If there are helpful metadata fields in + the other, copy them to primary, in-place. + + This is intended to extract any useful metadata from "other" before it + gets redirected to "primary". + + Returns True if the primary was updated, False otherwise. + """ + updated = False + # NOTE: intentionally not including sha1 here + for k in ["size", "mimetype", "sha256", "md5", "content_scope"]: + if not getattr(primary, k) and getattr(other, k): + setattr(primary, k, getattr(other, k)) + updated = True + + if not primary.urls: + primary.urls = [] + if not primary.release_ids: + primary.release_ids = [] + + if other.extra: + if not primary.extra: + primary.extra = other.extra + updated = True + else: + for k in other.extra.keys(): + if k not in primary.extra: + primary.extra[k] = other.extra[k] + updated = True + + for u in other.urls or []: + if u not in primary.urls: + primary.urls.append(u) + updated = True + + for i in other.release_ids or []: + if i not in primary.release_ids: + primary.release_ids.append(i) + updated = True + + return updated + + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + + # currently required for extid validation + if not evidence or not (evidence.get("extid_type") and evidence.get("extid")): + self.counts["skip-missing-evidence"] += 1 + return 0 + + updated_entities = 0 + entities: Dict[str, FileEntity] = dict() + eg_id = self.get_editgroup_id() + + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in 
all_ids: + try: + entities[ident] = self.api.get_file(ident) + except fatcat_openapi_client.ApiException as ae: + if ae.status == 404: + self.counts["skip-entity-not-found"] += 1 + return 0 + else: + raise + if entities[ident].state != "active": + self.counts["skip-not-active-entity"] += 1 + return 0 + if getattr(entities[ident], evidence["extid_type"]) != evidence["extid"]: + self.counts["skip-extid-mismatch"] += 1 + return 0 + + if not primary_id: + primary_id = self.choose_primary_file(list(entities.values())) + dupe_ids = [d for d in dupe_ids if d != primary_id] + + assert primary_id not in dupe_ids + + primary = entities[primary_id] + primary_updated = False + for other_id in dupe_ids: + other = entities[other_id] + primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated + if not self.dry_run_mode: + self.api.update_file( + eg_id, + other.ident, + FileEntity( + redirect=primary.ident, + edit_extra=evidence, + ), + ) + updated_entities += 1 + + if primary_updated: + if not self.dry_run_mode: + self.api.update_file(eg_id, primary.ident, primary) + updated_entities += 1 + + return updated_entities + + +def run_merge_files(args: argparse.Namespace) -> None: + em = FileMerger( + args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run, + editgroup_description=args.editgroup_description_override, + ) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.files [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what 
would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_files = subparsers.add_parser("merge-files") + sub_merge_files.set_defaults(func=run_merge_files) + sub_merge_files.add_argument( + "json_file", + help="source of merge lines to process (or stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py new file mode 100644 index 00000000..1f995b00 --- /dev/null +++ b/python/fatcat_tools/mergers/releases.py @@ -0,0 +1,249 @@ +import argparse +import os +import sys +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client +from fatcat_openapi_client.models import ReleaseEntity, WorkEntity + +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher + +from .common import EntityMerger + + +class ReleaseMerger(EntityMerger): + """ + Hard merges a set of release entities, redirecting all entities to a single + primary release. This is different from "grouping" multiple releases under + a single work. + + A "primary" (which the other releases will redirect to) can be provided, or + one will be chosen from the set of duplicate releases based on the + completeness of metadata and other heuristic factors. 
+ + Releases are some of the most complex entities to merge, because of + the complexity of bibliographic metadata and the number of related entities + which also need to be updated. + + File, Fileset, and Webcapture entities which currently point to a release + which gets redirected will be updated to point at the "primary" release. + + Any Work entities which will end up with no releases pointing at them after + the merging will get redirected to the work corresponding to the "primary" + release. + + NOTE: the "primary" release will currently (as implemented) *not* get + updated with metadata from all the redirected releases + """ + + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: + + eg_desc = ( + kwargs.pop("editgroup_description", None) or "Automated merge of release entities" + ) + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger") + self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False) + super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) + self.entity_type_name = "release" + + def choose_primary_release( + self, entities: List[ReleaseEntity], existing_redirects: Dict[str, List[str]] + ) -> str: + assert entities and len(entities) >= 2 + + # want to sort in descending order, so reverse=True + entities = sorted( + entities, + key=lambda a: ( + # number of entities already redirected to this one + len(existing_redirects[a.ident]), + # number of file/fileset/webcapture entities (would need to update) + int(len(a.files or []) + len(a.filesets or []) + len(a.webcaptures or [])), + # has a strong identifier? + bool(a.ext_id.doi or a.ext_id.pmid or a.ext_id.arxiv_id), + # has any identifier? + bool(a.ext_id), + # has basic metadata? + bool(a.release_type), + bool(a.release_status), + bool(a.release_year), + bool(a.container_id), + # has refs, abstracts, extra stuff? 
+ bool(a.refs), + bool(a.abstracts), + ), + reverse=True, + ) + return entities[0].ident + + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + + # TODO: this code is pretty old and has only been partially refactored. + # Needs more testing and review. + raise NotImplementedError + + updated_entities = 0 + releases = dict() + existing_redirects: Dict[str, List[str]] = dict() + eg_id = self.get_editgroup_id() + + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in all_ids: + releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures") + existing_redirects[ident] = self.api.get_release_redirects(ident) + + if not primary_id: + primary_id = self.choose_primary_release( + list(releases.values()), existing_redirects + ) + dupe_ids = [d for d in dupe_ids if d != primary_id] + + assert primary_id not in dupe_ids + + primary_work_id = releases[primary_id].work_id + updated_work_ids = [] + redirected_release_ids = [] + + # execute all the release redirects + for release in releases.values(): + if release.ident == primary_id: + continue + + # file redirects + for e in release.files: + assert release.ident in e.release_ids + e.release_ids.remove(release.ident) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) + if not self.dry_run_mode: + self.api.update_file(eg_id, e.ident, e) + updated_entities += 1 + self.counts["updated-files"] += 1 + + # fileset redirects + for e in release.filesets: + assert release.ident in e.release_ids + e.release_ids.remove(release.ident) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) + if not self.dry_run_mode: + self.api.update_fileset(eg_id, e.ident, e) + updated_entities += 1 + self.counts["updated-filesets"] += 1 + + # webcapture redirects + for e in release.webcaptures: + assert release.ident in e.release_ids + e.release_ids.remove(release.ident) + if 
primary_id not in e.release_ids: + e.release_ids.append(primary_id) + if not self.dry_run_mode: + self.api.update_webcapture(eg_id, e.ident, e) + updated_entities += 1 + self.counts["updated-webcaptures"] += 1 + + # the release redirect itself + updated_work_ids.append(release.work_id) + redirected_release_ids.append(release.ident) + if not self.dry_run_mode: + self.api.update_release( + eg_id, + release.ident, + ReleaseEntity(redirect=primary_id, edit_extra=evidence), + ) + updated_entities += 1 + self.counts["updated-releases"] += 1 + + # lastly, clean up any merged work entities + redirected_release_ids = list(set(redirected_release_ids)) + updated_work_ids = list(set(updated_work_ids)) + assert primary_work_id not in updated_work_ids + for work_id in updated_work_ids: + work_releases = self.api.get_work_releases(work_id, hide="abstracts,refs") + rids = set([r.ident for r in work_releases]) + if rids.issubset(redirected_release_ids): + # all the releases for this work were updated/merged; we should + # redirect to primary work_id + # also check we don't have any in-flight edit conflicts + assert work_id not in self._idents_inflight + self._idents_inflight.append(work_id) + if not self.dry_run_mode: + self.api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) + updated_entities += 1 + self.counts["updated-works"] += 1 + + return updated_entities + + +def run_merge_releases(args: argparse.Namespace) -> None: + em = ReleaseMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.releases [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + 
help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_releases = subparsers.add_parser("merge-releases") + sub_merge_releases.set_defaults(func=run_merge_releases) + sub_merge_releases.add_argument( + "json_file", + help="source of merge lines to process (or stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/python/tests/merge_files.py b/python/tests/merge_files.py new file mode 100644 index 00000000..c33c6f6c --- /dev/null +++ b/python/tests/merge_files.py @@ -0,0 +1,160 @@ +from fatcat_openapi_client import FileEntity, FileUrl +from fixtures import api + +from fatcat_tools.mergers.files import FileMerger + + +def test_choose_primary_file(api) -> None: + + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + 
mimetype="application/pdf", + ) + fe_nourls = FileEntity( + ident="ccccsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + ) + fe_complete = FileEntity( + ident="ddddsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_pseudo_complete = FileEntity( + ident="eeeesb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + + assert fm.choose_primary_file([fe_partial, fe_norelease]) == "bbbbsb5apzfhbbxxc7rgu2yw6m" + assert ( + fm.choose_primary_file([fe_partial, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_complete, fe_nourls, fe_norelease]) + == "ddddsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_pseudo_complete, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + + +def test_merge_file_metadata_from(api) -> None: + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + 
md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + ) + fe_nourls = FileEntity( + ident="ccccsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + ) + fe_complete = FileEntity( + ident="ddddsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="ddddddd315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_pseudo_complete = FileEntity( + ident="eeeesb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_another_release_id = FileEntity( + ident="fffffffapzfhbbxxc7rgu2yw6m", + release_ids=["qqqqqg7mxrayxfltget7fqcrjy"], + ) + fe_another_url = FileEntity( + ident="zzzzzzzapzfhbbxxc7rgu2yw6m", + urls=[ + FileUrl(rel="repository", url="http://someuni.edu/repo/file.pdf"), + ], + ) + fe_more_extra = FileEntity( + ident="fffffffapzfhbbxxc7rgu2yw6m", + release_ids=["qqqqqg7mxrayxfltget7fqcrjy"], + extra=dict(thang=456), + ) + + assert fm.merge_file_metadata_from(fe_nourls, fe_partial) is False + assert fm.merge_file_metadata_from(fe_complete, fe_pseudo_complete) is False + assert fm.merge_file_metadata_from(fe_complete, fe_complete) is False + assert 
fm.merge_file_metadata_from(fe_partial, fe_norelease) is True + assert fe_partial.md5 == fe_norelease.md5 + assert fe_partial.size == fe_norelease.size + assert fm.merge_file_metadata_from(fe_partial, fe_complete) is True + assert fe_partial.md5 != fe_complete.md5 + assert fe_partial.extra == fe_complete.extra + assert set([(u.rel, u.url) for u in fe_partial.urls or []]) == set( + [(u.rel, u.url) for u in fe_complete.urls or []] + ) + assert fe_partial.release_ids == fe_complete.release_ids + assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is True + assert fe_partial.release_ids == [ + "dlrxjg7mxrayxfltget7fqcrjy", + "qqqqqg7mxrayxfltget7fqcrjy", + ] + assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is False + assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is True + assert fe_partial.extra == dict(asdf=123, thang=456) + assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is False + assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is True + assert fe_partial.urls[-1].url == "http://someuni.edu/repo/file.pdf" + assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is False |