From 8b132461c9e9f85e61c520f2f576144a6f6e06ac Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 5 Sep 2019 18:41:30 -0700 Subject: first iteration of mergers --- python/fatcat_merge.py | 112 +++++++++++++++++++++++++++ python/fatcat_tools/mergers/__init__.py | 3 + python/fatcat_tools/mergers/common.py | 130 ++++++++++++++++++++++++++++++++ python/fatcat_tools/mergers/releases.py | 110 +++++++++++++++++++++++++++ 4 files changed, 355 insertions(+) create mode 100755 python/fatcat_merge.py create mode 100644 python/fatcat_tools/mergers/__init__.py create mode 100644 python/fatcat_tools/mergers/common.py create mode 100644 python/fatcat_tools/mergers/releases.py diff --git a/python/fatcat_merge.py b/python/fatcat_merge.py new file mode 100755 index 00000000..7b0ae63b --- /dev/null +++ b/python/fatcat_merge.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 + +""" +Tools for merging entities in various ways. + + group-releases: pull all release entities under a single work + => merges work entities + merge-releases: merge release entities together + => groups files/filesets/webcaptures + merge-containers: merge container entities + merge-files: merge file entities + +Input format is usually JSON lines with keys: + + idents (required): array of string identifiers + primary (optional): single string identifier + +""" + +import os, sys, argparse +from fatcat_tools import authenticated_api +from fatcat_tools.mergers import * +from fatcat_tools.importers import JsonLinePusher + + +def run_group_releases(args): + rg = ReleaseGrouper(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(rg, args.json_file).run() + +def run_merge_releases(args): + rm = ReleaseMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(rg, args.json_file).run() + +def run_merge_containers(args): + cm = ReleaseMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(cm, args.json_file).run() + +def run_merge_files(args): + fm = FileMerger(args.api, + edit_batch_size=args.batch_size, + dry_run_mode=args.dry_run) + JsonLinePusher(fm, args.json_file).run() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--host-url', + default="http://localhost:9411/v0", + help="connect to this host/port") + parser.add_argument('--batch-size', + help="size of batch to send", + default=50, type=int) + parser.add_argument('--editgroup-description-override', + help="editgroup description override", + default=None, type=str) + parser.add_argument('--dry-run', + action='store_true', + help="don't actually commit merges, just count what would have been") + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_group_releases = subparsers.add_parser('group-releases') + sub_group_releases.set_defaults(func=run_group_releases) + sub_group_releases.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_releases = subparsers.add_parser('merge-releases') + sub_merge_releases.set_defaults(func=run_merge_releases) + sub_merge_releases.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_files = subparsers.add_parser('merge-files') + sub_merge_files.set_defaults(func=run_merge_files) + sub_merge_files.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_merge_containers = subparsers.add_parser('merge-containers') + sub_merge_containers.set_defaults(func=run_merge_containers) + sub_merge_containers.add_argument('json_file', + help="source of merge lines to process (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override \ + and os.environ.get('FATCAT_EDITGROUP_DESCRIPTION'): + args.editgroup_description_override = os.environ.get('FATCAT_EDITGROUP_DESCRIPTION') + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var)) + args.func(args) + +if __name__ == '__main__': + main() diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py new file mode 100644 index 00000000..c38a397d --- /dev/null +++ b/python/fatcat_tools/mergers/__init__.py @@ -0,0 +1,3 @@ + +from .common import EntityMerger +from .releases import ReleaseMerger, ReleaseGrouper diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py new file mode 100644 index 00000000..62a29c42 --- /dev/null +++ b/python/fatcat_tools/mergers/common.py @@ -0,0 +1,130 @@ + +""" +Tools for merging entities in various ways. + + group-releases: pull all release entities under a single work + => merges work entities + merge-releases: merge release entities together + => groups files/filesets/webcaptures + merge-containers: merge container entities + merge-files: merge file entities + +Input format is JSON lines with keys: + + idents (required): array of string identifiers + primary (optional): single string identifier + +""" + +import subprocess +from collections import Counter + +import fatcat_api_client +from fatcat_api_client.rest import ApiException + + +class EntityMerger: + """ + API for individual jobs: + + # record iterators sees + push_record(raw_record) + finish() + + # provided helpers + self.api + self.get_editgroup_id() + counts({'lines', 'skip', 'merged', 'updated'}) + + # implemented per-task + try_merge(idents, primary=None) -> int (entities updated) + + This class is pretty similar to EntityImporter, but isn't subclassed. + """ + + def __init__(self, api, **kwargs): + + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['git_rev'] = eg_extra.get('git_rev', + subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger') + + self.api = api + self.dry_run_mode = kwargs.get('dry_run_mode', True) + self.edit_batch_size = kwargs.get('edit_batch_size', 50) + self.editgroup_description = kwargs.get('editgroup_description') + self.editgroup_extra = eg_extra + self.reset() + + if self.dry_run_mode: + print("Running in dry-run mode!") + + def reset(self): + self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0}) + self._edit_count = 0 + self._editgroup_id = None + self._entity_queue = [] + self._idents_inflight = [] + + def push_record(self, line): + """ + Intended to be called by "pusher" class (which could be pulling from + JSON file, Kafka, whatever). + + Input is expected to be a dict-like object with key "idents", and + optionally "primary". + + Returns nothing. + """ + self.counts['lines'] += 1 + if (not raw_record): + self.counts['skip'] += 1 + return + primary = line.get('primary') + idents = list(set(line.get('idents'))) + if primary and primary not in idents: + idents.append(primary) + if not idents or len(idents) <= 1: + self.counts['skip'] += 1 + return + for i in idents: + if i in self._idents_inflight: + raise ValueError("Entity already part of in-process merge operation: {}".format(i)) + self._idents.inflight.append(i) + count = self.try_merge(idents, primary=primary) + if count: + self.counts['merged'] += 1 + self.counts['updated-total'] += count + self._edit_count += count + else: + self.counts['skip'] += 1 + if self._edit_count >= self.edit_batch_size: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + return + + def try_merge(self, idents, primary=None): + # implementations should fill this in + raise NotImplementedError + + def finish(self): + if self._edit_count > 0: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + + return self.counts + + def get_editgroup_id(self): + + if not self._editgroup_id: + eg = self.api.create_editgroup( + fatcat_api_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) + self._editgroup_id = eg.editgroup_id + + return self._editgroup_id diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py new file mode 100644 index 00000000..802cb8da --- /dev/null +++ b/python/fatcat_tools/mergers/releases.py @@ -0,0 +1,110 @@ + +from .common import EntityMerger +from fatcat_api_client.models import ReleaseEntity, WorkEntity + + +class ReleaseMerger(EntityMerger): + """ + Hard merges a set of release entities, redirecting all entities to a single + primary release. + + Will also redirect works (if appropriate), and re-point {files, filesets, + webcaptures} to the new merged release. + """ + + def __init__(self, api, **kwargs): + + eg_desc = kwargs.get('editgroup_description', + "Automated merge of release entities") + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger') + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def try_merge(self, idents, primary=None): + + updated_entities = 0 + releases = dict() + eg_id = self.get_editgroup_id() + + self.dry_run_mode + + for ident in idents: + releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures") + + # select the primary (if not already set) + if not primary: + primary = releases.keys()[0] + + primary_work_id = releases[primary].work_id + updated_work_ids = [] + redirected_release_ids = [] + + # execute all the release redirects + for release in releases.values(): + if release.ident == primary: + continue + + # file redirects + for e in release.files: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_file(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-files'] += 1 + + # fileset redirects + for e in release.filesets: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_fileset(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-filesets'] += 1 + + # webcapture redirects + for e in release.webcaptures: + e.release_ids.remove(release.ident) + if not primary in e.release_ids: + e.release_ids.append(primary) + if not self.dry_run_mode: + api.update_webcapture(eg_id, e.ident, e) + updated_entities += 1 + self.counts['updated-webcaptures'] += 1 + + # release redirect itself + updated_work_ids.append(release.work_id) + redirected_release_ids.append(release.ident) + if not self.dry_run_mode: + api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary)) + updated_entities += 1 + self.counts['updated-releases'] + + + # lastly, clean up any merged work entities + redirected_release_ids = set(redirected_release_ids) + updated_work_ids = list(set(updated_work_ids)) + assert primary_work_id not in updated_work_ids + for work_id in updated_work_ids: + work_releases = api.get_work_releases(work_id) + rids = set([r.ident for r in work_releases]) + if rids.issubset(redirected_release_ids): + # all the releases for this work were updated/merged; we should + # redirect to primary work_id + # also check we don't have any in-flight edit conflicts + assert not work_id in self._idents_inflight + self._idents_inflight.append(work_id) + if not self.dry_run_mode: + api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) + updated_entities += 1 + self.counts['updated-works'] += 1 + + return updated_entities + +class ReleaseGrouper(EntityMerger): + pass -- cgit v1.2.3