diff options
Diffstat (limited to 'python/fatcat_tools/mergers')
| -rw-r--r-- | python/fatcat_tools/mergers/__init__.py | 3 | ||||
| -rw-r--r-- | python/fatcat_tools/mergers/common.py | 130 | ||||
| -rw-r--r-- | python/fatcat_tools/mergers/releases.py | 110 | 
3 files changed, 243 insertions, 0 deletions
# file: python/fatcat_tools/mergers/__init__.py
from .common import EntityMerger
from .releases import ReleaseMerger, ReleaseGrouper

# file: python/fatcat_tools/mergers/common.py
"""
Tools for merging entities in various ways.

    group-releases: pull all release entities under a single work
        => merges work entities
    merge-releases: merge release entities together
        => groups files/filesets/webcaptures
    merge-containers: merge container entities
    merge-files: merge file entities

Input format is JSON lines with keys:

    idents (required): array of string identifiers
    primary (optional): single string identifier
"""

import subprocess
from collections import Counter

import fatcat_api_client
from fatcat_api_client.rest import ApiException


class EntityMerger:
    """
    Base class for entity-merge jobs.

    API for individual jobs:

        # record iterators see
        push_record(line)
        finish()

        # provided helpers
        self.api
        self.get_editgroup_id()
        counts({'lines', 'skip', 'merged', 'updated-total'})

        # implemented per-task by subclasses
        try_merge(idents, primary=None) -> int (entities updated)

    This class is pretty similar to EntityImporter, but isn't subclassed.
    """

    def __init__(self, api, **kwargs):
        """
        api: fatcat API client instance (used for reads and editgroup writes)

        Recognized kwargs:
            editgroup_extra (dict), editgroup_description (str),
            dry_run_mode (bool, default True), edit_batch_size (int, default 50)
        """
        eg_extra = kwargs.get('editgroup_extra', dict())
        # Fix: decode applies to the subprocess default only. The original
        # called .decode() on the result of .get(), which raised
        # AttributeError when a caller passed 'git_rev' as an str.
        eg_extra['git_rev'] = eg_extra.get(
            'git_rev',
            subprocess.check_output(["git", "describe", "--always"]).strip().decode('utf-8'))
        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger')

        self.api = api
        # dry_run_mode defaults to True: merges are destructive, so opt in.
        self.dry_run_mode = kwargs.get('dry_run_mode', True)
        self.edit_batch_size = kwargs.get('edit_batch_size', 50)
        self.editgroup_description = kwargs.get('editgroup_description')
        self.editgroup_extra = eg_extra
        self.reset()

        if self.dry_run_mode:
            print("Running in dry-run mode!")

    def reset(self):
        # Counters and in-flight state; also used between batches.
        self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0})
        self._edit_count = 0
        self._editgroup_id = None
        self._entity_queue = []
        # Idents touched by edits not yet accepted; guards against two
        # concurrent merge operations touching the same entity.
        self._idents_inflight = []

    def push_record(self, line):
        """
        Intended to be called by a "pusher" class (which could be pulling from
        a JSON file, Kafka, whatever).

        Input is expected to be a dict-like object with key "idents", and
        optionally "primary".

        Returns nothing.
        """
        self.counts['lines'] += 1
        # Fix: original referenced undefined name `raw_record` here, raising
        # NameError on every call; the parameter is `line`.
        if not line:
            self.counts['skip'] += 1
            return
        primary = line.get('primary')
        idents = list(set(line.get('idents')))
        if primary and primary not in idents:
            idents.append(primary)
        # Need at least two identifiers for a merge to mean anything.
        if not idents or len(idents) <= 1:
            self.counts['skip'] += 1
            return
        for i in idents:
            if i in self._idents_inflight:
                raise ValueError(
                    "Entity already part of in-process merge operation: {}".format(i))
            # Fix: original typo `self._idents.inflight` raised AttributeError.
            self._idents_inflight.append(i)
        count = self.try_merge(idents, primary=primary)
        if count:
            self.counts['merged'] += 1
            self.counts['updated-total'] += count
            self._edit_count += count
        else:
            self.counts['skip'] += 1
        # Accept the editgroup once enough edits have accumulated.
        if self._edit_count >= self.edit_batch_size:
            self.api.accept_editgroup(self._editgroup_id)
            self._editgroup_id = None
            self._edit_count = 0
            self._idents_inflight = []
        return

    def try_merge(self, idents, primary=None):
        """Perform one merge; return number of entities updated (0 = skip)."""
        # implementations should fill this in
        raise NotImplementedError

    def finish(self):
        """Flush any pending editgroup; returns the counts Counter."""
        if self._edit_count > 0:
            self.api.accept_editgroup(self._editgroup_id)
            self._editgroup_id = None
            self._edit_count = 0
            self._idents_inflight = []

        return self.counts

    def get_editgroup_id(self):
        """Lazily create (and cache) an editgroup; returns its id."""
        if not self._editgroup_id:
            eg = self.api.create_editgroup(
                fatcat_api_client.Editgroup(
                    description=self.editgroup_description,
                    extra=self.editgroup_extra))
            self._editgroup_id = eg.editgroup_id

        return self._editgroup_id
