author     bnewbold <bnewbold@archive.org>   2021-11-25 00:36:34 +0000
committer  bnewbold <bnewbold@archive.org>   2021-11-25 00:36:34 +0000
commit     5bc5eeed5e3ba54c2129c4233b881291c5fa7449 (patch)
tree       88392dace6857836cab80f3ea6d0980c20a24376 /python/fatcat_tools/mergers/common.py
parent     b14fca89f41b5ba2b85bf033844da211fa5c3c8b (diff)
parent     62bf6202d002e96ffd81d7e4634502be1886d5c3 (diff)
download   fatcat-5bc5eeed5e3ba54c2129c4233b881291c5fa7449.tar.gz
           fatcat-5bc5eeed5e3ba54c2129c4233b881291c5fa7449.zip
Merge branch 'bnewbold-mergers' into 'master'
entity mergers framework
See merge request webgroup/fatcat!133
Diffstat (limited to 'python/fatcat_tools/mergers/common.py')
-rw-r--r--   python/fatcat_tools/mergers/common.py   153
1 file changed, 153 insertions, 0 deletions
diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
new file mode 100644
index 00000000..e25f8194
--- /dev/null
+++ b/python/fatcat_tools/mergers/common.py
@@ -0,0 +1,153 @@
+"""
+Tools for merging entities in various ways.
+
+    merge-releases: merge release entities together
+        => groups files/filesets/webcaptures
+        => merges work entities if needed
+    merge-works: pull all release entities under a single work
+        => merges work entities
+    merge-containers: merge container entities
+    merge-files: merge file entities
+"""
+
+import subprocess
+from collections import Counter
+from typing import Any, Dict, List, Optional
+
+import fatcat_openapi_client
+
+from fatcat_tools.importers import EntityImporter
+
+
+class EntityMerger(EntityImporter):
+    """
+    API for individual jobs:
+
+        # record iterator sees
+        push_record(raw_record)
+        finish()
+
+        # provided helpers
+        self.api
+        self.get_editgroup_id()
+        counts({'lines', 'skip', 'merged', 'updated'})
+
+        # implemented per-task
+        try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> int
+    """
+
+    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
+
+        eg_extra = kwargs.get("editgroup_extra", dict())
+        eg_extra["git_rev"] = eg_extra.get(
+            "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip()
+        ).decode("utf-8")
+        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger")
+
+        self.api = api
+        self.dry_run_mode = kwargs.get("dry_run_mode", True)
+        self.edit_batch_size = kwargs.get("edit_batch_size", 50)
+        self.editgroup_description = kwargs.get("editgroup_description")
+        self.editgroup_extra = eg_extra
+        self.reset()
+        self.entity_type_name = "common"
+
+        if self.dry_run_mode:
+            print("Running in dry-run mode!")
+
+    def reset(self) -> None:
+        self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0})
+        self._edit_count = 0
+        self._editgroup_id: Optional[str] = None
+        self._idents_inflight: List[str] = []
+
+    def push_record(self, record: Dict[str, Any]) -> None:
+        """
+        Intended to be called by "pusher" class (which could be pulling from
+        JSON file, Kafka, whatever).
+
+        Input is expected to be a dict-like object with keys:
+
+            entity_type: str
+            primary_id: Optional[str]
+            duplicate_ids: [str] (order not preserved)
+            evidence: Optional[dict]
+                # can be anything, entity- or merger-specific
+                # some variables might be...
+                extid: str
+                extid_type: str
+
+        Returns nothing.
+        """
+        self.counts["lines"] += 1
+        if not record:
+            self.counts["skip-blank-line"] += 1
+            return
+        if record.get("entity_type") != self.entity_type_name:
+            self.counts["skip-entity-type"] += 1
+            return
+        primary_id: Optional[str] = record.get("primary_id")
+        duplicate_ids: List[str] = list(set(record["duplicate_ids"]))
+        duplicate_ids = [di for di in duplicate_ids if di != primary_id]
+        if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id):
+            self.counts["skip-no-dupes"] += 1
+            return
+        all_ids = duplicate_ids
+        if primary_id:
+            all_ids.append(primary_id)
+        for i in all_ids:
+            if i in self._idents_inflight:
+                raise ValueError(
+                    "Entity already part of in-process merge operation: {}".format(i)
+                )
+            self._idents_inflight.append(i)
+        count = self.try_merge(
+            duplicate_ids, primary_id=primary_id, evidence=record.get("evidence")
+        )
+        if count:
+            self.counts["merged"] += 1
+            self.counts["updated-entities"] += count
+            self._edit_count += count
+        else:
+            self.counts["skip"] += 1
+        if self._edit_count >= self.edit_batch_size:
+            self.api.accept_editgroup(self._editgroup_id)
+            self._editgroup_id = None
+            self._edit_count = 0
+            self._idents_inflight = []
+        return
+
+    def try_merge(
+        self,
+        dupe_ids: List[str],
+        primary_id: Optional[str] = None,
+        evidence: Optional[Dict[str, Any]] = None,
+    ) -> int:
+        # implementations should fill this in
+        raise NotImplementedError
+
+    def finish(self) -> Counter:
+        if self._edit_count > 0:
+            self.api.accept_editgroup(self._editgroup_id)
+            self._editgroup_id = None
+            self._edit_count = 0
+            self._idents_inflight = []
+
+        return self.counts
+
+    def get_editgroup_id(self, _edits: int = 1) -> str:
+        """
+        This version of get_editgroup_id() is similar to the EntityImporter
+        version, but does not update self._edit_count or submit editgroups. The
+        edits parameter is ignored.
+        """
+
+        if not self._editgroup_id:
+            eg = self.api.create_editgroup(
+                fatcat_openapi_client.Editgroup(
+                    description=self.editgroup_description, extra=self.editgroup_extra
+                )
+            )
+            self._editgroup_id = eg.editgroup_id
+        assert self._editgroup_id
+        return self._editgroup_id
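For orientation, a concrete merger is expected to subclass EntityMerger, set entity_type_name, and implement try_merge(), returning the number of entities updated so that push_record() can count merges and batch editgroup submission. The sketch below is illustrative only and is not part of this commit: the class name ExampleReleaseMerger, the run_example() pusher loop, and the no-op merge logic are all hypothetical; a real implementation would fetch and redirect entities through self.api inside an editgroup from self.get_editgroup_id().

import json
import sys
from typing import Any, Dict, List, Optional

import fatcat_openapi_client

from fatcat_tools.mergers.common import EntityMerger


class ExampleReleaseMerger(EntityMerger):
    """
    Hypothetical merger used only to show the subclass shape; it does not
    perform any real merges.
    """

    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
        super().__init__(api, **kwargs)
        # push_record() skips records whose entity_type does not match this
        self.entity_type_name = "release"

    def try_merge(
        self,
        dupe_ids: List[str],
        primary_id: Optional[str] = None,
        evidence: Optional[Dict[str, Any]] = None,
    ) -> int:
        # A real merger would fetch entities via self.api, pick or create a
        # primary, redirect duplicates within self.get_editgroup_id(), and
        # return the number of edits made; returning 0 counts as a skip.
        if self.dry_run_mode:
            print("would merge {} into {}".format(dupe_ids, primary_id), file=sys.stderr)
            return 0
        return len(dupe_ids)


def run_example(merger: EntityMerger, json_lines) -> None:
    # "Pusher" loop: one JSON object per line, shaped per the push_record()
    # docstring above, e.g.:
    #   {"entity_type": "release", "primary_id": null,
    #    "duplicate_ids": ["<ident1>", "<ident2>"], "evidence": {"extid_type": "doi"}}
    for line in json_lines:
        line = line.strip()
        if not line:
            continue
        merger.push_record(json.loads(line))
    # finish() accepts any remaining editgroup and returns the Counter of stats
    print(merger.finish(), file=sys.stderr)

Note that push_record() accepts the pending editgroup as soon as _edit_count reaches edit_batch_size, so a pusher only needs to call finish() once at the end to flush the remainder and collect counts.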