aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/mergers/common.py
blob: 62a29c4262199a94c55e7d60ca4f291fe003da01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130

"""
Tools for merging entities in various ways.

    group-releases: pull all release entities under a single work
        => merges work entities
    merge-releases: merge release entities together
        => groups files/filesets/webcaptures
    merge-containers: merge container entities
    merge-files: merge file entities

Input format is JSON lines with keys:

    idents (required): array of string identifiers
    primary (optional): single string identifier

"""

import subprocess
from collections import Counter

import fatcat_api_client
from fatcat_api_client.rest import ApiException


class EntityMerger:
    """
    Base class for jobs which merge groups of entities through the API client.

    API for individual jobs:

        # record iterator sees
        push_record(record)
        finish()

        # provided helpers
        self.api
        self.get_editgroup_id()
        self.counts (Counter with keys 'lines', 'skip', 'merged', 'updated-total')

        # implemented per-task
        try_merge(idents, primary=None) -> int (entities updated)

    This class is pretty similar to EntityImporter, but does not inherit
    from it.
    """

    def __init__(self, api, **kwargs):
        """
        Set up merger state.

        `api` is the client object; this class calls `create_editgroup()`
        and `accept_editgroup()` on it.

        Recognized keyword arguments:

            editgroup_extra: dict stored as the editgroup "extra" metadata.
                'git_rev' and 'agent' keys are filled in if missing.
            editgroup_description: description passed to new editgroups
            dry_run_mode: bool, defaults to True
            edit_batch_size: number of updates after which the current
                editgroup is accepted; defaults to 50
        """

        # Work on a copy so the caller's dict is never mutated; also tolerates
        # an explicit editgroup_extra=None.
        eg_extra = dict(kwargs.get('editgroup_extra') or {})

        # Only shell out to git when the caller did not supply a revision.
        git_rev = eg_extra.get('git_rev')
        if git_rev is None:
            git_rev = subprocess.check_output(["git", "describe", "--always"]).strip()
        # check_output() returns bytes; a caller-supplied value may be either
        # bytes or (more likely) already a str.
        if isinstance(git_rev, bytes):
            git_rev = git_rev.decode('utf-8')
        eg_extra['git_rev'] = git_rev
        eg_extra.setdefault('agent', 'fatcat_tools.EntityMerger')

        self.api = api
        self.dry_run_mode = kwargs.get('dry_run_mode', True)
        self.edit_batch_size = kwargs.get('edit_batch_size', 50)
        self.editgroup_description = kwargs.get('editgroup_description')
        self.editgroup_extra = eg_extra
        self.reset()

        if self.dry_run_mode:
            print("Running in dry-run mode!")

    def reset(self):
        """Clear all counters and any in-progress editgroup/batch state."""
        self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0})
        self._edit_count = 0
        self._editgroup_id = None
        self._entity_queue = []
        self._idents_inflight = []

    def push_record(self, line):
        """
        Intended to be called by "pusher" class (which could be pulling from
        JSON file, Kafka, whatever).

        Input is expected to be a dict-like object with key "idents", and
        optionally "primary".

        Raises ValueError if any identifier is already part of a merge in the
        current (not yet accepted) batch.

        Returns nothing.
        """
        self.counts['lines'] += 1
        if not line:
            self.counts['skip'] += 1
            return
        primary = line.get('primary')
        # De-duplicate while keeping input order (so try_merge() sees a
        # deterministic list); a missing/None "idents" is treated as empty.
        idents = list(dict.fromkeys(line.get('idents') or []))
        if primary and primary not in idents:
            idents.append(primary)
        if len(idents) <= 1:
            # nothing to merge with a single (or no) entity
            self.counts['skip'] += 1
            return
        # Check every ident before registering any of them, so a collision
        # does not leave a partially-registered record behind.
        for ident in idents:
            if ident in self._idents_inflight:
                raise ValueError("Entity already part of in-process merge operation: {}".format(ident))
        self._idents_inflight.extend(idents)
        count = self.try_merge(idents, primary=primary)
        if count:
            self.counts['merged'] += 1
            self.counts['updated-total'] += count
            self._edit_count += count
        else:
            self.counts['skip'] += 1
        if self._edit_count >= self.edit_batch_size:
            self._accept_batch()
        return

    def try_merge(self, idents, primary=None):
        """
        Merge the entities named by `idents`; return the number of entities
        updated (falsy if nothing was done).

        Implementations should fill this in.
        """
        raise NotImplementedError

    def finish(self):
        """Accept any partially-filled batch and return the counters."""
        if self._edit_count > 0:
            self._accept_batch()

        return self.counts

    def _accept_batch(self):
        """Accept the current editgroup (if one exists) and reset batch state."""
        # No editgroup exists if try_merge() never called get_editgroup_id()
        # (presumably the case in dry-run mode); don't send None to the API.
        if self._editgroup_id is not None:
            self.api.accept_editgroup(self._editgroup_id)
        self._editgroup_id = None
        self._edit_count = 0
        self._idents_inflight = []

    def get_editgroup_id(self):
        """
        Return the id of the current editgroup, creating a new editgroup via
        the API first if there is none.
        """

        if not self._editgroup_id:
            eg = self.api.create_editgroup(
                fatcat_api_client.Editgroup(
                    description=self.editgroup_description,
                    extra=self.editgroup_extra))
            self._editgroup_id = eg.editgroup_id

        return self._editgroup_id