diff options
Diffstat (limited to 'python/fatcat_tools/mergers')
-rw-r--r-- | python/fatcat_tools/mergers/__init__.py | 4 | ||||
-rw-r--r-- | python/fatcat_tools/mergers/common.py | 129 | ||||
-rw-r--r-- | python/fatcat_tools/mergers/releases.py | 163 |
3 files changed, 200 insertions, 96 deletions
diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py index c38a397d..ccc4c02c 100644 --- a/python/fatcat_tools/mergers/__init__.py +++ b/python/fatcat_tools/mergers/__init__.py @@ -1,3 +1,3 @@ - from .common import EntityMerger -from .releases import ReleaseMerger, ReleaseGrouper +from .files import FileMerger +from .releases import ReleaseMerger diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py index 62a29c42..1aaf357d 100644 --- a/python/fatcat_tools/mergers/common.py +++ b/python/fatcat_tools/mergers/common.py @@ -1,29 +1,25 @@ - """ Tools for merging entities in various ways. - group-releases: pull all release entities under a single work - => merges work entities merge-releases: merge release entities together => groups files/filesets/webcaptures + => merges work entities if needed + merge-works: pull all release entities under a single work + => merges work entities merge-containers: merge container entities merge-files: merge file entities - -Input format is JSON lines with keys: - - idents (required): array of string identifiers - primary (optional): single string identifier - """ import subprocess from collections import Counter +from typing import Any, Dict, List, Optional -import fatcat_api_client -from fatcat_api_client.rest import ApiException +import fatcat_openapi_client +from fatcat_tools.importers import EntityImporter -class EntityMerger: + +class EntityMerger(EntityImporter): """ API for individual jobs: @@ -37,67 +33,85 @@ class EntityMerger: counts({'lines', 'skip', 'merged', 'updated'}) # implemented per-task - try_merge(idents, primary=None) -> int (entities updated) + try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None This class is pretty similar to EntityImporter, but isn't subclassed. """ - def __init__(self, api, **kwargs): + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['git_rev'] = eg_extra.get('git_rev', - subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger') + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["git_rev"] = eg_extra.get( + "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip() + ).decode("utf-8") + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger") self.api = api - self.dry_run_mode = kwargs.get('dry_run_mode', True) - self.edit_batch_size = kwargs.get('edit_batch_size', 50) - self.editgroup_description = kwargs.get('editgroup_description') + self.dry_run_mode = kwargs.get("dry_run_mode", True) + self.edit_batch_size = kwargs.get("edit_batch_size", 50) + self.editgroup_description = kwargs.get("editgroup_description") self.editgroup_extra = eg_extra self.reset() + self.entity_type_name = "common" if self.dry_run_mode: print("Running in dry-run mode!") - def reset(self): - self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0}) + def reset(self) -> None: + self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0}) self._edit_count = 0 - self._editgroup_id = None - self._entity_queue = [] - self._idents_inflight = [] + self._editgroup_id: Optional[str] = None + self._idents_inflight: List[str] = [] - def push_record(self, line): + def push_record(self, record: Dict[str, Any]) -> None: """ Intended to be called by "pusher" class (which could be pulling from JSON file, Kafka, whatever). - Input is expected to be a dict-like object with key "idents", and - optionally "primary". + Input is expected to be a dict-like object with keys: + + entity_type: str + primary_id: Optional[str] + duplicate_ids: [str] (order not preserved) + evidence: Optional[dict] + # can be anything, entity- or merger-specific + # some variables might be... + dupe_extid: str + dupe_extid_type: str Returns nothing. """ - self.counts['lines'] += 1 - if (not raw_record): - self.counts['skip'] += 1 + self.counts["lines"] += 1 + if not record: + self.counts["skip-blank-line"] += 1 + return + if record.get("entity_type") != self.entity_type_name: + self.counts["skip-entity-type"] += 1 return - primary = line.get('primary') - idents = list(set(line.get('idents'))) - if primary and primary not in idents: - idents.append(primary) - if not idents or len(idents) <= 1: - self.counts['skip'] += 1 + primary_id: Optional[str] = record.get("primary_id") + duplicate_ids: List[str] = list(set(record["duplicate_ids"])) + duplicate_ids = [di for di in duplicate_ids if di != primary_id] + if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id): + self.counts["skip-no-dupes"] += 1 return - for i in idents: + all_ids = duplicate_ids + if primary_id: + all_ids.append(primary_id) + for i in all_ids: if i in self._idents_inflight: - raise ValueError("Entity already part of in-process merge operation: {}".format(i)) - self._idents.inflight.append(i) - count = self.try_merge(idents, primary=primary) + raise ValueError( + "Entity already part of in-process merge operation: {}".format(i) + ) + self._idents_inflight.append(i) + count = self.try_merge( + duplicate_ids, primary_id=primary_id, evidence=record.get("evidence") + ) if count: - self.counts['merged'] += 1 - self.counts['updated-total'] += count + self.counts["merged"] += 1 + self.counts["updated-entities"] += count self._edit_count += count else: - self.counts['skip'] += 1 + self.counts["skip"] += 1 if self._edit_count >= self.edit_batch_size: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -105,11 +119,16 @@ class EntityMerger: self._idents_inflight = [] return - def try_merge(self, idents, primary=None): + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: # implementations should fill this in raise NotImplementedError - def finish(self): + def finish(self) -> Counter: if self._edit_count > 0: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -118,13 +137,19 @@ class EntityMerger: return self.counts - def get_editgroup_id(self): + def get_editgroup_id(self, _edits: int = 1) -> str: + """ + This version of get_editgroup_id() is similar to the EntityImporter + version, but does not update self._edit_count or submit editgroups. The + edits parameter is ignored. + """ if not self._editgroup_id: eg = self.api.create_editgroup( - fatcat_api_client.Editgroup( - description=self.editgroup_description, - extra=self.editgroup_extra)) + fatcat_openapi_client.Editgroup( + description=self.editgroup_description, extra=self.editgroup_extra + ) + ) self._editgroup_id = eg.editgroup_id - + assert self._editgroup_id return self._editgroup_id diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py index 802cb8da..fc970057 100644 --- a/python/fatcat_tools/mergers/releases.py +++ b/python/fatcat_tools/mergers/releases.py @@ -1,6 +1,15 @@ +import argparse +import os +import sys +from typing import Any, Dict, List, Optional + +import fatcat_openapi_client +from fatcat_openapi_client.models import ReleaseEntity, WorkEntity + +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher from .common import EntityMerger -from fatcat_api_client.models import ReleaseEntity, WorkEntity class ReleaseMerger(EntityMerger): @@ -12,99 +21,169 @@ class ReleaseMerger(EntityMerger): webcaptures} to the new merged release. """ - def __init__(self, api, **kwargs): + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_desc = kwargs.get('editgroup_description', - "Automated merge of release entities") - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger') - super().__init__(api, - editgroup_description=eg_desc, - editgroup_extra=eg_extra, - **kwargs) + eg_desc = kwargs.get("editgroup_description", "Automated merge of release entities") + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger") + super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) + self.entity_type_name = "release" - def try_merge(self, idents, primary=None): + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + """ + XXX: review/refactor; this code is very old + """ updated_entities = 0 releases = dict() eg_id = self.get_editgroup_id() - self.dry_run_mode - - for ident in idents: - releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures") + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in all_ids: + releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures") - # select the primary (if not already set) - if not primary: - primary = releases.keys()[0] + if not primary_id: + # XXX: + primary_id = dupe_ids[0] + dupe_ids = [d for d in dupe_ids if d != primary_id] - primary_work_id = releases[primary].work_id + primary_work_id = releases[primary_id].work_id updated_work_ids = [] redirected_release_ids = [] # execute all the release redirects for release in releases.values(): - if release.ident == primary: + if release.ident == primary_id: continue # file redirects for e in release.files: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_file(eg_id, e.ident, e) + self.api.update_file(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-files'] += 1 + self.counts["updated-files"] += 1 # fileset redirects for e in release.filesets: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_fileset(eg_id, e.ident, e) + self.api.update_fileset(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-filesets'] += 1 + self.counts["updated-filesets"] += 1 # webcapture redirects for e in release.webcaptures: e.release_ids.remove(release.ident) - if not primary in e.release_ids: - e.release_ids.append(primary) + if primary_id not in e.release_ids: + e.release_ids.append(primary_id) if not self.dry_run_mode: - api.update_webcapture(eg_id, e.ident, e) + self.api.update_webcapture(eg_id, e.ident, e) updated_entities += 1 - self.counts['updated-webcaptures'] += 1 + self.counts["updated-webcaptures"] += 1 # release redirect itself updated_work_ids.append(release.work_id) redirected_release_ids.append(release.ident) if not self.dry_run_mode: - api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary)) + self.api.update_release( + eg_id, release.ident, ReleaseEntity(redirect=primary_id) + ) updated_entities += 1 - self.counts['updated-releases'] - + self.counts["updated-releases"] += 1 # lastly, clean up any merged work entities - redirected_release_ids = set(redirected_release_ids) + redirected_release_ids = list(set(redirected_release_ids)) updated_work_ids = list(set(updated_work_ids)) assert primary_work_id not in updated_work_ids for work_id in updated_work_ids: - work_releases = api.get_work_releases(work_id) + work_releases = self.api.get_work_releases(work_id) rids = set([r.ident for r in work_releases]) if rids.issubset(redirected_release_ids): # all the releases for this work were updated/merged; we should # redirect to primary work_id # also check we don't have any in-flight edit conflicts - assert not work_id in self._idents_inflight + assert work_id not in self._idents_inflight self._idents_inflight.append(work_id) if not self.dry_run_mode: - api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) + self.api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) updated_entities += 1 - self.counts['updated-works'] += 1 + self.counts["updated-works"] += 1 return updated_entities -class ReleaseGrouper(EntityMerger): - pass + +def run_merge_releases(args: argparse.Namespace) -> None: + em = ReleaseMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.releases [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_releases = subparsers.add_parser("merge-releases") + sub_merge_releases.set_defaults(func=run_merge_releases) + sub_merge_releases.add_argument( + "json_file", + help="source of merge lines to process (or stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() |