summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/cleanups/common.py
blob: c8ca5800f5bf5e61f0bae11845f43ed2e717c442 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import copy
import json
import subprocess
import sys
from collections import Counter
from typing import Any, Dict, List, Sequence

from fatcat_openapi_client import ApiClient, Editgroup

from fatcat_tools.transforms import entity_from_dict, entity_to_dict


class EntityCleaner:
    """
    Base class for bots that clean up existing entities in bulk.

    API for individual jobs:

        # called by record iterators ("pushers")
        push_record(record)
        finish()

        # provided helpers
        self.api
        self.get_editgroup_id()
        self.counts  # Counter: 'lines', 'cleaned', 'updated', 'skip-*', ...

        # implemented per-task
        clean_entity(entity) -> entity (mutated in-place and returned)
        try_update(entity) -> int (entities updated)

    This class is pretty similar to EntityImporter, but isn't subclassed.
    """

    def __init__(self, api: ApiClient, entity_type: Any, **kwargs) -> None:

        # Copy so that a dict owned by the caller is never mutated.
        eg_extra = dict(kwargs.get("editgroup_extra") or {})
        git_rev = eg_extra.get("git_rev")
        if git_rev is None:
            # Only shell out to git when no revision was supplied; this keeps
            # callers that pass an explicit git_rev working outside a checkout.
            git_rev = subprocess.check_output(["git", "describe", "--always"]).strip()
        if isinstance(git_rev, bytes):
            # check_output() returns bytes; a caller-supplied value may already
            # be a str, which must not be .decode()'d.
            git_rev = git_rev.decode("utf-8")
        eg_extra["git_rev"] = git_rev
        eg_extra.setdefault("agent", "fatcat_tools.EntityCleaner")

        self.api = api
        self.entity_type = entity_type
        self.dry_run_mode = kwargs.get("dry_run_mode", True)
        self.edit_batch_size = kwargs.get("edit_batch_size", 50)
        self.editgroup_description = kwargs.get(
            "editgroup_description", "Generic Entity Cleaner Bot"
        )
        self.editgroup_extra = eg_extra
        self.reset()
        # Local client used only for dict <-> entity (de)serialization.
        self.ac = ApiClient()

        if self.dry_run_mode:
            print("Running in dry-run mode!")

    def reset(self) -> None:
        """Resets counters and all per-editgroup batching state."""
        self.counts = Counter({"lines": 0, "cleaned": 0, "updated": 0})
        self._edit_count = 0
        self._editgroup_id = None
        self._entity_queue: List[Any] = []
        # idents already edited in the current (not yet accepted) editgroup
        self._idents_inflight: List[str] = []

    def push_record(self, record: Dict[str, Any]) -> None:
        """
        Intended to be called by "pusher" class (which could be pulling from
        JSON file, Kafka, whatever).

        Input is expected to be an entity in JSON-like dict form.

        Null/empty records and non-active entities are skipped. Entities that
        clean_entity() leaves unchanged are skipped. In dry-run mode the cleaned
        entity is printed as JSON instead of being updated. Otherwise the update
        is delegated to try_update(), and the editgroup is accepted once
        edit_batch_size edits have accumulated.

        Raises ValueError if the same ident is pushed twice within one
        in-progress editgroup.

        Returns nothing.
        """
        self.counts["lines"] += 1
        if not record:
            self.counts["skip-null"] += 1
            return

        entity = entity_from_dict(record, self.entity_type, api_client=self.ac)

        if entity.state != "active":
            self.counts["skip-inactive"] += 1
            return

        # Clean a deep copy so the original can be compared against the result.
        cleaned = self.clean_entity(copy.deepcopy(entity))
        if entity == cleaned:
            self.counts["skip-clean"] += 1
            return
        self.counts["cleaned"] += 1

        if self.dry_run_mode:
            # Preview what would be written: the cleaned entity, not the input.
            entity_dict = entity_to_dict(cleaned, api_client=self.ac)
            print(json.dumps(entity_dict))
            return

        if entity.ident in self._idents_inflight:
            raise ValueError(
                "Entity already part of in-process update: {}".format(entity.ident)
            )

        updated = self.try_update(cleaned)
        if updated:
            self.counts["updated"] += updated
            self._edit_count += updated
            self._idents_inflight.append(entity.ident)

        if self._edit_count >= self.edit_batch_size:
            self._accept_editgroup()

    def clean_entity(self, entity: Any) -> Any:
        """
        Mutates entity in-place and returns it
        """
        # implementations should fill this in
        raise NotImplementedError

    def try_update(self, entity: Any) -> int:
        """
        Returns edit count (number of entities updated).

        If >= 1, does not need to update self.counts. If no entities updated,
        do need to update counts internally.
        """
        # implementations should fill this in
        raise NotImplementedError

    def finish(self) -> Counter:
        """Accepts any partially-filled editgroup and returns the final counts."""
        if self._edit_count > 0:
            self._accept_editgroup()

        return self.counts

    def _accept_editgroup(self) -> None:
        """Accepts the current editgroup and resets per-batch state."""
        self.api.accept_editgroup(self._editgroup_id)
        self._editgroup_id = None
        self._edit_count = 0
        self._idents_inflight = []

    def get_editgroup_id(self) -> str:
        """
        Returns the id of the current editgroup, creating one (with the
        configured description and extra metadata) if none is open.
        """

        if not self._editgroup_id:
            eg = self.api.create_editgroup(
                Editgroup(description=self.editgroup_description, extra=self.editgroup_extra)
            )
            self._editgroup_id = eg.editgroup_id

        assert self._editgroup_id
        return self._editgroup_id


class JsonLinePusher:
    """
    Feeds newline-delimited JSON records from an iterable of lines (e.g. an
    open file) into an EntityCleaner, then calls finish() and reports counts.
    """

    def __init__(self, cleaner: "EntityCleaner", json_file: Sequence, **kwargs) -> None:
        # Extra kwargs are accepted and ignored, for signature compatibility
        # with other pushers.
        self.cleaner = cleaner
        self.json_file = json_file

    def run(self) -> Counter:
        """
        Parses each non-blank line as JSON and pushes it to the cleaner.

        Lines read from a file keep their trailing newline, so whitespace is
        stripped before the blank check. Otherwise a blank line (just a
        newline) would reach json.loads() and raise.

        Returns the cleaner's final counts, which are also printed to stderr.
        """
        for line in self.json_file:
            line = line.strip()
            if not line:
                continue
            self.cleaner.push_record(json.loads(line))
        counts = self.cleaner.finish()
        print(counts, file=sys.stderr)
        return counts