# file: python/fatcat_tools/mergers/releases.py
from .common import EntityMerger
from fatcat_api_client.models import ReleaseEntity, WorkEntity


class ReleaseMerger(EntityMerger):
    """
    Hard merges a set of release entities, redirecting all entities to a single
    primary release.

    Will also redirect works (if appropriate), and re-point {files, filesets,
    webcaptures} to the new merged release.
    """

    def __init__(self, api, **kwargs):
        # Fix: use pop(), not get(). The original fetched these keys and then
        # also forwarded **kwargs, so a caller supplying either keyword got
        # "TypeError: got multiple values for keyword argument".
        eg_desc = kwargs.pop('editgroup_description',
            "Automated merge of release entities")
        eg_extra = kwargs.pop('editgroup_extra', dict())
        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger')
        super().__init__(api,
            editgroup_description=eg_desc,
            editgroup_extra=eg_extra,
            **kwargs)

    def try_merge(self, idents, primary=None):
        """
        Merge the given release idents into `primary` (auto-selected if not
        given). Returns the number of entities updated.
        """
        updated_entities = 0
        releases = dict()
        eg_id = self.get_editgroup_id()

        # Fix: removed stray no-op statement `self.dry_run_mode` here.

        for ident in idents:
            # Fix: was bare `api.` (NameError); the client lives on self.api.
            releases[ident] = self.api.get_release(
                ident, expand="files,filesets,webcaptures")

        # select the primary (if not already set)
        if not primary:
            # Fix: dict views are not subscriptable in Python 3
            # (`releases.keys()[0]` raised TypeError).
            primary = next(iter(releases))

        primary_work_id = releases[primary].work_id
        updated_work_ids = []
        redirected_release_ids = []

        # execute all the release redirects
        for release in releases.values():
            if release.ident == primary:
                continue

            # file redirects: swap this release's ident for the primary's
            for e in release.files:
                e.release_ids.remove(release.ident)
                if primary not in e.release_ids:
                    e.release_ids.append(primary)
                if not self.dry_run_mode:
                    self.api.update_file(eg_id, e.ident, e)
                updated_entities += 1
                self.counts['updated-files'] += 1

            # fileset redirects
            for e in release.filesets:
                e.release_ids.remove(release.ident)
                if primary not in e.release_ids:
                    e.release_ids.append(primary)
                if not self.dry_run_mode:
                    self.api.update_fileset(eg_id, e.ident, e)
                updated_entities += 1
                self.counts['updated-filesets'] += 1

            # webcapture redirects
            for e in release.webcaptures:
                e.release_ids.remove(release.ident)
                if primary not in e.release_ids:
                    e.release_ids.append(primary)
                if not self.dry_run_mode:
                    self.api.update_webcapture(eg_id, e.ident, e)
                updated_entities += 1
                self.counts['updated-webcaptures'] += 1

            # release redirect itself
            updated_work_ids.append(release.work_id)
            redirected_release_ids.append(release.ident)
            if not self.dry_run_mode:
                self.api.update_release(
                    eg_id, release.ident, ReleaseEntity(redirect=primary))
            updated_entities += 1
            # Fix: original was a bare expression (`self.counts[...]`) that
            # never incremented the counter.
            self.counts['updated-releases'] += 1

        # lastly, clean up any merged work entities
        redirected_release_ids = set(redirected_release_ids)
        updated_work_ids = list(set(updated_work_ids))
        assert primary_work_id not in updated_work_ids
        for work_id in updated_work_ids:
            work_releases = self.api.get_work_releases(work_id)
            rids = set([r.ident for r in work_releases])
            if rids.issubset(redirected_release_ids):
                # all the releases for this work were updated/merged; we should
                # redirect to primary work_id
                # also check we don't have any in-flight edit conflicts
                assert work_id not in self._idents_inflight
                self._idents_inflight.append(work_id)
                if not self.dry_run_mode:
                    self.api.update_work(
                        eg_id, work_id, WorkEntity(redirect=primary_work_id))
                updated_entities += 1
                self.counts['updated-works'] += 1

        return updated_entities


class ReleaseGrouper(EntityMerger):
    # Not yet implemented; placeholder for the "group-releases" task.
    pass
