aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Bryan Newbold <bnewbold@robocracy.org> 2021-11-17 12:28:19 -0800
committer Bryan Newbold <bnewbold@robocracy.org> 2021-11-23 17:39:33 -0800
commit 717e4d71620093e16bac3ae8932c482ac8b12efa (patch)
tree cf0bf0f8d8016bf94a641b6b21b51638277a6db5
parent a00f4c63e7db5b021ded7c6caf6f1b889627568a (diff)
download fatcat-717e4d71620093e16bac3ae8932c482ac8b12efa.tar.gz
fatcat-717e4d71620093e16bac3ae8932c482ac8b12efa.zip
mergers: fmt, lint, refactors
This old merger code is from an old branch and needed significant cleanup.
-rw-r--r-- python/fatcat_tools/mergers/__init__.py 4
-rw-r--r-- python/fatcat_tools/mergers/common.py 129
-rw-r--r-- python/fatcat_tools/mergers/releases.py 163
3 files changed, 200 insertions, 96 deletions
diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py
index c38a397d..ccc4c02c 100644
--- a/python/fatcat_tools/mergers/__init__.py
+++ b/python/fatcat_tools/mergers/__init__.py
@@ -1,3 +1,3 @@
-
from .common import EntityMerger
-from .releases import ReleaseMerger, ReleaseGrouper
+from .files import FileMerger
+from .releases import ReleaseMerger
diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
index 62a29c42..1aaf357d 100644
--- a/python/fatcat_tools/mergers/common.py
+++ b/python/fatcat_tools/mergers/common.py
@@ -1,29 +1,25 @@
-
"""
Tools for merging entities in various ways.
- group-releases: pull all release entities under a single work
- => merges work entities
merge-releases: merge release entities together
=> groups files/filesets/webcaptures
+ => merges work entities if needed
+ merge-works: pull all release entities under a single work
+ => merges work entities
merge-containers: merge container entities
merge-files: merge file entities
-
-Input format is JSON lines with keys:
-
- idents (required): array of string identifiers
- primary (optional): single string identifier
-
"""
import subprocess
from collections import Counter
+from typing import Any, Dict, List, Optional
-import fatcat_api_client
-from fatcat_api_client.rest import ApiException
+import fatcat_openapi_client
+from fatcat_tools.importers import EntityImporter
-class EntityMerger:
+
+class EntityMerger(EntityImporter):
"""
API for individual jobs:
@@ -37,67 +33,85 @@ class EntityMerger:
counts({'lines', 'skip', 'merged', 'updated'})
# implemented per-task
- try_merge(idents, primary=None) -> int (entities updated)
+ try_merge(dupe_ids: List[str], primary_id: Optional[str] = None, evidence: Optional[Dict[str, Any]] = None) -> None
This class is pretty similar to EntityImporter, but isn't subclassed.
"""
- def __init__(self, api, **kwargs):
+ def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
- eg_extra = kwargs.get('editgroup_extra', dict())
- eg_extra['git_rev'] = eg_extra.get('git_rev',
- subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
- eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger')
+ eg_extra = kwargs.get("editgroup_extra", dict())
+ eg_extra["git_rev"] = eg_extra.get(
+ "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip()
+ ).decode("utf-8")
+ eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityMerger")
self.api = api
- self.dry_run_mode = kwargs.get('dry_run_mode', True)
- self.edit_batch_size = kwargs.get('edit_batch_size', 50)
- self.editgroup_description = kwargs.get('editgroup_description')
+ self.dry_run_mode = kwargs.get("dry_run_mode", True)
+ self.edit_batch_size = kwargs.get("edit_batch_size", 50)
+ self.editgroup_description = kwargs.get("editgroup_description")
self.editgroup_extra = eg_extra
self.reset()
+ self.entity_type_name = "common"
if self.dry_run_mode:
print("Running in dry-run mode!")
- def reset(self):
- self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0})
+ def reset(self) -> None:
+ self.counts = Counter({"lines": 0, "skip": 0, "merged": 0, "updated-total": 0})
self._edit_count = 0
- self._editgroup_id = None
- self._entity_queue = []
- self._idents_inflight = []
+ self._editgroup_id: Optional[str] = None
+ self._idents_inflight: List[str] = []
- def push_record(self, line):
+ def push_record(self, record: Dict[str, Any]) -> None:
"""
Intended to be called by "pusher" class (which could be pulling from
JSON file, Kafka, whatever).
- Input is expected to be a dict-like object with key "idents", and
- optionally "primary".
+ Input is expected to be a dict-like object with keys:
+
+ entity_type: str
+ primary_id: Optional[str]
+ duplicate_ids: [str] (order not preserved)
+ evidence: Optional[dict]
+ # can be anything, entity- or merger-specific
+ # some variables might be...
+ dupe_extid: str
+ dupe_extid_type: str
Returns nothing.
"""
- self.counts['lines'] += 1
- if (not raw_record):
- self.counts['skip'] += 1
+ self.counts["lines"] += 1
+ if not record:
+ self.counts["skip-blank-line"] += 1
+ return
+ if record.get("entity_type") != self.entity_type_name:
+ self.counts["skip-entity-type"] += 1
return
- primary = line.get('primary')
- idents = list(set(line.get('idents')))
- if primary and primary not in idents:
- idents.append(primary)
- if not idents or len(idents) <= 1:
- self.counts['skip'] += 1
+ primary_id: Optional[str] = record.get("primary_id")
+ duplicate_ids: List[str] = list(set(record["duplicate_ids"]))
+ duplicate_ids = [di for di in duplicate_ids if di != primary_id]
+ if not duplicate_ids or (len(duplicate_ids) <= 1 and not primary_id):
+ self.counts["skip-no-dupes"] += 1
return
- for i in idents:
+ all_ids = duplicate_ids
+ if primary_id:
+ all_ids.append(primary_id)
+ for i in all_ids:
if i in self._idents_inflight:
- raise ValueError("Entity already part of in-process merge operation: {}".format(i))
- self._idents.inflight.append(i)
- count = self.try_merge(idents, primary=primary)
+ raise ValueError(
+ "Entity already part of in-process merge operation: {}".format(i)
+ )
+ self._idents_inflight.append(i)
+ count = self.try_merge(
+ duplicate_ids, primary_id=primary_id, evidence=record.get("evidence")
+ )
if count:
- self.counts['merged'] += 1
- self.counts['updated-total'] += count
+ self.counts["merged"] += 1
+ self.counts["updated-entities"] += count
self._edit_count += count
else:
- self.counts['skip'] += 1
+ self.counts["skip"] += 1
if self._edit_count >= self.edit_batch_size:
self.api.accept_editgroup(self._editgroup_id)
self._editgroup_id = None
@@ -105,11 +119,16 @@ class EntityMerger:
self._idents_inflight = []
return
- def try_merge(self, idents, primary=None):
+ def try_merge(
+ self,
+ dupe_ids: List[str],
+ primary_id: Optional[str] = None,
+ evidence: Optional[Dict[str, Any]] = None,
+ ) -> int:
# implementations should fill this in
raise NotImplementedError
- def finish(self):
+ def finish(self) -> Counter:
if self._edit_count > 0:
self.api.accept_editgroup(self._editgroup_id)
self._editgroup_id = None
@@ -118,13 +137,19 @@ class EntityMerger:
return self.counts
- def get_editgroup_id(self):
+ def get_editgroup_id(self, _edits: int = 1) -> str:
+ """
+ This version of get_editgroup_id() is similar to the EntityImporter
+ version, but does not update self._edit_count or submit editgroups. The
+ edits parameter is ignored.
+ """
if not self._editgroup_id:
eg = self.api.create_editgroup(
- fatcat_api_client.Editgroup(
- description=self.editgroup_description,
- extra=self.editgroup_extra))
+ fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description, extra=self.editgroup_extra
+ )
+ )
self._editgroup_id = eg.editgroup_id
-
+ assert self._editgroup_id
return self._editgroup_id
diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py
index 802cb8da..fc970057 100644
--- a/python/fatcat_tools/mergers/releases.py
+++ b/python/fatcat_tools/mergers/releases.py
@@ -1,6 +1,15 @@
+import argparse
+import os
+import sys
+from typing import Any, Dict, List, Optional
+
+import fatcat_openapi_client
+from fatcat_openapi_client.models import ReleaseEntity, WorkEntity
+
+from fatcat_tools import authenticated_api
+from fatcat_tools.importers import JsonLinePusher
from .common import EntityMerger
-from fatcat_api_client.models import ReleaseEntity, WorkEntity
class ReleaseMerger(EntityMerger):
@@ -12,99 +21,169 @@ class ReleaseMerger(EntityMerger):
webcaptures} to the new merged release.
"""
- def __init__(self, api, **kwargs):
+ def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
- eg_desc = kwargs.get('editgroup_description',
- "Automated merge of release entities")
- eg_extra = kwargs.get('editgroup_extra', dict())
- eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger')
- super().__init__(api,
- editgroup_description=eg_desc,
- editgroup_extra=eg_extra,
- **kwargs)
+ eg_desc = kwargs.get("editgroup_description", "Automated merge of release entities")
+ eg_extra = kwargs.get("editgroup_extra", dict())
+ eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.ReleaseMerger")
+ super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
+ self.entity_type_name = "release"
- def try_merge(self, idents, primary=None):
+ def try_merge(
+ self,
+ dupe_ids: List[str],
+ primary_id: Optional[str] = None,
+ evidence: Optional[Dict[str, Any]] = None,
+ ) -> int:
+ """
+ XXX: review/refactor; this code is very old
+ """
updated_entities = 0
releases = dict()
eg_id = self.get_editgroup_id()
- self.dry_run_mode
-
- for ident in idents:
- releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures")
+ all_ids = dupe_ids.copy()
+ if primary_id:
+ all_ids.append(primary_id)
+ for ident in all_ids:
+ releases[ident] = self.api.get_release(ident, expand="files,filesets,webcaptures")
- # select the primary (if not already set)
- if not primary:
- primary = releases.keys()[0]
+ if not primary_id:
+ # XXX:
+ primary_id = dupe_ids[0]
+ dupe_ids = [d for d in dupe_ids if d != primary_id]
- primary_work_id = releases[primary].work_id
+ primary_work_id = releases[primary_id].work_id
updated_work_ids = []
redirected_release_ids = []
# execute all the release redirects
for release in releases.values():
- if release.ident == primary:
+ if release.ident == primary_id:
continue
# file redirects
for e in release.files:
e.release_ids.remove(release.ident)
- if not primary in e.release_ids:
- e.release_ids.append(primary)
+ if primary_id not in e.release_ids:
+ e.release_ids.append(primary_id)
if not self.dry_run_mode:
- api.update_file(eg_id, e.ident, e)
+ self.api.update_file(eg_id, e.ident, e)
updated_entities += 1
- self.counts['updated-files'] += 1
+ self.counts["updated-files"] += 1
# fileset redirects
for e in release.filesets:
e.release_ids.remove(release.ident)
- if not primary in e.release_ids:
- e.release_ids.append(primary)
+ if primary_id not in e.release_ids:
+ e.release_ids.append(primary_id)
if not self.dry_run_mode:
- api.update_fileset(eg_id, e.ident, e)
+ self.api.update_fileset(eg_id, e.ident, e)
updated_entities += 1
- self.counts['updated-filesets'] += 1
+ self.counts["updated-filesets"] += 1
# webcapture redirects
for e in release.webcaptures:
e.release_ids.remove(release.ident)
- if not primary in e.release_ids:
- e.release_ids.append(primary)
+ if primary_id not in e.release_ids:
+ e.release_ids.append(primary_id)
if not self.dry_run_mode:
- api.update_webcapture(eg_id, e.ident, e)
+ self.api.update_webcapture(eg_id, e.ident, e)
updated_entities += 1
- self.counts['updated-webcaptures'] += 1
+ self.counts["updated-webcaptures"] += 1
# release redirect itself
updated_work_ids.append(release.work_id)
redirected_release_ids.append(release.ident)
if not self.dry_run_mode:
- api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary))
+ self.api.update_release(
+ eg_id, release.ident, ReleaseEntity(redirect=primary_id)
+ )
updated_entities += 1
- self.counts['updated-releases']
-
+ self.counts["updated-releases"] += 1
# lastly, clean up any merged work entities
- redirected_release_ids = set(redirected_release_ids)
+ redirected_release_ids = list(set(redirected_release_ids))
updated_work_ids = list(set(updated_work_ids))
assert primary_work_id not in updated_work_ids
for work_id in updated_work_ids:
- work_releases = api.get_work_releases(work_id)
+ work_releases = self.api.get_work_releases(work_id)
rids = set([r.ident for r in work_releases])
if rids.issubset(redirected_release_ids):
# all the releases for this work were updated/merged; we should
# redirect to primary work_id
# also check we don't have any in-flight edit conflicts
- assert not work_id in self._idents_inflight
+ assert work_id not in self._idents_inflight
self._idents_inflight.append(work_id)
if not self.dry_run_mode:
- api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id))
+ self.api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id))
updated_entities += 1
- self.counts['updated-works'] += 1
+ self.counts["updated-works"] += 1
return updated_entities
-class ReleaseGrouper(EntityMerger):
- pass
+
+def run_merge_releases(args: argparse.Namespace) -> None:
+ em = ReleaseMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run)
+ JsonLinePusher(em, args.json_file).run()
+
+
+def main() -> None:
+ """
+ Invoke like:
+
+ python3 -m fatcat_tools.mergers.releases [options]
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--host-url", default="http://localhost:9411/v0", help="connect to this host/port"
+ )
+ parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int)
+ parser.add_argument(
+ "--editgroup-description-override",
+ help="editgroup description override",
+ default=None,
+ type=str,
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="don't actually commit merges, just count what would have been",
+ )
+ parser.set_defaults(
+ auth_var="FATCAT_AUTH_API_TOKEN",
+ )
+ subparsers = parser.add_subparsers()
+
+ sub_merge_releases = subparsers.add_parser("merge-releases")
+ sub_merge_releases.set_defaults(func=run_merge_releases)
+ sub_merge_releases.add_argument(
+ "json_file",
+ help="source of merge lines to process (or stdin)",
+ default=sys.stdin,
+ type=argparse.FileType("r"),
+ )
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+
+ # allow editgroup description override via env variable (but CLI arg takes
+ # precedence)
+ if not args.editgroup_description_override and os.environ.get(
+ "FATCAT_EDITGROUP_DESCRIPTION"
+ ):
+ args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION")
+
+ args.api = authenticated_api(
+ args.host_url,
+ # token is an optional kwarg (can be empty string, None, etc)
+ token=os.environ.get(args.auth_var),
+ )
+ args.func(args)
+
+
+if __name__ == "__main__":
+ main()