From 76db7f4048116a23c82bdd70bb11dd004e347e8e Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 8 Oct 2019 15:56:53 -0700 Subject: new cleanup python tool/framework --- python/fatcat_cleanup.py | 59 +++++++++++++ python/fatcat_tools/cleanups/NOTES.txt | 24 ++++++ python/fatcat_tools/cleanups/__init__.py | 3 + python/fatcat_tools/cleanups/common.py | 140 +++++++++++++++++++++++++++++++ python/fatcat_tools/cleanups/files.py | 74 ++++++++++++++++ 5 files changed, 300 insertions(+) create mode 100755 python/fatcat_cleanup.py create mode 100644 python/fatcat_tools/cleanups/NOTES.txt create mode 100644 python/fatcat_tools/cleanups/__init__.py create mode 100644 python/fatcat_tools/cleanups/common.py create mode 100644 python/fatcat_tools/cleanups/files.py (limited to 'python') diff --git a/python/fatcat_cleanup.py b/python/fatcat_cleanup.py new file mode 100755 index 00000000..42887299 --- /dev/null +++ b/python/fatcat_cleanup.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 + +import os, sys, argparse +from fatcat_tools import authenticated_api +from fatcat_tools.importers import JsonLinePusher +from fatcat_tools.cleanups import * + + +def run_files(args): + fmi = FileCleaner(args.api, + dry_run_mode=args.dry_run, + edit_batch_size=args.batch_size, + editgroup_description=args.editgroup_description_override) + JsonLinePusher(fmi, args.json_file).run() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--host-url', + default="http://localhost:9411/v0", + help="connect to this host/port") + parser.add_argument('--batch-size', + help="size of batch to send", + default=50, type=int) + parser.add_argument('--editgroup-description-override', + help="editgroup description override", + default=None, type=str) + parser.add_argument('--dry-run', + help="dry-run mode (don't actually update)", + default=False, type=bool) + subparsers = parser.add_subparsers() + + sub_files = subparsers.add_parser('files') + sub_files.set_defaults( + func=run_files, + auth_var="FATCAT_AUTH_WORKER_CLEANUP", + ) + sub_files.add_argument('json_file', + help="files JSON file to import from", + default=sys.stdin, type=argparse.FileType('r')) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + # allow editgroup description override via env variable (but CLI arg takes + # precedence) + if not args.editgroup_description_override \ + and os.environ.get('FATCAT_EDITGROUP_DESCRIPTION'): + args.editgroup_description_override = os.environ.get('FATCAT_EDITGROUP_DESCRIPTION') + + args.api = authenticated_api( + args.host_url, + # token is an optional kwarg (can be empty string, None, etc) + token=os.environ.get(args.auth_var)) + args.func(args) + +if __name__ == '__main__': + main() diff --git a/python/fatcat_tools/cleanups/NOTES.txt b/python/fatcat_tools/cleanups/NOTES.txt new file mode 100644 index 00000000..cdaed6b1 --- /dev/null +++ b/python/fatcat_tools/cleanups/NOTES.txt @@ -0,0 +1,24 @@ + +design is to iterate over JSON list of full entities. perform transforms/fixes. +if no changes, bail early. if changes, do a request to check that current rev +of entity is same as processed, to prevent race conditions; if a match, do +update (in import/merge batch style). + +should pre-filter entities piped in. also have a CLI mode to do a single +entity; check+update code should be distinct from fix code. + +releases +- extra.subtitle => subtitle +- has pmid, type is journal-article, title like "Retraction:" => type is retraction +- similar to above, title like "Retracted:" => status is retracted +- longtail release year is bogus (like > 2030?) => remove release year + +files +- URL has ://archive.org/ link with rel=repository => rel=archive +- URL has ://web.archive.org/web/None/ link => delete URL +- URL has short wayback date ("2017") and another url with that as prefix => delete URL +- mimetype is bogus like (???) => clean mimetype + +container +- extra.issnp = "NA" => delete key + => in general, issne or issnp not valid ISSNs -> delete key diff --git a/python/fatcat_tools/cleanups/__init__.py b/python/fatcat_tools/cleanups/__init__.py new file mode 100644 index 00000000..587c7b9b --- /dev/null +++ b/python/fatcat_tools/cleanups/__init__.py @@ -0,0 +1,3 @@ + +from .common import EntityCleaner +from .files import FileCleaner diff --git a/python/fatcat_tools/cleanups/common.py b/python/fatcat_tools/cleanups/common.py new file mode 100644 index 00000000..ad2ff858 --- /dev/null +++ b/python/fatcat_tools/cleanups/common.py @@ -0,0 +1,140 @@ + +import json +import copy +import subprocess +from collections import Counter + +from fatcat_openapi_client import ApiClient, Editgroup +from fatcat_openapi_client.rest import ApiException +from fatcat_tools.transforms import entity_from_dict, entity_to_dict + + +class EntityCleaner: + """ + API for individual jobs: + + # record iterators sees + push_record(record) + finish() + + # provided helpers + self.api + self.get_editgroup_id() + counts({'lines', 'skip', 'merged', 'updated'}) + + # implemented per-task + try_merge(idents, primary=None) -> int (entities updated) + + This class is pretty similar to EntityImporter, but isn't subclassed. + """ + + def __init__(self, api, entity_type, **kwargs): + + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['git_rev'] = eg_extra.get('git_rev', + subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityCleaner') + + self.api = api + self.entity_type = entity_type + self.dry_run_mode = kwargs.get('dry_run_mode', True) + self.edit_batch_size = kwargs.get('edit_batch_size', 50) + self.editgroup_description = kwargs.get('editgroup_description') + self.editgroup_extra = eg_extra + self.reset() + self.ac = ApiClient() + + if self.dry_run_mode: + print("Running in dry-run mode!") + + def reset(self): + self.counts = Counter({'lines': 0, 'cleaned': 0, 'updated': 0}) + self._edit_count = 0 + self._editgroup_id = None + self._entity_queue = [] + self._idents_inflight = [] + + def push_record(self, record): + """ + Intended to be called by "pusher" class (which could be pulling from + JSON file, Kafka, whatever). + + Input is expected to be an entity in JSON-like dict form. + + Returns nothing. + """ + self.counts['lines'] += 1 + if (not record): + self.counts['skip-null'] += 1 + return + + entity = entity_from_dict(record, self.entity_type, api_client=self.ac) + + if entity.state != 'active': + self.counts['skip-inactive'] += 1 + return + + cleaned = self.clean_entity(copy.deepcopy(entity)) + if entity == cleaned: + self.counts['skip-clean'] += 1 + return + else: + self.counts['cleaned'] += 1 + + if self.dry_run_mode: + entity_dict = entity_to_dict(entity, api_client=self.ac) + print(json.dumps(entity_dict)) + return + + if entity.ident in self._idents_inflight: + raise ValueError("Entity already part of in-process update: {}".format(entity.ident)) + + updated = self.try_update(cleaned) + if updated: + self.counts['updated'] += updated + self._edit_count += updated + self._idents.inflight.append(entity.ident) + + if self._edit_count >= self.edit_batch_size: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + return + + def clean_entity(self, entity): + """ + Mutates entity in-place and returns it + """ + # implementations should fill this in + raise NotImplementedError + + def try_update(self, entity): + """ + Returns edit count (number of entities updated). + + If >= 1, does not need to update self.counts. If no entities updated, + do need to update counts internally. + """ + # implementations should fill this in + raise NotImplementedError + + def finish(self): + if self._edit_count > 0: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + self._idents_inflight = [] + + return self.counts + + def get_editgroup_id(self): + + if not self._editgroup_id: + eg = self.api.create_editgroup( + Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) + self._editgroup_id = eg.editgroup_id + + return self._editgroup_id diff --git a/python/fatcat_tools/cleanups/files.py b/python/fatcat_tools/cleanups/files.py new file mode 100644 index 00000000..c2733ba0 --- /dev/null +++ b/python/fatcat_tools/cleanups/files.py @@ -0,0 +1,74 @@ + +from fatcat_openapi_client.rest import ApiException +from fatcat_openapi_client.models import FileEntity +from fatcat_tools.transforms import entity_to_dict, entity_from_json + +from .common import EntityCleaner + + +class FileCleaner(EntityCleaner): + """ + File fixups! + """ + + def __init__(self, api, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', + "Automated cleanup of file entities (eg, remove bad URLs)") + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.FileCleaner') + super().__init__(api, + entity_type=FileEntity, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def clean_entity(self, entity): + """ + TODO: mimetype is bogus like (???) => clean mimetype + """ + + # URL has ://web.archive.org/web/None/ link => delete URL + entity.urls = [u for u in entity.urls if not '://web.archive.org/web/None/' in u.url] + + # URL has ://archive.org/ link with rel=repository => rel=archive + for u in entity.urls: + if '://archive.org/' in u.url and u.rel == 'repository': + u.rel = 'archive' + + # URL has short wayback date ("2017") and another url with that as prefix => delete URL + stub_wayback_urls = [] + full_wayback_urls = [] + for u in entity.urls: + if '://web.archive.org/web/' in u.url: + if len(u.url.split('/')[4]) <= 8: + stub_wayback_urls.append(u.url) + else: + full_wayback_urls.append('/'.join(u.url.split('/')[5:])) + for stub in stub_wayback_urls: + target = '/'.join(stub.split('/')[5:]) + if target in full_wayback_urls: + entity.urls = [u for u in entity.urls if u.url != stub] + + return entity + + def try_update(self, entity): + + try: + existing = self.api.get_file(entity.ident) + except ApiException as err: + if err.status != 404: + raise err + self.counts['skip-not-found'] += 1 + return 0 + + if existing.state != 'active': + self.counts['skip-existing-inactive'] += 1 + return 0 + if existing.revision != entity.revision: + self.counts['skip-revision'] += 1 + return 0 + + self.update_file(self.get_editgroup_id(), entity.ident, entity) + return 1 + -- cgit v1.2.3