From f12bde00c34abf1d4a1604a76cac033b3c4c864b Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 17 Nov 2021 12:36:00 -0800
Subject: initial file merger, with tests

---
 python/fatcat_tools/mergers/files.py | 228 +++++++++++++++++++++++++++++++++++
 python/tests/merge_files.py          | 160 ++++++++++++++++++++++++
 2 files changed, 388 insertions(+)
 create mode 100644 python/fatcat_tools/mergers/files.py
 create mode 100644 python/tests/merge_files.py

diff --git a/python/fatcat_tools/mergers/files.py b/python/fatcat_tools/mergers/files.py
new file mode 100644
index 00000000..4bc8bb81
--- /dev/null
+++ b/python/fatcat_tools/mergers/files.py
@@ -0,0 +1,228 @@
+import argparse
+import os
+import sys
+from typing import Any, Dict, List, Optional
+
+import fatcat_openapi_client
+from fatcat_openapi_client.models import FileEntity
+
+from fatcat_tools import authenticated_api
+from fatcat_tools.importers import JsonLinePusher
+
+from .common import EntityMerger
+
+
+class FileMerger(EntityMerger):
+    """
+    Combines file entities into a single primary. Merges any existing partial
+    metadata (such as release_ids and URLs). Can choose a primary if necessary.
+
+    The primary is only updated if needed.
+
+    TODO: relies on API server to detect "redirect of redirect" situation
+    """
+
+    def __init__(self, api: fatcat_openapi_client.ApiClient, **kwargs) -> None:
+
+        eg_desc = kwargs.get("editgroup_description", "Automated merge of file entities")
+        eg_extra = kwargs.get("editgroup_extra", dict())
+        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.FileMerger")
+        super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
+        self.entity_type_name = "file"
+
+    def choose_primary_file(self, entities: List[FileEntity]) -> str:
+        """
+        TODO: could incorporate number of redirected entities already pointing at an entity
+        """
+        assert entities and len(entities) >= 2
+
+        # want to sort in descending order, so reverse=True
+        entities = sorted(
+            entities,
+            key=lambda a: (
+                # has complete metadata?
+                bool(a.sha256 and a.md5 and a.sha1 and (a.size is not None)),
+                # has releases associated?
+                bool(a.release_ids),
+                # has URLs?
+                bool(a.urls),
+                # has extra metadata?
+                bool(a.extra),
+                # number of release_ids
+                len(a.release_ids or []),
+            ),
+            reverse=True,
+        )
+        return entities[0].ident
+
+    def merge_file_metadata_from(self, primary: FileEntity, other: FileEntity) -> bool:
+        """
+        Compares a primary entity to an "other" entity. If there are helpful
+        metadata fields in the other, copies them to the primary, in-place.
+
+        This is intended to extract any useful metadata from "other" before it
+        gets redirected to "primary".
+
+        Returns True if the primary was updated, False otherwise.
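+
+        A minimal sketch of the intended behavior (the merger instance and
+        all field values here are hypothetical; this is not a doctest):
+
+            merger = FileMerger(api=api)
+            primary = FileEntity(sha1="abc...")
+            other = FileEntity(sha1="abc...", md5="def...", size=123)
+            merger.merge_file_metadata_from(primary, other)  # returns True
+            # primary.md5 and primary.size now carry the values from other;
+            # sha1 is intentionally never copied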
+ """ + updated = False + # NOTE: intentionally not including sha1 here + for k in ["size", "mimetype", "sha256", "md5"]: + if not getattr(primary, k) and getattr(other, k): + setattr(primary, k, getattr(other, k)) + updated = True + + if not primary.urls: + primary.urls = [] + if not primary.release_ids: + primary.release_ids = [] + + if other.extra: + if not primary.extra: + primary.extra = other.extra + updated = True + else: + for k in other.extra.keys(): + if k not in primary.extra: + primary.extra[k] = other.extra[k] + updated = True + + for u in other.urls or []: + if u not in primary.urls: + primary.urls.append(u) + updated = True + + for i in other.release_ids or []: + if i not in primary.release_ids: + primary.release_ids.append(i) + updated = True + + return updated + + def try_merge( + self, + dupe_ids: List[str], + primary_id: Optional[str] = None, + evidence: Optional[Dict[str, Any]] = None, + ) -> int: + + # currently requires for extid validation + if not evidence or not (evidence.get("extid_type") and evidence.get("extid")): + self.counts["skip-missing-evidence"] += 1 + return 0 + + updated_entities = 0 + entities: Dict[str, FileEntity] = dict() + eg_id = self.get_editgroup_id() + + all_ids = dupe_ids.copy() + if primary_id: + all_ids.append(primary_id) + for ident in all_ids: + try: + entities[ident] = self.api.get_file(ident) + except fatcat_openapi_client.ApiException as ae: + if ae.status == 404: + self.counts["skip-entity-not-found"] += 1 + return 0 + else: + raise + if entities[ident].state != "active": + self.counts["skip-not-active-entity"] += 1 + return 0 + if getattr(entities[ident].ext_ids, evidence["extid_type"]) != evidence["extid"]: + self.counts["skip-extid-mismatch"] += 1 + return 0 + + if not primary_id: + primary_id = self.choose_primary_file(list(entities.values())) + dupe_ids = [d for d in dupe_ids if d != primary_id] + + # ensure primary is not in dupes + assert primary_id not in dupe_ids + + primary = entities[primary_id] + primary_updated = False + for other_id in dupe_ids: + other = entities[other_id] + primary_updated = self.merge_file_metadata_from(primary, other) or primary_updated + self.api.update_file( + eg_id, + other.ident, + FileEntity( + redirect=primary.ident, + edit_extra=evidence, + ), + ) + updated_entities += 1 + + if primary_updated: + self.api.update_file(eg_id, primary.ident, primary) + updated_entities += 1 + + return updated_entities + + +def run_merge_files(args: argparse.Namespace) -> None: + em = FileMerger(args.api, edit_batch_size=args.batch_size, dry_run_mode=args.dry_run) + JsonLinePusher(em, args.json_file).run() + + +def main() -> None: + """ + Invoke like: + + python3 -m fatcat_tools.mergers.files [options] + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host-url", default="http://localhost:9411/v0", help="connect to this host/port" + ) + parser.add_argument("--batch-size", help="size of batch to send", default=50, type=int) + parser.add_argument( + "--editgroup-description-override", + help="editgroup description override", + default=None, + type=str, + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="don't actually commit merges, just count what would have been", + ) + parser.set_defaults( + auth_var="FATCAT_AUTH_API_TOKEN", + ) + subparsers = parser.add_subparsers() + + sub_merge_files = subparsers.add_parser("merge-files") + sub_merge_files.set_defaults(func=run_merge_files) + sub_merge_files.add_argument( + "json_file", + help="source of merge lines to process (or 
stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override and os.environ.get( + "FATCAT_EDITGROUP_DESCRIPTION" + ): + args.editgroup_description_override = os.environ.get("FATCAT_EDITGROUP_DESCRIPTION") + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var), + ) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/python/tests/merge_files.py b/python/tests/merge_files.py new file mode 100644 index 00000000..c33c6f6c --- /dev/null +++ b/python/tests/merge_files.py @@ -0,0 +1,160 @@ +from fatcat_openapi_client import FileEntity, FileUrl +from fixtures import api + +from fatcat_tools.mergers.files import FileMerger + + +def test_choose_primary_file(api) -> None: + + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + ) + fe_nourls = FileEntity( + ident="ccccsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + ) + fe_complete = FileEntity( + ident="ddddsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + fe_pseudo_complete = FileEntity( + ident="eeeesb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + size=60719, + mimetype="application/pdf", + release_ids=["dlrxjg7mxrayxfltget7fqcrjy"], + urls=[ + FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"), + ], + extra=dict(asdf=123), + ) + + assert fm.choose_primary_file([fe_partial, fe_norelease]) == "bbbbsb5apzfhbbxxc7rgu2yw6m" + assert ( + fm.choose_primary_file([fe_partial, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_complete, fe_nourls, fe_norelease]) + == "ddddsb5apzfhbbxxc7rgu2yw6m" + ) + assert ( + fm.choose_primary_file([fe_partial, fe_pseudo_complete, fe_nourls, fe_norelease]) + == "ccccsb5apzfhbbxxc7rgu2yw6m" + ) + + +def test_merge_file_metadata_from(api) -> None: + fm = FileMerger(api=api) + fe_partial = FileEntity( + ident="aaaasb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa", + ) + fe_norelease = FileEntity( + ident="bbbbsb5apzfhbbxxc7rgu2yw6m", + sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb", + md5="d2c7318315bfc7d3aab0db933e95e632", + sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163", + 
+
+
+def test_merge_file_metadata_from(api) -> None:
+    fm = FileMerger(api=api)
+    fe_partial = FileEntity(
+        ident="aaaasb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3aaaaaa",
+    )
+    fe_norelease = FileEntity(
+        ident="bbbbsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="d2c7318315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+    )
+    fe_nourls = FileEntity(
+        ident="ccccsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="d2c7318315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+    )
+    fe_complete = FileEntity(
+        ident="ddddsb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        md5="ddddddd315bfc7d3aab0db933e95e632",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+        urls=[
+            FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"),
+        ],
+        extra=dict(asdf=123),
+    )
+    fe_pseudo_complete = FileEntity(
+        ident="eeeesb5apzfhbbxxc7rgu2yw6m",
+        sha1="b1beebb5f979121cd234c69b08e3f42af3bbbbbb",
+        sha256="528064c7664a96c79c80c423210f6f9f4fafe949dd59dfd1572a04b906d5e163",
+        size=60719,
+        mimetype="application/pdf",
+        release_ids=["dlrxjg7mxrayxfltget7fqcrjy"],
+        urls=[
+            FileUrl(rel="web", url="http://aughty.org/pdf/future_open.pdf"),
+        ],
+        extra=dict(asdf=123),
+    )
+    fe_another_release_id = FileEntity(
+        ident="fffffffapzfhbbxxc7rgu2yw6m",
+        release_ids=["qqqqqg7mxrayxfltget7fqcrjy"],
+    )
+    fe_another_url = FileEntity(
+        ident="zzzzzzzapzfhbbxxc7rgu2yw6m",
+        urls=[
+            FileUrl(rel="repository", url="http://someuni.edu/repo/file.pdf"),
+        ],
+    )
+    fe_more_extra = FileEntity(
+        ident="fffffffapzfhbbxxc7rgu2yw6m",
+        release_ids=["qqqqqg7mxrayxfltget7fqcrjy"],
+        extra=dict(thang=456),
+    )
+
+    assert fm.merge_file_metadata_from(fe_nourls, fe_partial) is False
+    assert fm.merge_file_metadata_from(fe_complete, fe_pseudo_complete) is False
+    assert fm.merge_file_metadata_from(fe_complete, fe_complete) is False
+    assert fm.merge_file_metadata_from(fe_partial, fe_norelease) is True
+    assert fe_partial.md5 == fe_norelease.md5
+    assert fe_partial.size == fe_norelease.size
+    assert fm.merge_file_metadata_from(fe_partial, fe_complete) is True
+    assert fe_partial.md5 != fe_complete.md5
+    assert fe_partial.extra == fe_complete.extra
+    assert set([(u.rel, u.url) for u in fe_partial.urls or []]) == set(
+        [(u.rel, u.url) for u in fe_complete.urls or []]
+    )
+    assert fe_partial.release_ids == fe_complete.release_ids
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is True
+    assert fe_partial.release_ids == [
+        "dlrxjg7mxrayxfltget7fqcrjy",
+        "qqqqqg7mxrayxfltget7fqcrjy",
+    ]
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_release_id) is False
+    assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is True
+    assert fe_partial.extra == dict(asdf=123, thang=456)
+    assert fm.merge_file_metadata_from(fe_partial, fe_more_extra) is False
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is True
+    assert fe_partial.urls[-1].url == "http://someuni.edu/repo/file.pdf"
+    assert fm.merge_file_metadata_from(fe_partial, fe_another_url) is False
-- 
cgit v1.2.3
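
Example invocation (a sketch; the JSON-lines path is hypothetical, the auth
token is read from FATCAT_AUTH_API_TOKEN as wired up in main() above, and
--dry-run counts the merges without committing anything):

    export FATCAT_AUTH_API_TOKEN="..."
    python3 -m fatcat_tools.mergers.files --dry-run merge-files /path/to/merge_lines.json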