aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/mergers/common.py
blob: f81975199904fb7dc2dbf09f30eb185a2ac5276d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
Tools for merging entities in various ways.

    merge-releases: merge release entities together
        => groups files/filesets/webcaptures
        => merges work entities if needed
    merge-works: pull all release entities under a single work
        => merges work entities
    merge-containers: merge container entities
    merge-files: merge file entities
"""

import subprocess
from collections import Counter
from typing import Any, Dict, List, Optional

import fatcat_openapi_client

from fatcat_tools.importers import EntityImporter


class EntityMerger(EntityImporter):
    """
    Base class for merge jobs. Records are pushed in one at a time; edits are
    grouped into editgroups which are accepted once a batch fills up (or when
    finish() is called).

    API for individual jobs:

        # what record iterators see
        push_record(raw_record)
        finish()

        # provided helpers
        self.api
        self.get_editgroup_id()
        self.counts  # Counter; keys include 'lines', 'skip', 'merged', 'updated-entities'

        # implemented per-task
        try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> int
    """

    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs: Any) -> None:
        """
        Recognized keyword arguments:

            editgroup_extra: dict of extra metadata attached to new editgroups
            dry_run_mode: when true (the default), editgroups are never accepted
            edit_batch_size: edits per editgroup before it is accepted (default 50)
            editgroup_description: description used for new editgroups
        """

        # copy, so that the caller's dict is not modified as a side effect
        eg_extra: Dict[str, Any] = dict(kwargs.get("editgroup_extra") or {})

        # only shell out to git when the caller did not supply a revision
        git_rev = eg_extra.get("git_rev")
        if git_rev is None:
            git_rev = subprocess.check_output(["git", "describe", "--always"]).strip()
        # check_output() returns bytes; a caller-supplied value may already be str
        if isinstance(git_rev, bytes):
            git_rev = git_rev.decode("utf-8")
        eg_extra["git_rev"] = git_rev
        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger")

        self.api = api
        self.dry_run_mode = kwargs.get("dry_run_mode", True)
        self.edit_batch_size = kwargs.get("edit_batch_size", 50)
        self.editgroup_description = kwargs.get("editgroup_description")
        self.editgroup_extra = eg_extra
        self.reset()
        # records whose "entity_type" differs from this value are skipped by
        # push_record()
        self.entity_type_name = "common"

        if self.dry_run_mode:
            print("Running in dry-run mode!")

    def reset(self) -> None:
        """
        Clears counters and all per-editgroup state.
        """
        # NOTE(review): "updated-total" is seeded here, but push_record()
        # increments "updated-entities"; confirm which key consumers expect
        self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0})
        self._edit_count = 0
        self._editgroup_id: Optional[str] = None
        self._idents_inflight: List[str] = []

    def push_record(self, record: Dict[str, Any]) -> None:
        """
        Intended to be called by "pusher" class (which could be pulling from
        JSON file, Kafka, whatever).

        Input is expected to be a dict-like object with keys:

            entity_type: str
            primary_id: Optional[str]
            duplicate_ids: [str] (order not preserved)
            evidence: Optional[dict]
                # can be anything, entity- or merger-specific
                # some variables might be...
                extid: str
                extid_type: str

        Raises ValueError if any ident in the record is already part of a
        merge in the current (not yet accepted) editgroup.

        Returns nothing.
        """
        self.counts["lines"] += 1
        if not record:
            self.counts["skip-blank-line"] += 1
            return
        if record.get("entity_type") != self.entity_type_name:
            self.counts["skip-entity-type"] += 1
            return
        primary_id: Optional[str] = record.get("primary_id")
        # de-duplicate, and never treat the primary as a duplicate of itself
        duplicate_ids: List[str] = [
            di for di in set(record["duplicate_ids"]) if di != primary_id
        ]
        # without a primary, at least two idents are needed for a merge
        if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id):
            self.counts["skip-no-dupes"] += 1
            return
        # build a separate list: appending the primary to duplicate_ids itself
        # would hand it back to try_merge() as one of the duplicates
        all_ids = list(duplicate_ids)
        if primary_id:
            all_ids.append(primary_id)
        # check every ident before registering any, so that a conflict does not
        # leave a partially-registered record behind
        for ident in all_ids:
            if ident in self._idents_inflight:
                raise ValueError(
                    "Entity already part of in-process merge operation: {}".format(ident)
                )
        self._idents_inflight.extend(all_ids)
        count = self.try_merge(
            duplicate_ids, primary_id=primary_id, evidence=record.get("evidence")
        )
        if count:
            self.counts["merged"] += 1
            self.counts["updated-entities"] += count
            self._edit_count += count
        else:
            self.counts["skip"] += 1
        if self._edit_count >= self.edit_batch_size:
            self._accept_pending_editgroup()
        return

    def try_merge(
        self,
        dupe_ids: List[str],
        primary_id: Optional[str] = None,
        evidence: Optional[Dict[str, Any]] = None,
    ) -> int:
        """
        Per-task merge implementation. The return value is treated by
        push_record() as the number of entities updated; a falsy value is
        counted as a skip.
        """
        # implementations should fill this in
        raise NotImplementedError

    def finish(self) -> Counter:
        """
        Accepts the current editgroup if it has any edits (unless in dry-run
        mode), and returns the counters.
        """
        if self._edit_count > 0:
            self._accept_pending_editgroup()

        return self.counts

    def _accept_pending_editgroup(self) -> None:
        """
        Accepts the current editgroup (skipped in dry-run mode), then clears
        the per-editgroup state so the next edit starts a fresh editgroup.
        """
        if not self.dry_run_mode:
            self.api.accept_editgroup(self._editgroup_id)
        self._editgroup_id = None
        self._edit_count = 0
        self._idents_inflight = []

    def get_editgroup_id(self, _edits: int = 1) -> str:
        """
        This version of get_editgroup_id() is similar to the EntityImporter
        version, but does not update self._edit_count or submit editgroups. The
        edits parameter is ignored.
        """

        # lazily create an editgroup; it is re-used until the batch is accepted
        if not self._editgroup_id:
            eg = self.api.create_editgroup(
                fatcat_openapi_client.Editgroup(
                    description=self.editgroup_description, extra=self.editgroup_extra
                )
            )
            self._editgroup_id = eg.editgroup_id
        assert self._editgroup_id
        return self._editgroup_id