1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
"""
Tools for merging entities in various ways.
    merge-releases: merge release entities together
        => groups files/filesets/webcaptures
        => merges work entities if needed
    merge-works: pull all release entities under a single work
        => merges work entities
    merge-containers: merge container entities
    merge-files: merge file entities
"""
import subprocess
from collections import Counter
from typing import Any, Dict, List, Optional
import fatcat_openapi_client
from fatcat_tools.importers import EntityImporter
class EntityMerger(EntityImporter):
    """
    Base class for jobs which merge groups of duplicate entities.

    API for individual jobs:

        # record iterator sees
        push_record(raw_record)
        finish()

        # provided helpers
        self.api
        self.get_editgroup_id()
        self.counts (Counter; keys include 'lines', 'skip', 'merged',
            'updated-total', 'updated-entities', and 'skip-*' reasons)

        # implemented per-task
        try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> int
    """

    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
        """
        Recognized keyword arguments:

            editgroup_extra: dict of extra metadata attached to editgroups;
                'git_rev' and 'agent' are filled in when missing
            dry_run_mode: when true (the default), editgroups are never
                accepted by this class
            edit_batch_size: edit count at which the current editgroup is
                accepted (default 50)
            editgroup_description: description used for new editgroups
        """
        # Copy so that filling in defaults does not modify the caller's dict;
        # also tolerates an explicit editgroup_extra=None.
        eg_extra: Dict[str, Any] = dict(kwargs.get("editgroup_extra") or {})

        # Only shell out to git when the caller did not supply a revision.
        git_rev = eg_extra.get("git_rev")
        if git_rev is None:
            git_rev = subprocess.check_output(["git", "describe", "--always"]).strip()
        # check_output() yields bytes; a caller-supplied value may already be str
        if isinstance(git_rev, bytes):
            git_rev = git_rev.decode("utf-8")
        eg_extra["git_rev"] = git_rev
        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger")

        self.api = api
        self.dry_run_mode = kwargs.get("dry_run_mode", True)
        self.edit_batch_size = kwargs.get("edit_batch_size", 50)
        self.editgroup_description = kwargs.get("editgroup_description")
        self.editgroup_extra = eg_extra
        self.reset()
        # push_record() skips records whose 'entity_type' differs from this
        self.entity_type_name = "common"

        if self.dry_run_mode:
            print("Running in dry-run mode!")

    def reset(self) -> None:
        """
        Re-initializes counters and all per-editgroup state.
        """
        # NOTE(review): 'updated-total' is initialized here, but push_record()
        # increments 'updated-entities'; confirm which key consumers expect.
        self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0})
        # number of edits accumulated since the last accepted editgroup
        self._edit_count = 0
        self._editgroup_id: Optional[str] = None
        # identifiers touched since the last batch was flushed
        self._idents_inflight: List[str] = []

    def push_record(self, record: Dict[str, Any]) -> None:
        """
        Intended to be called by "pusher" class (which could be pulling from
        JSON file, Kafka, whatever).

        Input is expected to be a dict-like object with keys:

            entity_type: str
            primary_id: Optional[str]
            duplicate_ids: [str] (de-duplicated; primary_id is removed)
            evidence: Optional[dict]
                # can be anything, entity- or merger-specific
                # some variables might be...
                extid: str
                extid_type: str

        Returns nothing.

        Raises KeyError if a record of the right entity type has no
        'duplicate_ids' key, and ValueError if any identifier is already part
        of a merge in the current (not yet flushed) batch.
        """
        self.counts["lines"] += 1
        if not record:
            self.counts["skip-blank-line"] += 1
            return
        if record.get("entity_type") != self.entity_type_name:
            self.counts["skip-entity-type"] += 1
            return

        primary_id: Optional[str] = record.get("primary_id")
        # De-duplicate while keeping first-seen order, so that runs are
        # reproducible, and drop the primary from the duplicate list.
        duplicate_ids: List[str] = [
            di for di in dict.fromkeys(record["duplicate_ids"]) if di != primary_id
        ]
        # Need at least two distinct entities: either primary plus one
        # duplicate, or two duplicates when no primary was given.
        if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id):
            self.counts["skip-no-dupes"] += 1
            return

        # Copy, so that appending the primary here does not leak it back into
        # the duplicate_ids list which is handed to try_merge() below.
        all_ids = list(duplicate_ids)
        if primary_id:
            all_ids.append(primary_id)
        for ident in all_ids:
            if ident in self._idents_inflight:
                raise ValueError(
                    "Entity already part of in-process merge operation: {}".format(ident)
                )
            self._idents_inflight.append(ident)

        count = self.try_merge(
            duplicate_ids, primary_id=primary_id, evidence=record.get("evidence")
        )
        if count:
            self.counts["merged"] += 1
            self.counts["updated-entities"] += count
            self._edit_count += count
        else:
            self.counts["skip"] += 1

        if self._edit_count >= self.edit_batch_size:
            self._accept_pending_editgroup()
        return

    def try_merge(
        self,
        dupe_ids: List[str],
        primary_id: Optional[str] = None,
        evidence: Optional[Dict[str, Any]] = None,
    ) -> int:
        """
        Per-entity-type merge implementation.

        The return value is added to the 'updated-entities' counter and to the
        pending edit count by push_record(); a falsy return (eg, 0) is counted
        as a skip.
        """
        # implementations should fill this in
        raise NotImplementedError

    def finish(self) -> Counter:
        """
        Flushes any pending edits (accepting the editgroup unless in dry-run
        mode) and returns the counters.
        """
        if self._edit_count > 0:
            self._accept_pending_editgroup()
        return self.counts

    def _accept_pending_editgroup(self) -> None:
        """
        Accepts the current editgroup (skipped in dry-run mode), then clears
        the per-batch state.
        """
        if not self.dry_run_mode:
            self.api.accept_editgroup(self._editgroup_id)
        self._editgroup_id = None
        self._edit_count = 0
        self._idents_inflight = []

    def get_editgroup_id(self, _edits: int = 1) -> str:
        """
        This version of get_editgroup_id() is similar to the EntityImporter
        version, but does not update self._edit_count or submit editgroups. The
        _edits parameter is ignored.

        Creates a new editgroup via the API on first use, then returns the
        same identifier until the batch is flushed.
        """
        if not self._editgroup_id:
            eg = self.api.create_editgroup(
                fatcat_openapi_client.Editgroup(
                    description=self.editgroup_description, extra=self.editgroup_extra
                )
            )
            self._editgroup_id = eg.editgroup_id
        assert self._editgroup_id
        return self._editgroup_id
|