aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/mergers
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-09-05 18:41:30 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-11-23 17:39:33 -0800
commit8b132461c9e9f85e61c520f2f576144a6f6e06ac (patch)
tree5df6687878c03daecf4aa02da0ab8f620e85d1b1 /python/fatcat_tools/mergers
parent802bad86a61cb0d92c83d9114844a54a83e83b94 (diff)
downloadfatcat-8b132461c9e9f85e61c520f2f576144a6f6e06ac.tar.gz
fatcat-8b132461c9e9f85e61c520f2f576144a6f6e06ac.zip
first iteration of mergers
Diffstat (limited to 'python/fatcat_tools/mergers')
-rw-r--r--python/fatcat_tools/mergers/__init__.py3
-rw-r--r--python/fatcat_tools/mergers/common.py130
-rw-r--r--python/fatcat_tools/mergers/releases.py110
3 files changed, 243 insertions, 0 deletions
diff --git a/python/fatcat_tools/mergers/__init__.py b/python/fatcat_tools/mergers/__init__.py
new file mode 100644
index 00000000..c38a397d
--- /dev/null
+++ b/python/fatcat_tools/mergers/__init__.py
@@ -0,0 +1,3 @@
+
+from .common import EntityMerger
+from .releases import ReleaseMerger, ReleaseGrouper
diff --git a/python/fatcat_tools/mergers/common.py b/python/fatcat_tools/mergers/common.py
new file mode 100644
index 00000000..62a29c42
--- /dev/null
+++ b/python/fatcat_tools/mergers/common.py
@@ -0,0 +1,130 @@
+
+"""
+Tools for merging entities in various ways.
+
+ group-releases: pull all release entities under a single work
+ => merges work entities
+ merge-releases: merge release entities together
+ => groups files/filesets/webcaptures
+ merge-containers: merge container entities
+ merge-files: merge file entities
+
+Input format is JSON lines with keys:
+
+ idents (required): array of string identifiers
+ primary (optional): single string identifier
+
+"""
+
+import subprocess
+from collections import Counter
+
+import fatcat_api_client
+from fatcat_api_client.rest import ApiException
+
+
+class EntityMerger:
+ """
+ API for individual jobs:
+
+ # record iterators sees
+ push_record(raw_record)
+ finish()
+
+ # provided helpers
+ self.api
+ self.get_editgroup_id()
+ counts({'lines', 'skip', 'merged', 'updated'})
+
+ # implemented per-task
+ try_merge(idents, primary=None) -> int (entities updated)
+
+ This class is pretty similar to EntityImporter, but isn't subclassed.
+ """
+
+ def __init__(self, api, **kwargs):
+
+ eg_extra = kwargs.get('editgroup_extra', dict())
+ eg_extra['git_rev'] = eg_extra.get('git_rev',
+ subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
+ eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityMerger')
+
+ self.api = api
+ self.dry_run_mode = kwargs.get('dry_run_mode', True)
+ self.edit_batch_size = kwargs.get('edit_batch_size', 50)
+ self.editgroup_description = kwargs.get('editgroup_description')
+ self.editgroup_extra = eg_extra
+ self.reset()
+
+ if self.dry_run_mode:
+ print("Running in dry-run mode!")
+
+ def reset(self):
+ self.counts = Counter({'lines': 0, 'skip': 0, 'merged': 0, 'updated-total': 0})
+ self._edit_count = 0
+ self._editgroup_id = None
+ self._entity_queue = []
+ self._idents_inflight = []
+
+ def push_record(self, line):
+ """
+ Intended to be called by "pusher" class (which could be pulling from
+ JSON file, Kafka, whatever).
+
+ Input is expected to be a dict-like object with key "idents", and
+ optionally "primary".
+
+ Returns nothing.
+ """
+ self.counts['lines'] += 1
+ if (not raw_record):
+ self.counts['skip'] += 1
+ return
+ primary = line.get('primary')
+ idents = list(set(line.get('idents')))
+ if primary and primary not in idents:
+ idents.append(primary)
+ if not idents or len(idents) <= 1:
+ self.counts['skip'] += 1
+ return
+ for i in idents:
+ if i in self._idents_inflight:
+ raise ValueError("Entity already part of in-process merge operation: {}".format(i))
+ self._idents.inflight.append(i)
+ count = self.try_merge(idents, primary=primary)
+ if count:
+ self.counts['merged'] += 1
+ self.counts['updated-total'] += count
+ self._edit_count += count
+ else:
+ self.counts['skip'] += 1
+ if self._edit_count >= self.edit_batch_size:
+ self.api.accept_editgroup(self._editgroup_id)
+ self._editgroup_id = None
+ self._edit_count = 0
+ self._idents_inflight = []
+ return
+
+ def try_merge(self, idents, primary=None):
+ # implementations should fill this in
+ raise NotImplementedError
+
+ def finish(self):
+ if self._edit_count > 0:
+ self.api.accept_editgroup(self._editgroup_id)
+ self._editgroup_id = None
+ self._edit_count = 0
+ self._idents_inflight = []
+
+ return self.counts
+
+ def get_editgroup_id(self):
+
+ if not self._editgroup_id:
+ eg = self.api.create_editgroup(
+ fatcat_api_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra))
+ self._editgroup_id = eg.editgroup_id
+
+ return self._editgroup_id
diff --git a/python/fatcat_tools/mergers/releases.py b/python/fatcat_tools/mergers/releases.py
new file mode 100644
index 00000000..802cb8da
--- /dev/null
+++ b/python/fatcat_tools/mergers/releases.py
@@ -0,0 +1,110 @@
+
+from .common import EntityMerger
+from fatcat_api_client.models import ReleaseEntity, WorkEntity
+
+
+class ReleaseMerger(EntityMerger):
+ """
+ Hard merges a set of release entities, redirecting all entities to a single
+ primary release.
+
+ Will also redirect works (if appropriate), and re-point {files, filesets,
+ webcaptures} to the new merged release.
+ """
+
+ def __init__(self, api, **kwargs):
+
+ eg_desc = kwargs.get('editgroup_description',
+ "Automated merge of release entities")
+ eg_extra = kwargs.get('editgroup_extra', dict())
+ eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ReleaseMerger')
+ super().__init__(api,
+ editgroup_description=eg_desc,
+ editgroup_extra=eg_extra,
+ **kwargs)
+
+ def try_merge(self, idents, primary=None):
+
+ updated_entities = 0
+ releases = dict()
+ eg_id = self.get_editgroup_id()
+
+ self.dry_run_mode
+
+ for ident in idents:
+ releases[ident] = api.get_release(ident, expand="files,filesets,webcaptures")
+
+ # select the primary (if not already set)
+ if not primary:
+ primary = releases.keys()[0]
+
+ primary_work_id = releases[primary].work_id
+ updated_work_ids = []
+ redirected_release_ids = []
+
+ # execute all the release redirects
+ for release in releases.values():
+ if release.ident == primary:
+ continue
+
+ # file redirects
+ for e in release.files:
+ e.release_ids.remove(release.ident)
+ if not primary in e.release_ids:
+ e.release_ids.append(primary)
+ if not self.dry_run_mode:
+ api.update_file(eg_id, e.ident, e)
+ updated_entities += 1
+ self.counts['updated-files'] += 1
+
+ # fileset redirects
+ for e in release.filesets:
+ e.release_ids.remove(release.ident)
+ if not primary in e.release_ids:
+ e.release_ids.append(primary)
+ if not self.dry_run_mode:
+ api.update_fileset(eg_id, e.ident, e)
+ updated_entities += 1
+ self.counts['updated-filesets'] += 1
+
+ # webcapture redirects
+ for e in release.webcaptures:
+ e.release_ids.remove(release.ident)
+ if not primary in e.release_ids:
+ e.release_ids.append(primary)
+ if not self.dry_run_mode:
+ api.update_webcapture(eg_id, e.ident, e)
+ updated_entities += 1
+ self.counts['updated-webcaptures'] += 1
+
+ # release redirect itself
+ updated_work_ids.append(release.work_id)
+ redirected_release_ids.append(release.ident)
+ if not self.dry_run_mode:
+ api.update_release(eg_id, release.ident, ReleaseEntity(redirect=primary))
+ updated_entities += 1
+ self.counts['updated-releases']
+
+
+ # lastly, clean up any merged work entities
+ redirected_release_ids = set(redirected_release_ids)
+ updated_work_ids = list(set(updated_work_ids))
+ assert primary_work_id not in updated_work_ids
+ for work_id in updated_work_ids:
+ work_releases = api.get_work_releases(work_id)
+ rids = set([r.ident for r in work_releases])
+ if rids.issubset(redirected_release_ids):
+ # all the releases for this work were updated/merged; we should
+ # redirect to primary work_id
+ # also check we don't have any in-flight edit conflicts
+ assert not work_id in self._idents_inflight
+ self._idents_inflight.append(work_id)
+ if not self.dry_run_mode:
+ api.update_work(eg_id, work_id, WorkEntity(redirect=primary_work_id))
+ updated_entities += 1
+ self.counts['updated-works'] += 1
+
+ return updated_entities
+
+class ReleaseGrouper(EntityMerger):
+ pass