diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-17 12:28:19 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-23 17:39:33 -0800 |
commit | 717e4d71620093e16bac3ae8932c482ac8b12efa (patch) | |
tree | cf0bf0f8d8016bf94a641b6b21b51638277a6db5 /python/fatcat_tools/mergers/common.py | |
parent | a00f4c63e7db5b021ded7c6caf6f1b889627568a (diff) | |
download | fatcat-717e4d71620093e16bac3ae8932c482ac8b12efa.tar.gz fatcat-717e4d71620093e16bac3ae8932c482ac8b12efa.zip |
mergers: fmt, lint, refactors
These old merger code is from an old branch and needed significant
cleanup
Diffstat (limited to 'python/fatcat_tools/mergers/common.py')
-rw-r--r-- | python/fatcat_tools/mergers/common.py | 129 |
1 files changed, 77 insertions, 52 deletions
diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py index 62a29c42..1aaf357d 100644 --- a/python/fatcat_tools/mergers/common.py +++ b/python/fatcat_tools/mergers/common.py @@ -1,29 +1,25 @@ - """ Tools for merging entities in various ways. - group-releases: pull all release entities under a single work - => merges work entities merge-releases: merge release entities together => groups files/filesets/webcaptures + => merges work entities if needed + merge-works: pull all release entities under a single work + => merges work entities merge-containers: merge container entities merge-files: merge file entities - -Input format is JSON lines with keys: - - idents (required): array of string identifiers - primary (optional): single string identifier - """ import subprocess from collections import Counter +from typing import Any, Dict, List, Optional -import fatcat_api_client -from fatcat_api_client.rest import ApiException +import fatcat_openapi_client +from fatcat_tools.importers import EntityImporter -class EntityMerger: + +class EntityMerger(EntityImporter): """ API for individual jobs: @@ -37,67 +33,85 @@ class EntityMerger: counts({'lines', 'skip', 'merged', 'updated'}) # implemented per-task - try_merge(idents, primary=None) -> int (entities updated) + try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None This class is pretty similar to EntityImporter, but isn't subclassed. """ - def __init__(self, api, **kwargs): + def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None: - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['git_rev'] = eg_extra.get('git_rev', - subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger') + eg_extra = kwargs.get("editgroup_extra", dict()) + eg_extra["git_rev"] = eg_extra.get( + "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip() + ).decode("utf-8") + eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger") self.api = api - self.dry_run_mode = kwargs.get('dry_run_mode', True) - self.edit_batch_size = kwargs.get('edit_batch_size', 50) - self.editgroup_description = kwargs.get('editgroup_description') + self.dry_run_mode = kwargs.get("dry_run_mode", True) + self.edit_batch_size = kwargs.get("edit_batch_size", 50) + self.editgroup_description = kwargs.get("editgroup_description") self.editgroup_extra = eg_extra self.reset() + self.entity_type_name = "common" if self.dry_run_mode: print("Running in dry-run mode!") - def reset(self): - self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0}) + def reset(self) -> None: + self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0}) self._edit_count = 0 - self._editgroup_id = None - self._entity_queue = [] - self._idents_inflight = [] + self._editgroup_id: Optional[str] = None + self._idents_inflight: List[str] = [] - def push_record(self, line): + def push_record(self, record: Dict[str, Any]) -> None: """ Intended to be called by "pusher" class (which could be pulling from JSON file, Kafka, whatever). - Input is expected to be a dict-like object with key "idents", and - optionally "primary". + Input is expected to be a dict-like object with keys: + + entity_type: str + primary_id: Optional[str] + duplicate_ids: [str] (order not preserved) + evidence: Optional[dict] + # can be anything, entity- or merger-specific + # some variables might be... + dupe_extid: str + dupe_extid_type: str Returns nothing. """ - self.counts['lines'] += 1 - if (not raw_record): - self.counts['skip'] += 1 + self.counts["lines"] += 1 + if not record: + self.counts["skip-blank-line"] += 1 + return + if record.get("entity_type") != self.entity_type_name: + self.counts["skip-entity-type"] += 1 return - primary = line.get('primary') - idents = list(set(line.get('idents'))) - if primary and primary not in idents: - idents.append(primary) - if not idents or len(idents) <= 1: - self.counts['skip'] += 1 + primary_id: Optional[str] = record.get("primary_id") + duplicate_ids: List[str] = list(set(record["duplicate_ids"])) + duplicate_ids = [di for di in duplicate_ids if di != primary_id] + if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id): + self.counts["skip-no-dupes"] += 1 return - for i in idents: + all_ids = duplicate_ids + if primary_id: + all_ids.append(primary_id) + for i in all_ids: if i in self._idents_inflight: - raise ValueError("Entity already part of in-process merge operation: {}".format(i)) - self._idents.inflight.append(i) - count = self.try_merge(idents, primary=primary) + raise ValueError( + "Entity already part of in-process merge operation: {}".format(i) + ) + self._idents_inflight.append(i) + count = self.try_merge( + duplicate_ids, primary_id=primary_id, evidence=record.get("evidence") + ) if count: - self.counts['merged'] += 1 - self.counts['updated-total'] += count + self.counts["merged"] += 1 + self.counts["updated-entities"] += count self._edit_count += count else: - self.counts['skip'] += 1 + self.counts["skip"] += 1 if self._edit_count >= self.edit_batch_size: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -105,11 +119,16 @@ class EntityMerger: self._idents_inflight = [] return - def try_merge(self, idents, primary=None): + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: # implementations should fill this in raise NotImplementedError - def finish(self): + def finish(self) -> Counter: if self._edit_count > 0: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -118,13 +137,19 @@ class EntityMerger: return self.counts - def get_editgroup_id(self): + def get_editgroup_id(self, _edits: int = 1) -> str: + """ + This version of get_editgroup_id() is similar to the EntityImporter + version, but does not update self._edit_count or submit editgroups. The + edits parameter is ignored. + """ if not self._editgroup_id: eg = self.api.create_editgroup( - fatcat_api_client.Editgroup( - description=self.editgroup_description, - extra=self.editgroup_extra)) + fatcat_openapi_client.Editgroup( + description=self.editgroup_description, extra=self.editgroup_extra + ) + ) self._editgroup_id = eg.editgroup_id - + assert self._editgroup_id return self._editgroup_id |