Diffstat (limited to 'python')
-rwxr-xr-x  python/fatcat_merge.py                    112
-rw-r--r--  python/fatcat_tools/mergers/__init__.py     3
-rw-r--r--  python/fatcat_tools/mergers/common.py      130
-rw-r--r--  python/fatcat_tools/mergers/releases.py    110
4 files changed, 355 insertions, 0 deletions
diff --git a/python/fatcat_merge.py b/python/fatcat_merge.py
new file mode 100755
index 00000000..7b0ae63b
--- /dev/null
+++ b/python/fatcat_merge.py
@@ -0,0 +1,112 @@
+#!/usr/bin/env python3
+
+"""
+Tools for merging entities in various ways.
+
+    group-releases: pull all release entities under a single work
+        => merges work entities
+    merge-releases: merge release entities together
+        => groups files/filesets/webcaptures
+    merge-containers: merge container entities
+    merge-files: merge file entities
+
+Input format is usually JSON lines with keys:
+
+    idents (required): array of string identifiers
+    primary (optional): single string identifier
+
+"""
+
+import os, sys, argparse
+from fatcat_tools import authenticated_api
+from fatcat_tools.mergers import *
+from fatcat_tools.importers import JsonLinePusher
+
+
+def run_group_releases(args):
+    rg = ReleaseGrouper(args.api,
+        edit_batch_size=args.batch_size,
+        dry_run_mode=args.dry_run)
+    JsonLinePusher(rg, args.json_file).run()
+
+def run_merge_releases(args):
+    rm = ReleaseMerger(args.api,
+        edit_batch_size=args.batch_size,
+        dry_run_mode=args.dry_run)
+    JsonLinePusher(rm, args.json_file).run()
+
+def run_merge_containers(args):
+    # NOTE: no ContainerMerger exists yet; ReleaseMerger is re-used here as a
+    # placeholder
+    cm = ReleaseMerger(args.api,
+        edit_batch_size=args.batch_size,
+        dry_run_mode=args.dry_run)
+    JsonLinePusher(cm, args.json_file).run()
+
+def run_merge_files(args):
+    # NOTE: FileMerger is referenced here but not yet implemented in
+    # fatcat_tools.mergers
+    fm = FileMerger(args.api,
+        edit_batch_size=args.batch_size,
+        dry_run_mode=args.dry_run)
+    JsonLinePusher(fm, args.json_file).run()
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--host-url',
+        default="http://localhost:9411/v0",
+        help="connect to this host/port")
+    parser.add_argument('--batch-size',
+        help="size of batch to send",
+        default=50, type=int)
+    parser.add_argument('--editgroup-description-override',
+        help="editgroup description override",
+        default=None, type=str)
+    parser.add_argument('--dry-run',
+        action='store_true',
+        help="don't actually commit merges, just count what would have been")
+    parser.set_defaults(
+        auth_var="FATCAT_AUTH_API_TOKEN",
+    )
+    subparsers = parser.add_subparsers()
+
+    sub_group_releases = subparsers.add_parser('group-releases')
+    sub_group_releases.set_defaults(func=run_group_releases)
+    sub_group_releases.add_argument('json_file',
+        help="source of merge lines to process (or stdin)",
+        default=sys.stdin, type=argparse.FileType('r'))
+
+    sub_merge_releases = subparsers.add_parser('merge-releases')
+    sub_merge_releases.set_defaults(func=run_merge_releases)
+    sub_merge_releases.add_argument('json_file',
+        help="source of merge lines to process (or stdin)",
+        default=sys.stdin, type=argparse.FileType('r'))
+
+    sub_merge_files = subparsers.add_parser('merge-files')
+    sub_merge_files.set_defaults(func=run_merge_files)
+    sub_merge_files.add_argument('json_file',
+        help="source of merge lines to process (or stdin)",
+        default=sys.stdin, type=argparse.FileType('r'))
+
+    sub_merge_containers = subparsers.add_parser('merge-containers')
+    sub_merge_containers.set_defaults(func=run_merge_containers)
+    sub_merge_containers.add_argument('json_file',
+        help="source of merge lines to process (or stdin)",
+        default=sys.stdin, type=argparse.FileType('r'))
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("tell me what to do!")
+        sys.exit(-1)
+
+    # allow editgroup description override via env variable (but CLI arg takes
+    # precedence)
+    if not args.editgroup_description_override \
+            and os.environ.get('FATCAT_EDITGROUP_DESCRIPTION'):
+        args.editgroup_description_override = os.environ.get('FATCAT_EDITGROUP_DESCRIPTION')
+
+    args.api = authenticated_api(
+        args.host_url,
+        # token is an optional kwarg (can be empty string, None, etc)
+        token=os.environ.get(args.auth_var))
+    args.func(args)
+
+if __name__ == '__main__':
+    main()
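For orientation, a dry-run invocation of the merge-releases subcommand might look like the following. The input filename and the release identifiers are hypothetical; each JSON line describes one merge operation:

    $ cat merges.json
    {"idents": ["hsmo6p4smrganpb3fndaj2lon4", "ci6sdybdkjdrvnpjdhcdiiu2ru"], "primary": "hsmo6p4smrganpb3fndaj2lon4"}

    $ FATCAT_AUTH_API_TOKEN=... ./fatcat_merge.py --dry-run merge-releases merges.json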
diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py
new file mode 100644
index 00000000..c38a397d
--- /dev/null
+++ b/python/fatcat_tools/mergers/__init__.py
@@ -0,0 +1,3 @@
+
+from .common import EntityMerger
+from .releases import ReleaseMerger, ReleaseGrouper
diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
new file mode 100644
index 00000000..62a29c42
--- /dev/null
+++ b/python/fatcat_tools/mergers/common.py
@@ -0,0 +1,130 @@
+
+"""
+Tools for merging entities in various ways.
+
+    group-releases: pull all release entities under a single work
+        => merges work entities
+    merge-releases: merge release entities together
+        => groups files/filesets/webcaptures
+    merge-containers: merge container entities
+    merge-files: merge file entities
+
+Input format is JSON lines with keys:
+
+    idents (required): array of string identifiers
+    primary (optional): single string identifier
+
+"""
+
+import subprocess
+from collections import Counter
+
+import fatcat_api_client
+from fatcat_api_client.rest import ApiException
+
+
+class EntityMerger:
+    """
+    API for individual jobs:
+
+        # record iterator sees
+        push_record(raw_record)
+        finish()
+
+        # provided helpers
+        self.api
+        self.get_editgroup_id()
+        self.counts (Counter with keys 'lines', 'skip', 'merged', 'updated-total')
+
+        # implemented per-task
+        try_merge(idents, primary=None) -> int (entities updated)
+
+    This class is pretty similar to EntityImporter, but isn't subclassed.
+    """
+
+    def __init__(self, api, **kwargs):
+
+        eg_extra = kwargs.get('editgroup_extra', dict())
+        # decode before falling back, so a caller-supplied string value is
+        # passed through unchanged
+        eg_extra['git_rev'] = eg_extra.get('git_rev',
+            subprocess.check_output(["git", "describe", "--always"]).strip().decode('utf-8'))
+        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger')
+
+        self.api = api
+        self.dry_run_mode = kwargs.get('dry_run_mode', True)
+        self.edit_batch_size = kwargs.get('edit_batch_size', 50)
+        self.editgroup_description = kwargs.get('editgroup_description')
+        self.editgroup_extra = eg_extra
+        self.reset()
+
+        if self.dry_run_mode:
+            print("Running in dry-run mode!")
+
+    def reset(self):
+        self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0})
+        self._edit_count = 0
+        self._editgroup_id = None
+        self._entity_queue = []
+        self._idents_inflight = []
+
+    def push_record(self, line):
+        """
+        Intended to be called by a "pusher" class (which could be pulling from
+        a JSON file, Kafka, whatever).
+
+        Input is expected to be a dict-like object with key "idents", and
+        optionally "primary".
+
+        Returns nothing.
+        """
+        self.counts['lines'] += 1
+        if not line:
+            self.counts['skip'] += 1
+            return
+        primary = line.get('primary')
+        idents = list(set(line.get('idents') or []))
+        if primary and primary not in idents:
+            idents.append(primary)
+        if not idents or len(idents) <= 1:
+            self.counts['skip'] += 1
+            return
+        for i in idents:
+            if i in self._idents_inflight:
+                raise ValueError("Entity already part of in-process merge operation: {}".format(i))
+            self._idents_inflight.append(i)
+        count = self.try_merge(idents, primary=primary)
+        if count:
+            self.counts['merged'] += 1
+            self.counts['updated-total'] += count
+            self._edit_count += count
+        else:
+            self.counts['skip'] += 1
+        if self._edit_count >= self.edit_batch_size:
+            # in dry-run mode the editgroup is simply abandoned, not accepted
+            if not self.dry_run_mode:
+                self.api.accept_editgroup(self._editgroup_id)
+            self._editgroup_id = None
+            self._edit_count = 0
+            self._idents_inflight = []
+        return
+
+    def try_merge(self, idents, primary=None):
+        # implementations should fill this in
+        raise NotImplementedError
+
+    def finish(self):
+        if self._edit_count > 0:
+            if not self.dry_run_mode:
+                self.api.accept_editgroup(self._editgroup_id)
+            self._editgroup_id = None
+            self._edit_count = 0
+            self._idents_inflight = []
+
+        return self.counts
+
+    def get_editgroup_id(self):
+
+        if not self._editgroup_id:
+            eg = self.api.create_editgroup(
+                fatcat_api_client.Editgroup(
+                    description=self.editgroup_description,
+                    extra=self.editgroup_extra))
+            self._editgroup_id = eg.editgroup_id
+
+        return self._editgroup_id
+    """ + +    def __init__(self, api, **kwargs): + +        eg_desc = kwargs.get('editgroup_description', +            "Automated merge of release entities") +        eg_extra = kwargs.get('editgroup_extra', dict()) +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger') +        super().__init__(api, +            editgroup_description=eg_desc, +            editgroup_extra=eg_extra, +            **kwargs) + +    def try_merge(self, idents, primary=None): + +        updated_entities = 0 +        releases = dict() +        eg_id = self.get_editgroup_id() + +        self.dry_run_mode + +        for ident in idents: +            releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures") + +        # select the primary (if not already set) +        if not primary: +            primary = releases.keys()[0] + +        primary_work_id = releases[primary].work_id +        updated_work_ids = [] +        redirected_release_ids = [] + +        # execute all the release redirects +        for release in releases.values(): +            if release.ident == primary: +                continue + +            # file redirects +            for e in release.files: +                e.release_ids.remove(release.ident) +                if not primary in e.release_ids: +                    e.release_ids.append(primary) +                if not self.dry_run_mode: +                    api.update_file(eg_id, e.ident, e) +                updated_entities += 1 +                self.counts['updated-files'] += 1 + +            # fileset redirects +            for e in release.filesets: +                e.release_ids.remove(release.ident) +                if not primary in e.release_ids: +                    e.release_ids.append(primary) +                if not self.dry_run_mode: +                    api.update_fileset(eg_id, e.ident, e) +                updated_entities += 1 +                self.counts['updated-filesets'] += 1 + +            # webcapture redirects +            for e in release.webcaptures: +                e.release_ids.remove(release.ident) +                if not primary in e.release_ids: +                    e.release_ids.append(primary) +                if not self.dry_run_mode: +                    api.update_webcapture(eg_id, e.ident, e) +                updated_entities += 1 +                self.counts['updated-webcaptures'] += 1 + +            # release redirect itself +            updated_work_ids.append(release.work_id) +            redirected_release_ids.append(release.ident) +            if not self.dry_run_mode: +                api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary)) +            updated_entities += 1 +            self.counts['updated-releases'] + + +        # lastly, clean up any merged work entities +        redirected_release_ids = set(redirected_release_ids) +        updated_work_ids = list(set(updated_work_ids)) +        assert primary_work_id not in updated_work_ids +        for work_id in updated_work_ids: +            work_releases = api.get_work_releases(work_id) +            rids = set([r.ident for r in work_releases]) +            if rids.issubset(redirected_release_ids): +                # all the releases for this work were updated/merged; we should +                # redirect to primary work_id +                # also check we don't have any in-flight edit conflicts +                assert not work_id in self._idents_inflight +                self._idents_inflight.append(work_id) +                
if not self.dry_run_mode: +                    api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id)) +                updated_entities += 1 +                self.counts['updated-works'] += 1 + +        return updated_entities + +class ReleaseGrouper(EntityMerger): +    pass | 
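As a usage sketch, the merger can also be driven directly from Python rather than via fatcat_merge.py. This assumes a local fatcat API as in the CLI defaults; "merges.json" is a placeholder filename, and with dry_run_mode=True no merges are committed:

    import os
    from fatcat_tools import authenticated_api
    from fatcat_tools.importers import JsonLinePusher
    from fatcat_tools.mergers import ReleaseMerger

    api = authenticated_api("http://localhost:9411/v0",
        token=os.environ.get("FATCAT_AUTH_API_TOKEN"))
    rm = ReleaseMerger(api, edit_batch_size=50, dry_run_mode=True)
    JsonLinePusher(rm, open("merges.json", "r")).run()
    # per-type tallies ('merged', 'updated-files', ...) accumulate even in dry-run
    print(rm.counts)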
