import argparse
import os
import sys
from typing import Any, Dict, List, Optional

import fatcat_openapi_client
from fatcat_openapi_client.models import FileEntity

from fatcat_tools import authenticated_api
from fatcat_tools.importers import JsonLinePusher

from .common import EntityMerger


class FileMerger(EntityMerger):
    """
    Combines file entities into a single primary entity, merging in any
    existing partial metadata (such as release_ids and URLs) from the
    duplicates. Can choose a primary automatically if one is not provided;
    the primary itself is only updated if the merge actually changes it.

    TODO: relies on API server to detect "redirect of redirect" situation
    """
def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
eg_desc = (
kwargs.pop("editgroup_description", None) or "Automated merge of file entities"
)
eg_extra = kwargs.pop("editgroup_extra", dict())
eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger")
        # dry_run_mode arrives as a regular kwarg (see run_merge_files below),
        # not inside editgroup_extra, so read it from kwargs
        self.dry_run_mode: bool = kwargs.get("dry_run_mode", False)
super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
        self.entity_type_name = "file"

    def choose_primary_file(self, entities: List[FileEntity]) -> str:
        """
        Selects the "best" file entity to retain as the primary, preferring
        (in order): complete hash/size metadata, associated releases, URLs,
        extra metadata, and finally the raw number of release_ids.

        TODO: could incorporate number of redirected entities already pointing at an entity
        """
assert entities and len(entities) >= 2
# want to sort in descending order, so reverse=True
entities = sorted(
entities,
key=lambda a: (
# has complete metadata?
bool(a.sha256 and a.md5 and a.sha1 and (a.size is not None)),
# has releases associated?
bool(a.release_ids),
# has URLs?
bool(a.urls),
# has extra metadata?
bool(a.extra),
# number of release_ids
len(a.release_ids or []),
),
reverse=True,
)
        return entities[0].ident

    def merge_file_metadata_from(self, primary: FileEntity, other: FileEntity) -> bool:
        """
        Compares a primary entity to another ("other") entity. If the other
        entity has helpful metadata fields which the primary is missing,
        copies them to the primary, in-place.

        This is intended to extract any useful metadata from "other" before it
        gets redirected to "primary".

        Returns True if the primary was updated, False otherwise.
        """
updated = False
# NOTE: intentionally not including sha1 here
for k in ["size", "mimetype", "sha256", "md5"]:
if not getattr(primary, k) and getattr(other, k):
setattr(primary, k, getattr(other, k))
updated = True
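        # make sure the list fields exist so the merge loops below can append
        # to them in-place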
if not primary.urls:
primary.urls = []
if not primary.release_ids:
primary.release_ids = []
if other.extra:
if not primary.extra:
primary.extra = other.extra
updated = True
else:
for k in other.extra.keys():
if k not in primary.extra:
primary.extra[k] = other.extra[k]
updated = True
for u in other.urls or []:
if u not in primary.urls:
primary.urls.append(u)
updated = True
for i in other.release_ids or []:
if i not in primary.release_ids:
primary.release_ids.append(i)
updated = True
        return updated

def try_merge(
self,
dupe_ids: List[str],
primary_id: Optional[str] = None,
evidence: Optional[Dict[str, Any]] = None,
) -> int:
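        """
        Fetches all entities involved (duplicates plus any supplied primary),
        validates their state and extid evidence, chooses a primary if one was
        not supplied, merges metadata into the primary, and redirects the
        duplicates to it.

        Returns the number of entities updated: one per redirected duplicate,
        plus one if the primary itself was modified.
        """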
# currently required for extid validation
if not evidence or not (evidence.get("extid_type") and evidence.get("extid")):
self.counts["skip-missing-evidence"] += 1
return 0
updated_entities = 0
entities: Dict[str, FileEntity] = dict()
eg_id = self.get_editgroup_id()
all_ids = dupe_ids.copy()
if primary_id:
all_ids.append(primary_id)
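        # fetch every entity involved and validate before mutating anything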
for ident in all_ids:
try:
entities[ident] = self.api.get_file(ident)
except fatcat_openapi_client.ApiException as ae:
if ae.status == 404:
self.counts["skip-entity-not-found"] += 1
return 0
else:
raise
if entities[ident].state != "active":
self.counts["skip-not-active-entity"] += 1
return 0
if getattr(entities[ident], evidence["extid_type"]) != evidence["extid"]:
self.counts["skip-extid-mismatch"] += 1
return 0
if not primary_id:
primary_id = self.choose_primary_file(list(entities.values()))
dupe_ids = [d for d in dupe_ids if d != primary_id]
assert primary_id not in dupe_ids
primary = entities[primary_id]
primary_updated = False
for other_id in dupe_ids:
other = entities[other_id]
primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated
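            # redirect the duplicate to the primary, stashing the match
            # evidence in edit_extra for provenance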
if not self.dry_run_mode:
self.api.update_file(
eg_id,
other.ident,
FileEntity(
redirect=primary.ident,
edit_extra=evidence,
),
)
updated_entities += 1
if primary_updated:
if not self.dry_run_mode:
self.api.update_file(eg_id, primary.ident, primary)
updated_entities += 1
        return updated_entities


def run_merge_files(args: argparse.Namespace) -> None:
em = FileMerger(
args.api,
edit_batch_size=args.batch_size,
dry_run_mode=args.dry_run,
editgroup_description=args.editgroup_description_override,
)
    JsonLinePusher(em, args.json_file).run()


def main() -> None:
    """
    Invoke like:

        python3 -m fatcat_tools.mergers.files [options] merge-files <json-file>
    """
parser = argparse.ArgumentParser()
parser.add_argument(
"--host-url", default="http://localhost:9411/v0", help="connect to this host/port"
)
parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int)
parser.add_argument(
"--editgroup-description-override",
help="editgroup description override",
default=None,
type=str,
)
parser.add_argument(
"--dry-run",
action="store_true",
help="don't actually commit merges, just count what would have been",
)
parser.set_defaults(
auth_var="FATCAT_AUTH_API_TOKEN",
)
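    # auth_var names the environment variable holding the API token; it is
    # read below when constructing the authenticated API client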
subparsers = parser.add_subparsers()
sub_merge_files = subparsers.add_parser("merge-files")
sub_merge_files.set_defaults(func=run_merge_files)
    sub_merge_files.add_argument(
        "json_file",
        help="source of merge lines to process (or stdin)",
        # nargs="?" makes this positional optional, so the stdin default can
        # actually take effect when no file is given
        nargs="?",
        default=sys.stdin,
        type=argparse.FileType("r"),
    )
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")
sys.exit(-1)
# allow editgroup description override via env variable (but CLI arg takes
# precedence)
if not args.editgroup_description_override and os.environ.get(
"FATCAT_EDITGROUP_DESCRIPTION"
):
args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION")
args.api = authenticated_api(
args.host_url,
# token is an optional kwarg (can be empty string, None, etc)
token=os.environ.get(args.auth_var),
)
    args.func(args)


if __name__ == "__main__":
main()