aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/mergers/files.py
blob: 30b3133029adb6de2ae62a8d7273ece1bd5f1eb3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import argparse
import os
import sys
from typing import Any, Dict, List, Optional

import fatcat_openapi_client
from fatcat_openapi_client.models import FileEntity

from fatcat_tools import authenticated_api
from fatcat_tools.importers import JsonLinePusher

from .common import EntityMerger


class FileMerger(EntityMerger):
    """
    Combines file entities into a single primary. Merges any existing partial
    metadata (such as release_ids and URLs). Can choose a primary if necessary.

    The primary is only updated if needed.

    TODO: relies on API server to detect "redirect of redirect" situation
    """

    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:

        eg_desc = (
            kwargs.pop("editgroup_description", None) or "Automated merge of file entities"
        )
        eg_extra = kwargs.pop("editgroup_extra", dict())
        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger")
        self.dry_run_mode: bool = eg_extra.get("dry_run_mode", False)
        super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
        self.entity_type_name = "file"

    def choose_primary_file(self, entities: List[FileEntity]) -> str:
        """
        TODO: could incorporate number of redirected entities already pointing at an entity
        """
        assert entities and len(entities) >= 2

        # want to sort in descending order, so reverse=True
        entities = sorted(
            entities,
            key=lambda a: (
                # has complete metadata?
                bool(a.sha256 and a.md5 and a.sha1 and (a.size is not None)),
                # has releases associated?
                bool(a.release_ids),
                # has URLs?
                bool(a.urls),
                # has extra metadata?
                bool(a.extra),
                # number of release_ids
                len(a.release_ids or []),
            ),
            reverse=True,
        )
        return entities[0].ident

    def merge_file_metadata_from(self, primary: FileEntity, other: FileEntity) -> bool:
        """
        Compares a primary to an other. If there are helpful metadata fields in
        the other, copy them to primary, in-place.

        This is intended to extract any useful metadata from "other" before it
        gets redirected to "primary".

        Returns True if the primary was updated, False otherwise.
        """
        updated = False
        # NOTE: intentionally not including sha1 here
        for k in ["size", "mimetype", "sha256", "md5"]:
            if not getattr(primary, k) and getattr(other, k):
                setattr(primary, k, getattr(other, k))
                updated = True

        if not primary.urls:
            primary.urls = []
        if not primary.release_ids:
            primary.release_ids = []

        if other.extra:
            if not primary.extra:
                primary.extra = other.extra
                updated = True
            else:
                for k in other.extra.keys():
                    if k not in primary.extra:
                        primary.extra[k] = other.extra[k]
                        updated = True

        for u in other.urls or []:
            if u not in primary.urls:
                primary.urls.append(u)
                updated = True

        for i in other.release_ids or []:
            if i not in primary.release_ids:
                primary.release_ids.append(i)
                updated = True

        return updated

    def try_merge(
        self,
        dupe_ids: List[str],
        primary_id: Optional[str] = None,
        evidence: Optional[Dict[str, Any]] = None,
    ) -> int:

        # currently required for extid validation
        if not evidence or not (evidence.get("extid_type") and evidence.get("extid")):
            self.counts["skip-missing-evidence"] += 1
            return 0

        updated_entities = 0
        entities: Dict[str, FileEntity] = dict()
        eg_id = self.get_editgroup_id()

        all_ids = dupe_ids.copy()
        if primary_id:
            all_ids.append(primary_id)
        for ident in all_ids:
            try:
                entities[ident] = self.api.get_file(ident)
            except fatcat_openapi_client.ApiException as ae:
                if ae.status == 404:
                    self.counts["skip-entity-not-found"] += 1
                    return 0
                else:
                    raise
            if entities[ident].state != "active":
                self.counts["skip-not-active-entity"] += 1
                return 0
            if getattr(entities[ident], evidence["extid_type"]) != evidence["extid"]:
                self.counts["skip-extid-mismatch"] += 1
                return 0

        if not primary_id:
            primary_id = self.choose_primary_file(list(entities.values()))
            dupe_ids = [d for d in dupe_ids if d != primary_id]

        assert primary_id not in dupe_ids

        primary = entities[primary_id]
        primary_updated = False
        for other_id in dupe_ids:
            other = entities[other_id]
            primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated
            if not self.dry_run_mode:
                self.api.update_file(
                    eg_id,
                    other.ident,
                    FileEntity(
                        redirect=primary.ident,
                        edit_extra=evidence,
                    ),
                )
            updated_entities += 1

        if primary_updated:
            if not self.dry_run_mode:
                self.api.update_file(eg_id, primary.ident, primary)
            updated_entities += 1

        return updated_entities


def run_merge_files(args: argparse.Namespace) -> None:
    em = FileMerger(
        args.api,
        edit_batch_size=args.batch_size,
        dry_run_mode=args.dry_run,
        editgroup_description=args.editgroup_description_override,
    )
    JsonLinePusher(em, args.json_file).run()


def main() -> None:
    """
    Invoke like:

        python3 -m fatcat_tools.mergers.files [options]
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--host-url", default="http://localhost:9411/v0", help="connect to this host/port"
    )
    parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int)
    parser.add_argument(
        "--editgroup-description-override",
        help="editgroup description override",
        default=None,
        type=str,
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="don't actually commit merges, just count what would have been",
    )
    parser.set_defaults(
        auth_var="FATCAT_AUTH_API_TOKEN",
    )
    subparsers = parser.add_subparsers()

    sub_merge_files = subparsers.add_parser("merge-files")
    sub_merge_files.set_defaults(func=run_merge_files)
    sub_merge_files.add_argument(
        "json_file",
        help="source of merge lines to process (or stdin)",
        default=sys.stdin,
        type=argparse.FileType("r"),
    )

    args = parser.parse_args()
    if not args.__dict__.get("func"):
        print("tell me what to do!")
        sys.exit(-1)

    # allow editgroup description override via env variable (but CLI arg takes
    # precedence)
    if not args.editgroup_description_override and os.environ.get(
        "FATCAT_EDITGROUP_DESCRIPTION"
    ):
        args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION")

    args.api = authenticated_api(
        args.host_url,
        # token is an optional kwarg (can be empty string, None, etc)
        token=os.environ.get(args.auth_var),
    )
    args.func(args)


if __name__ == "__main__":
    main()