author     bnewbold <bnewbold@archive.org>  2021-11-25 00:36:34 +0000
committer  bnewbold <bnewbold@archive.org>  2021-11-25 00:36:34 +0000
commit     5bc5eeed5e3ba54c2129c4233b881291c5fa7449 (patch)
tree       88392dace6857836cab80f3ea6d0980c20a24376
parent     b14fca89f41b5ba2b85bf033844da211fa5c3c8b (diff)
parent     62bf6202d002e96ffd81d7e4634502be1886d5c3 (diff)
Merge branch 'bnewbold-mergers' into 'master'
entity mergers framework

See merge request webgroup/fatcat!133
-rw-r--r--   notes/cleanups/file_sha1_dedupe.md            |  64
-rwxr-xr-x   notes/cleanups/scripts/file_dupe_to_json.py   |  72
-rw-r--r--   proposals/2021-11-17_entity_mergers.md        | 110
-rw-r--r--   python/fatcat_tools/mergers/__init__.py       |   1
-rw-r--r--   python/fatcat_tools/mergers/common.py         | 153
-rw-r--r--   python/fatcat_tools/mergers/files.py          | 237
-rw-r--r--   python/fatcat_tools/mergers/releases.py       | 249
-rw-r--r--   python/tests/merge_files.py                   | 160
8 files changed, 1046 insertions(+), 0 deletions(-)
diff --git a/notes/cleanups/file_sha1_dedupe.md b/notes/cleanups/file_sha1_dedupe.md
new file mode 100644
index 00000000..0829bc79
--- /dev/null
+++ b/notes/cleanups/file_sha1_dedupe.md
@@ -0,0 +1,64 @@
+
+
+## Prep
+
+Using `check_hashes.sh`:
+
+    zcat $HASH_FILE \
+        | awk '{print $3 "\t" $1}' \
+        | rg -v '^\t' \
+        | sort -S 4G \
+        | uniq -D -w 40 \
+        > sha1_ident.dupes.tsv
+
+    wc -l sha1_ident.dupes.tsv
+    # 6,350
+
+    cut -f1 sha1_ident.dupes.tsv | uniq | wc -l
+    # 2,039
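+
+(`uniq -D -w 40` prints every row whose first 40 characters, the SHA-1 hex
+digest, appear more than once.)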
+
+Want to create JSON for each group, like:
+
+    entity_type: "file"
+    primary_id: str or None
+    duplicate_ids: [str]
+    evidence:
+        extid: str
+        extid_type: "sha1"
+
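+One line of the resulting file, with hypothetical idents and hash (keys
+sorted, as emitted by the script):
+
+    {"duplicate_ids": ["hsmo6p4smrganpb3fndaj2lon4", "dlrxjg7mxrayxfltget7fqcrjy"], "entity_type": "file", "evidence": {"extid": "b1beebb5f979121cd234c69b08e3f42af3aaaaaa", "extid_type": "sha1"}, "primary_id": null}
+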
+Run transform script:
+
+    cat sha1_ident.dupes.tsv | ./file_dupe_to_json.py | pv -l > file_sha1_dupes.json
+    # 2.04k 0:00:00 [9.16k/s]
+
+
+## QA Testing
+
+    export FATCAT_AUTH_API_TOKEN=[...]
+
+    head -n25 /srv/fatcat/datasets/file_sha1_dupes.json \
+        | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" --dry-run merge-files -
+
+Hit some small bugs running in QA; test coverage isn't great, but I think it
+hits the important parts.
+
+    head -n25 /srv/fatcat/datasets/file_sha1_dupes.json \
+        | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" --dry-run merge-files -
+    # Running in dry-run mode!
+    # Counter({'updated-entities': 60, 'lines': 25, 'merged': 25, 'skip': 0, 'updated-total': 0})
+
+Dry-run mode didn't actually work, and edits actually happened (!).
+
+Edits do look good.
+
+Try again, not dry-run, to ensure that case is handled:
+
+    head -n25 /srv/fatcat/datasets/file_sha1_dupes.json | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" merge-files -
+    # Counter({'lines': 25, 'skip': 25, 'skip-not-active-entity': 25, 'merged': 0, 'updated-total': 0})
+
+And then run 500 through for more testing:
+
+    head -n500 /srv/fatcat/datasets/file_sha1_dupes.json | python -m fatcat_tools.mergers.files --editgroup-description-override "Automated merging of file entities with duplicate SHA-1 hashes" merge-files -
+    # Counter({'updated-entities': 1341, 'lines': 500, 'merged': 474, 'skip': 26, 'skip-not-active-entity': 25, 'skip-entity-not-found': 1, 'updated-total': 0})
+
+The majority of merges seem to be cases where there are multiple articles in the same PDF.
diff --git a/notes/cleanups/scripts/file_dupe_to_json.py b/notes/cleanups/scripts/file_dupe_to_json.py
new file mode 100755
index 00000000..2064dc1c
--- /dev/null
+++ b/notes/cleanups/scripts/file_dupe_to_json.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+
+"""
+This script can be used to transform duplicate file entity hash export rows
+into JSON objects which can be passed to the file entity merger.
+
+The input is expected to be a TSV with two columns: a hash value in the first
+column, and a fatcat file entity ident (in UUID format, not "fatcat ident"
+encoded) in the second column. The rows are assumed to be sorted by hash value
+(the first column), and duplicate values (same hash, differing UUID) are
+contiguous.
+
+File hashes aren't really "external identifiers" (ext_id), but we treat them as
+such here.
+
+This script is pretty simple, and should be easy to copy and reuse for
+release, container, or creator entity duplicates.
+"""
+
+import base64
+import json
+import sys
+import uuid
+
+EXTID_TYPE = "sha1"
+
+def uuid2fcid(s: str) -> str:
+    """
+    Converts a UUID string to a fatcat identifier (base32 encoded string)
+    """
+    raw = uuid.UUID(s).bytes
+    return base64.b32encode(raw)[:26].lower().decode("utf-8")
+
+def print_group(extid, dupe_ids):
+    if len(dupe_ids) < 2:
+        return
+    group = dict(
+        entity_type="file",
+        primary_id=None,
+        duplicate_ids=dupe_ids,
+        evidence=dict(
+            extid=extid,
+            extid_type=EXTID_TYPE,
+        ),
+    )
+    print(json.dumps(group, sort_keys=True))
+
+def run():
+    last_extid = None
+    dupe_ids = []
+    for line in sys.stdin:
+        line = line.strip()
+        if not line:
+            continue
+        (row_extid, row_uuid) = line.split("\t")[0:2]
+        if EXTID_TYPE == "sha1":
+            assert len(row_extid) == 40
+        else:
+            raise Exception(f"extid type not supported yet: {EXTID_TYPE}")
+        row_id = uuid2fcid(row_uuid)
+        if row_extid == last_extid:
+            dupe_ids.append(row_id)
+            continue
+        # hash changed: flush the previous group (if any) and start a new one
+        if dupe_ids:
+            print_group(last_extid, dupe_ids)
+        last_extid = row_extid
+        dupe_ids = [row_id]
+    if last_extid and dupe_ids:
+        print_group(last_extid, dupe_ids)
+
+
+if __name__ == "__main__":
+    run()
diff --git a/proposals/2021-11-17_entity_mergers.md b/proposals/2021-11-17_entity_mergers.md
new file mode 100644
index 00000000..d196d549
--- /dev/null
+++ b/proposals/2021-11-17_entity_mergers.md
@@ -0,0 +1,110 @@
+
+status: implemented
+
+Entity Mergers
+===============
+
+One category of catalog metadata cleanup is merging multiple duplicate
+entries into a single record. The fatcat catalog supports this by turning the
+duplicate entities into "redirect records" which point at the single merged
+record.
+
+This proposal briefly describes the process for doing bulk merges.
+
+
+## External Identifier Duplicates
+
+The easiest category of entity duplicates to discover is cases where multiple
+entities have the same external (persistent) identifier. For example, releases
+with the exact same DOI, containers with the same ISSN-L, or creators with the
+same ORCiD. Files with the same SHA-1 hash are a similar case. The catalog does
+not block the creation of such entities, though it is assumed that editors and
+bots will do their best to prevent creating duplicates, and that this is
+checked and monitored via review bots (auto-annotation) and bulk quality
+checks.
+
+In these cases, it is simple enough to use the external identifier dumps (part
+of the fatcat bulk exports), find duplicates by identifier, and create merge
+requests.
+
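+For example, a rough sketch of finding SHA-1 duplicates from a file hash
+export (the column layout is assumed here; see
+`notes/cleanups/file_sha1_dedupe.md` for an actual pipeline):
+
+    zcat file_hashes.tsv.gz \
+        | awk '{print $3 "\t" $1}' \
+        | sort -S 4G \
+        | uniq -D -w 40 \
+        > sha1_ident.dupes.tsv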
+
+## Merge Requests JSON Schema
+
+Proposed JSON schema for bulk entity merging:
+
+    entity_type: str, required. eg: "file"
+    primary_id: str, optional, entity ident
+    duplicate_ids: [str], required, entity idents
+    evidence: dict, optional, merger/entity specific
+        # evidence fields for external identifier dupes
+        extid: str, the identifier value
+        extid_type: str, eg "doi" or "sha1"
+
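+As a concrete example, a file-merge request under this schema (hypothetical
+idents and hash) could look like:
+
+    {"entity_type": "file", "primary_id": null, "duplicate_ids": ["hsmo6p4smrganpb3fndaj2lon4", "dlrxjg7mxrayxfltget7fqcrjy"], "evidence": {"extid": "b1beebb5f979121cd234c69b08e3f42af3aaaaaa", "extid_type": "sha1"}}
+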
+The merge request generation process might indicate which of the entities
+should end up as the "primary", or it might leave that determination to the
+merger itself. `primary_id` should not be set arbitrarily or randomly; it
+should only be set when there is a good reason for a specific entity to be the
+"primary" which the others redirect to.
+
+The `primary_id` should not be included in `duplicate_ids`, but the merger code
+will remove it if included accidentally.
+
+The `evidence` fields are flexible. By default they will all be included as
+top-level "edit extra" metadata on each individual entity redirected, but not
+on the primary entity (if it gets updated).
+
+
+## Merge Process and Semantics
+
+The assumption is that all the entities indicated in `duplicate_ids` will be
+redirected to the `primary_id`. Any metadata included in the duplicates which
+is not included in the primary will be copied into the primary, but existing
+primary metadata fields will not be "clobbered" (overwritten) by duplicate
+metadata. This includes top-level fields of the `extra` metadata dict, if
+appropriate. If there is no unique metadata in the redirected entities, the
+primary does not need to be updated and will not be.
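+
+The file merger in this merge request implements the no-clobber rule for
+simple fields roughly like this (a sketch; see
+`python/fatcat_tools/mergers/files.py` for the full logic, including URL,
+release_id, and `extra` handling):
+
+    # copy scalar fields from the duplicate only when the primary lacks them
+    for k in ["size", "mimetype", "sha256", "md5", "content_scope"]:
+        if not getattr(primary, k) and getattr(other, k):
+            setattr(primary, k, getattr(other, k))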
+
+
+## Work/Release Grouping and Merging
+
+Work and Release entities are something of a special case.
+
+Merging two release entities will result in all artifact entities (files,
+filesets, webcaptures) being updated which previously pointed at the duplicate
+entity to point to the primary entity. If the work entities associated with the
+duplicate releases have no other releases associated with them, they also will
+be merged (redirected) to the primary release's work entity.
+
+"Grouping" releases is the same as merging their works. In this situation, the
+number of distinct release entities stays the same, but the duplicates are
+updated to be under the same work as the primary. This is initially implemented
+by merging the work entities, and then updating *all* the releases under each
+merged work to point at the primary work. No artifact entities need to
+be updated in this scenario.
+
+A planned future option is to pull a single release out of a group of
+releases under a work, and point it at a new work. This would be a form of
+"regrouping". For now this can only be achieved by updating the release
+entities individually, not in a bulk/automated manner.
+
+
+## Container Merging
+
+Because many releases point to containers, it is not practical to update all
+the releases at the same time as merging the containers. In the long run it is
+good for the health of the catalog to have all the releases updated to point at
+the primary container, but these updates can be delayed.
+
+To keep statistics and functionality working before release updates happen,
+downstream users of release entities should "expand" container sub-entities and
+use the "redirect" ident of the container entity instead of "ident", if the
+"redirect" is set. For example, when linking in web interfaces, or when doing a
+schema transform in the fatcat and scholar.archive.org search index.
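+
+A minimal sketch of that pattern, assuming a release entity fetched with
+`expand="container"`:
+
+    container = release.container
+    # prefer the redirect target (the merged-into entity) when set
+    container_ident = container.redirect or container.ident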
+
+
+## Background Reading
+
+"The Lens MetaRecord and LensID: An open identifier system for aggregated
+metadata and versioning of knowledge artefacts"
+https://osf.io/preprints/lissa/t56yh/
+
diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py
new file mode 100644
index 00000000..0d7cd468
--- /dev/null
+++ b/python/fatcat_tools/mergers/__init__.py
@@ -0,0 +1 @@
+from .common import EntityMerger
diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
new file mode 100644
index 00000000..e25f8194
--- /dev/null
+++ b/python/fatcat_tools/mergers/common.py
@@ -0,0 +1,153 @@
+"""
+Tools for merging entities in various ways.
+
+ merge-releases: merge release entities together
+ => groups files/filesets/webcaptures
+ => merges work entities if needed
+ merge-works: pull all release entities under a single work
+ => merges work entities
+ merge-containers: merge container entities
+ merge-files: merge file entities
+"""
+
+import subprocess
+from collections import Counter
+from typing import Any, Dict, List, Optional
+
+import fatcat_openapi_client
+
+from fatcat_tools.importers import EntityImporter
+
+
+class EntityMerger(EntityImporter):
+ """
+ API for individual jobs:
+
+ # record iterators sees
+ push_record(raw_record)
+ finish()
+
+ # provided helpers
+ self.api
+ self.get_editgroup_id()
+ counts({'lines', 'skip', 'merged', 'updated'})
+
+ # implemented per-task
+ try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None
+ """
+
+ def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
+
+ eg_extra = kwargs.get("editgroup_extra", dict())
+ eg_extra["git_rev"] = eg_extra.get(
+ "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip()
+ ).decode("utf-8")
+ eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger")
+
+ self.api = api
+ self.dry_run_mode = kwargs.get("dry_run_mode", True)
+ self.edit_batch_size = kwargs.get("edit_batch_size", 50)
+ self.editgroup_description = kwargs.get("editgroup_description")
+ self.editgroup_extra = eg_extra
+ self.reset()
+ self.entity_type_name = "common"
+
+ if self.dry_run_mode:
+ print("Running in dry-run mode!")
+
+ def reset(self) -> None:
+ self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0})
+ self._edit_count = 0
+ self._editgroup_id: Optional[str] = None
+ self._idents_inflight: List[str] = []
+
+ def push_record(self, record: Dict[str, Any]) -> None:
+ """
+ Intended to be called by "pusher" class (which could be pulling from
+ JSON file, Kafka, whatever).
+
+ Input is expected to be a dict-like object with keys:
+
+ entity_type: str
+ primary_id: Optional[str]
+ duplicate_ids: [str] (order not preserved)
+ evidence: Optional[dict]
+ # can be anything, entity- or merger-specific
+ # some variables might be...
+ extid: str
+ extid_type: str
+
+ Returns nothing.
+ """
+ self.counts["lines"] += 1
+ if not record:
+ self.counts["skip-blank-line"] += 1
+ return
+ if record.get("entity_type") != self.entity_type_name:
+ self.counts["skip-entity-type"] += 1
+ return
+ primary_id: Optional[str] = record.get("primary_id")
+ duplicate_ids: List[str] = list(set(record["duplicate_ids"]))
+ duplicate_ids = [di for di in duplicate_ids if di != primary_id]
+ if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id):
+ self.counts["skip-no-dupes"] += 1
+ return
+ all_ids = duplicate_ids
+ if primary_id:
+ all_ids.append(primary_id)
+ for i in all_ids:
+ if i in self._idents_inflight:
+ raise ValueError(
+ "Entity already part of in-process merge operation: {}".format(i)
+ )
+ self._idents_inflight.append(i)
+ count = self.try_merge(
+ duplicate_ids, primary_id=primary_id, evidence=record.get("evidence")
+ )
+ if count:
+ self.counts["merged"] += 1
+ self.counts["updated-entities"] += count
+ self._edit_count += count
+ else:
+ self.counts["skip"] += 1
+ if self._edit_count >= self.edit_batch_size:
+ self.api.accept_editgroup(self._editgroup_id)
+ self._editgroup_id = None
+ self._edit_count = 0
+ self._idents_inflight = []
+ return
+
+ def try_merge(
+ self,
+ dupe_ids: List[str],
+ primary_id: Optional[str] = None,
+ evidence: Optional[Dict[str, Any]] = None,
+ ) -> int:
+ # implementations should fill this in
+ raise NotImplementedError
+
+ def finish(self) -> Counter:
+ if self._edit_count > 0:
+ self.api.accept_editgroup(self._editgroup_id)
+ self._editgroup_id = None
+ self._edit_count = 0
+ self._idents_inflight = []
+
+ return self.counts
+
+ def get_editgroup_id(self, _edits: int = 1) -> str:
+ """
+ This version of get_editgroup_id() is similar to the EntityImporter
+ version, but does not update self._edit_count or submit editgroups. The
+ edits parameter is ignored.
+ """
+
+ if not self._editgroup_id:
+ eg = self.api.create_editgroup(
+ fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description, extra=self.editgroup_extra
+ )
+ )
+ self._editgroup_id = eg.editgroup_id
+ assert self._editgroup_id
+ return self._editgroup_id
diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py
new file mode 100644
index 00000000..3efe85fd
--- /dev/null
+++ b/python/fatcat_tools/mergers/files.py
@@ -0,0 +1,237 @@
+import argparse
+import os
+import sys
+from typing import Any, Dict, List, Optional
+
+import fatcat_openapi_client
+from fatcat_openapi_client.models import FileEntity
+
+from fatcat_tools import authenticated_api
+from fatcat_tools.importers import JsonLinePusher
+
+from .common import EntityMerger
+
+
+class FileMerger(EntityMerger):
+    """
+    Combines file entities into a single primary. Merges any existing partial
+    metadata (such as release_ids and URLs). Can choose a primary if necessary.
+
+    The primary is only updated if needed.
+
+    TODO: relies on API server to detect "redirect of redirect" situation
+    """
+
+    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
+
+        eg_desc = (
+            kwargs.pop("editgroup_description", None) or "Automated merge of file entities"
+        )
+        eg_extra = kwargs.pop("editgroup_extra", dict())
+        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger")
+        # dry_run_mode is passed through kwargs and handled by EntityMerger
+        super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
+        self.entity_type_name = "file"
+
+    def choose_primary_file(self, entities: List[FileEntity]) -> str:
+        """
+        TODO: could incorporate number of redirected entities already pointing at an entity
+        """
+        assert entities and len(entities) >= 2
+
+        # want to sort in descending order, so reverse=True
+        entities = sorted(
+            entities,
+            key=lambda a: (
+                # has complete metadata?
+                bool(a.sha256 and a.md5 and a.sha1 and (a.size is not None)),
+                # has releases associated?
+                bool(a.release_ids),
+                # has URLs?
+                bool(a.urls),
+                # has extra metadata?
+                bool(a.extra),
+                # number of release_ids
+                len(a.release_ids or []),
+            ),
+            reverse=True,
+        )
+        return entities[0].ident
+
+    def merge_file_metadata_from(self, primary: FileEntity, other: FileEntity) -> bool:
+        """
+        Compares a primary to an other. If there are helpful metadata fields in
+        the other, copy them to primary, in-place.
+
+        This is intended to extract any useful metadata from "other" before it
+        gets redirected to "primary".
+
+        Returns True if the primary was updated, False otherwise.
+        """
+        updated = False
+        # NOTE: intentionally not including sha1 here
+        for k in ["size", "mimetype", "sha256", "md5", "content_scope"]:
+            if not getattr(primary, k) and getattr(other, k):
+                setattr(primary, k, getattr(other, k))
+                updated = True
+
+        if not primary.urls:
+            primary.urls = []
+        if not primary.release_ids:
+            primary.release_ids = []
+
+        if other.extra:
+            if not primary.extra:
+                primary.extra = other.extra
+                updated = True
+            else:
+                for k in other.extra.keys():
+                    if k not in primary.extra:
+                        primary.extra[k] = other.extra[k]
+                        updated = True
+
+        for u in other.urls or []:
+            if u not in primary.urls:
+                primary.urls.append(u)
+                updated = True
+
+        for i in other.release_ids or []:
+            if i not in primary.release_ids:
+                primary.release_ids.append(i)
+                updated = True
+
+        return updated
+
+    def try_merge(
+        self,
+        dupe_ids: List[str],
+        primary_id: Optional[str] = None,
+        evidence: Optional[Dict[str, Any]] = None,
+    ) -> int:
+
+        # currently required for extid validation
+        if not evidence or not (evidence.get("extid_type") and evidence.get("extid")):
+            self.counts["skip-missing-evidence"] += 1
+            return 0
+
+        updated_entities = 0
+        entities: Dict[str, FileEntity] = dict()
+        eg_id = self.get_editgroup_id()
+
+        all_ids = dupe_ids.copy()
+        if primary_id:
+            all_ids.append(primary_id)
+        for ident in all_ids:
+            try:
+                entities[ident] = self.api.get_file(ident)
+            except fatcat_openapi_client.ApiException as ae:
+                if ae.status == 404:
+                    self.counts["skip-entity-not-found"] += 1
+                    return 0
+                else:
+                    raise
+            if entities[ident].state != "active":
+                self.counts["skip-not-active-entity"] += 1
+                return 0
+            if getattr(entities[ident], evidence["extid_type"]) != evidence["extid"]:
+                self.counts["skip-extid-mismatch"] += 1
+                return 0
+
+        if not primary_id:
+            primary_id = self.choose_primary_file(list(entities.values()))
+            dupe_ids = [d for d in dupe_ids if d != primary_id]
+
+        assert primary_id not in dupe_ids
+
+        primary = entities[primary_id]
+        primary_updated = False
+        for other_id in dupe_ids:
+            other = entities[other_id]
+            primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated
+            if not self.dry_run_mode:
+                self.api.update_file(
+                    eg_id,
+                    other.ident,
+                    FileEntity(
+                        redirect=primary.ident,
+                        edit_extra=evidence,
+                    ),
+                )
+            updated_entities += 1
+
+        if primary_updated:
+            if not self.dry_run_mode:
+                self.api.update_file(eg_id, primary.ident, primary)
+            updated_entities += 1
+
+        return updated_entities
+
+
+def run_merge_files(args: argparse.Namespace) -> None:
+    em = FileMerger(
+        args.api,
+        edit_batch_size=args.batch_size,
+        dry_run_mode=args.dry_run,
+        editgroup_description=args.editgroup_description_override,
+    )
+    JsonLinePusher(em, args.json_file).run()
+
+
+def main() -> None:
+    """
+    Invoke like:
+
+        python3 -m fatcat_tools.mergers.files [options]
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--host-url", default="http://localhost:9411/v0", help="connect to this host/port"
+    )
+    parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int)
+    parser.add_argument(
+        "--editgroup-description-override",
+        help="editgroup description override",
+        default=None,
+        type=str,
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="don't actually commit merges, just count what would have been",
+    )
+    parser.set_defaults(
+        auth_var="FATCAT_AUTH_API_TOKEN",
+    )
+    subparsers = parser.add_subparsers()
+
+    sub_merge_files = subparsers.add_parser("merge-files")
+    sub_merge_files.set_defaults(func=run_merge_files)
+    sub_merge_files.add_argument(
+        "json_file",
+        help="source of merge lines to process (or stdin)",
+        default=sys.stdin,
+        type=argparse.FileType("r"),
+    )
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("tell me what to do!")
+        sys.exit(-1)
+
+    # allow editgroup description override via env variable (but CLI arg takes
+    # precedence)
+    if not args.editgroup_description_override and os.environ.get(
+        "FATCAT_EDITGROUP_DESCRIPTION"
+    ):
+        args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION")
+
+    args.api = authenticated_api(
+        args.host_url,
+        # token is an optional kwarg (can be empty string, None, etc)
+        token=os.environ.get(args.auth_var),
+    )
+    args.func(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py
new file mode 100644
index 00000000..1f995b00
--- /dev/null
+++ b/python/fatcat_tools/mergers/releases.py
@@ -0,0 +1,249 @@
+import argparse
+import os
+import sys
+from typing import Any, Dict, List, Optional
+
+import fatcat_openapi_client
+from fatcat_openapi_client.models import ReleaseEntity, WorkEntity
+
+from fatcat_tools import authenticated_api
+from fatcat_tools.importers import JsonLinePusher
+
+from .common import EntityMerger
+
+
+class ReleaseMerger(EntityMerger):
+    """
+    Hard merges a set of release entities, redirecting all entities to a single
+    primary release. This is different from "grouping" multiple releases under
+    a single work.
+
+    A "primary" (which the other releases will redirect to) can be provided, or
+    one will be chosen from the set of duplicate releases based on the
+    completeness of metadata and other heuristic factors.
+
+    Releases are some of the most complex entities to merge, because of the
+    complexity of bibliographic metadata and the number of related entities
+    which also need to be updated.
+
+    File, Fileset, and Webcapture entities which currently point to a release
+    which gets redirected will be updated to point at the "primary" release.
+
+    Any Work entities which will end up with no releases pointing at them after
+    the merging will get redirected to the work corresponding to the "primary"
+    release.
+
+    NOTE: the "primary" release will currently (as implemented) *not* get
+    updated with metadata from all the redirected releases
+    """
+
+    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
+
+        eg_desc = (
+            kwargs.pop("editgroup_description", None) or "Automated merge of release entities"
+        )
+        # pop (not get), so editgroup_extra isn't passed twice to super()
+        eg_extra = kwargs.pop("editgroup_extra", dict())
+        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger")
+        # dry_run_mode is passed through kwargs and handled by EntityMerger
+        super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
+        self.entity_type_name = "release"
+
+    def choose_primary_release(
+        self, entities: List[ReleaseEntity], existing_redirects: Dict[str, List[str]]
+    ) -> str:
+        assert entities and len(entities) >= 2
+
+        # want to sort in descending order, so reverse=True
+        entities = sorted(
+            entities,
+            key=lambda a: (
+                # number of entities already redirected to this one
+                len(existing_redirects[a.ident]),
+                # number of file/fileset/webcapture entities (would need to update)
+                int(len(a.files or []) + len(a.filesets or []) + len(a.webcaptures or [])),
+                # has a strong identifier?
+                bool(a.ext_id.doi or a.ext_id.pmid or a.ext_id.arxiv_id),
+                # has any identifier?
+                bool(a.ext_id),
+                # has basic metadata?
+                bool(a.release_type),
+                bool(a.release_status),
+                bool(a.release_year),
+                bool(a.container_id),
+                # has refs, abstracts, extra stuff?
+                bool(a.refs),
+                bool(a.abstracts),
+            ),
+            reverse=True,
+        )
+        return entities[0].ident
+
+    def try_merge(
+        self,
+        dupe_ids: List[str],
+        primary_id: Optional[str] = None,
+        evidence: Optional[Dict[str, Any]] = None,
+    ) -> int:
+
+        # TODO: this code is pretty old and has only been partially refactored.
+        # Needs more testing and review; bail out until then (the code below is
+        # currently unreachable).
+        raise NotImplementedError
+
+        updated_entities = 0
+        releases = dict()
+        existing_redirects: Dict[str, List[str]] = dict()
+        eg_id = self.get_editgroup_id()
+
+        all_ids = dupe_ids.copy()
+        if primary_id:
+            all_ids.append(primary_id)
+        for ident in all_ids:
+            releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures")
+            existing_redirects[ident] = self.api.get_release_redirects(ident)
+
+        if not primary_id:
+            primary_id = self.choose_primary_release(
+                list(releases.values()), existing_redirects
+            )
+            dupe_ids = [d for d in dupe_ids if d != primary_id]
+
+        assert primary_id not in dupe_ids
+
+        primary_work_id = releases[primary_id].work_id
+        updated_work_ids = []
+        redirected_release_ids = []
+
+        # execute all the release redirects
+        for release in releases.values():
+            if release.ident == primary_id:
+                continue
+
+            # file redirects
+            for e in release.files:
+                assert release.ident in e.release_ids
+                e.release_ids.remove(release.ident)
+                if primary_id not in e.release_ids:
+                    e.release_ids.append(primary_id)
+                if not self.dry_run_mode:
+                    self.api.update_file(eg_id, e.ident, e)
+                updated_entities += 1
+                self.counts["updated-files"] += 1
+
+            # fileset redirects
+            for e in release.filesets:
+                assert release.ident in e.release_ids
+                e.release_ids.remove(release.ident)
+                if primary_id not in e.release_ids:
+                    e.release_ids.append(primary_id)
+                if not self.dry_run_mode:
+                    self.api.update_fileset(eg_id, e.ident, e)
+                updated_entities += 1
+                self.counts["updated-filesets"] += 1
+
+            # webcapture redirects
+            for e in release.webcaptures:
+                assert release.ident in e.release_ids
+                e.release_ids.remove(release.ident)
+                if primary_id not in e.release_ids:
+                    e.release_ids.append(primary_id)
+                if not self.dry_run_mode:
+                    self.api.update_webcapture(eg_id, e.ident, e)
+                updated_entities += 1
+                self.counts["updated-webcaptures"] += 1
+
+            # the release redirect itself
+            updated_work_ids.append(release.work_id)
+            redirected_release_ids.append(release.ident)
+            if not self.dry_run_mode:
+                self.api.update_release(
+                    eg_id,
+                    release.ident,
+                    ReleaseEntity(redirect=primary_id, edit_extra=evidence),
+                )
+            updated_entities += 1
+            self.counts["updated-releases"] += 1
+
+        # lastly, clean up any merged work entities
+        redirected_release_ids = list(set(redirected_release_ids))
+        updated_work_ids = list(set(updated_work_ids))
+        assert primary_work_id not in updated_work_ids
+        for work_id in updated_work_ids:
+            work_releases = self.api.get_work_releases(work_id, hide="abstracts,refs")
+            rids = set([r.ident for r in work_releases])
+            if rids.issubset(redirected_release_ids):
+                # all the releases for this work were updated/merged; we should
+                # redirect to primary work_id
+                # also check we don't have any in-flight edit conflicts
+                assert work_id not in self._idents_inflight
+                self._idents_inflight.append(work_id)
+                if not self.dry_run_mode:
+                    self.api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id))
+                updated_entities += 1
+                self.counts["updated-works"] += 1
+
+        return updated_entities
+
+
+def run_merge_releases(args: argparse.Namespace) -> None:
+    em = ReleaseMerger(
+        args.api,
+        edit_batch_size=args.batch_size,
+        dry_run_mode=args.dry_run,
+        editgroup_description=args.editgroup_description_override,
+    )
+    JsonLinePusher(em, args.json_file).run()
+
+
+def main() -> None:
+    """
+    Invoke like:
+
+        python3 -m fatcat_tools.mergers.releases [options]
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--host-url", default="http://localhost:9411/v0", help="connect to this host/port"
+    )
+    parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int)
+    parser.add_argument(
+        "--editgroup-description-override",
+        help="editgroup description override",
+        default=None,
+        type=str,
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="don't actually commit merges, just count what would have been",
+    )
+    parser.set_defaults(
+        auth_var="FATCAT_AUTH_API_TOKEN",
+    )
+    subparsers = parser.add_subparsers()
+
+    sub_merge_releases = subparsers.add_parser("merge-releases")
+    sub_merge_releases.set_defaults(func=run_merge_releases)
+    sub_merge_releases.add_argument(
+        "json_file",
+        help="source of merge lines to process (or stdin)",
+        default=sys.stdin,
+        type=argparse.FileType("r"),
+    )
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("tell me what to do!")
+        sys.exit(-1)
+
+    # allow editgroup description override via env variable (but CLI arg takes
+    # precedence)
+    if not args.editgroup_description_override and os.environ.get(
+        "FATCAT_EDITGROUP_DESCRIPTION"
+    ):
+        args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION")
+
+    args.api = authenticated_api(
+        args.host_url,
+        # token is an optional kwarg (can be empty string, None, etc)
+        token=os.environ.get(args.auth_var),
+    )
+    args.func(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/tests/merge_files.py b/python/tests/merge_files.py
new file mode 100644
index 00000000..c33c6f6c
--- /dev/null
+++ b/python/tests/merge_files.py
@@ -0,0 +1,160 @@
+from fatcat_openapi_client import FileEntity, FileUrl
+from fixtures import api
+
+from fatcat_tools.mergers.files import FileMerger
+
+
+def test_choose_primary_file(api) -> None:
+
+    fm = FileMerger(api=api)
+    fe_partial = FileEntity(
+        ident="aaaasb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa",
+    )
+    fe_norelease = FileEntity(
+        ident="bbbbsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="d2c7318315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+    )
+    fe_nourls = FileEntity(
+        ident="ccccsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="d2c7318315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+    )
+    fe_complete = FileEntity(
+        ident="ddddsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="d2c7318315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+        urls=[
+            FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"),
+        ],
+        extra=dict(asdf=123),
+    )
+    fe_pseudo_complete = FileEntity(
+        ident="eeeesb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+        urls=[
+            FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"),
+        ],
+        extra=dict(asdf=123),
+    )
+
+    assert fm.choose_primary_file([fe_partial, fe_norelease]) == "bbbbsb5apzfhbbxxc7rgu2yw6m"
+    assert (
+        fm.choose_primary_file([fe_partial, fe_nourls, fe_norelease])
+        == "ccccsb5apzfhbbxxc7rgu2yw6m"
+    )
+    assert (
+        fm.choose_primary_file([fe_partial, fe_complete, fe_nourls, fe_norelease])
+        == "ddddsb5apzfhbbxxc7rgu2yw6m"
+    )
+    assert (
+        fm.choose_primary_file([fe_partial, fe_pseudo_complete, fe_nourls, fe_norelease])
+        == "ccccsb5apzfhbbxxc7rgu2yw6m"
+    )
+
+
+def test_merge_file_metadata_from(api) -> None:
+    fm = FileMerger(api=api)
+    fe_partial = FileEntity(
+        ident="aaaasb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa",
+    )
+    fe_norelease = FileEntity(
+        ident="bbbbsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="d2c7318315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+    )
+    fe_nourls = FileEntity(
+        ident="ccccsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="d2c7318315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+    )
+    fe_complete = FileEntity(
+        ident="ddddsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="ddddddd315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+        urls=[
+            FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"),
+        ],
+        extra=dict(asdf=123),
+    )
+    fe_pseudo_complete = FileEntity(
+        ident="eeeesb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+        urls=[
+            FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"),
+        ],
+        extra=dict(asdf=123),
+    )
+    fe_another_release_id = FileEntity(
+        ident="fffffffapzfhbbxxc7rgu2yw6m",
+        release_ids=["qqqqqg7mxrayxfltget7fqcrjy"],
+    )
+    fe_another_url = FileEntity(
+        ident="zzzzzzzapzfhbbxxc7rgu2yw6m",
+        urls=[
+            FileUrl(rel="repository", url="http://someuni.edu/repo/file.pdf"),
+        ],
+    )
+    fe_more_extra = FileEntity(
+        ident="fffffffapzfhbbxxc7rgu2yw6m",
+        release_ids=["qqqqqg7mxrayxfltget7fqcrjy"],
+        extra=dict(thang=456),
+    )
+
+    assert fm.merge_file_metadata_from(fe_nourls, fe_partial) is False
+    assert fm.merge_file_metadata_from(fe_complete, fe_pseudo_complete) is False
+    assert fm.merge_file_metadata_from(fe_complete, fe_complete) is False
+    assert fm.merge_file_metadata_from(fe_partial, fe_norelease) is True
+    assert fe_partial.md5 == fe_norelease.md5
+    assert fe_partial.size == fe_norelease.size
+    assert fm.merge_file_metadata_from(fe_partial, fe_complete) is True
+    assert fe_partial.md5 != fe_complete.md5
+    assert fe_partial.extra == fe_complete.extra
+    assert set([(u.rel, u.url) for u in fe_partial.urls or []]) == set(
+        [(u.rel, u.url) for u in fe_complete.urls or []]
+    )
+    assert fe_partial.release_ids == fe_complete.release_ids
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is True
+    assert fe_partial.release_ids == [
+        "dlrxjg7mxrayxfltget7fqcrjy",
+        "qqqqqg7mxrayxfltget7fqcrjy",
+    ]
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is False
+    assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is True
+    assert fe_partial.extra == dict(asdf=123, thang=456)
+    assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is False
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is True
+    assert fe_partial.urls[-1].url == "http://someuni.edu/repo/file.pdf"
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is False