From 75baf7d423a2cb119bd485672a00fd664e32537c Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 11 Oct 2021 16:51:06 -0700 Subject: initial implementation of fileset ingest importers --- python/fatcat_tools/importers/__init__.py | 2 +- python/fatcat_tools/importers/ingest.py | 225 +++++++++++++++++++++++++++++- 2 files changed, 224 insertions(+), 3 deletions(-) (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 5da669e1..a2224081 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -27,7 +27,7 @@ from .orcid import OrcidImporter from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE from .wayback_static import auto_wayback_static from .cdl_dash_dat import auto_cdl_dash_dat -from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter +from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter, IngestFilesetResultImporter, SavePaperNowFilesetImporter from .shadow import ShadowLibraryImporter from .file_meta import FileMetaImporter from .doaj_article import DoajArticleImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index bc759219..38639297 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -336,7 +336,7 @@ class SavePaperNowFileImporter(IngestFileResultImporter): eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled after a public 'Save Paper Now' request" eg_extra = kwargs.pop('editgroup_extra', dict()) - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowFileImporter') kwargs['submit_mode'] = submit_mode kwargs['require_grobid'] = False kwargs['do_updates'] = False @@ -533,7 +533,7 @@ class SavePaperNowWebImporter(IngestWebResultImporter): eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled after a public 'Save Paper Now' request" eg_extra = kwargs.pop('editgroup_extra', dict()) - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebSavePaperNow') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowWebImporter') kwargs['submit_mode'] = submit_mode kwargs['do_updates'] = False super().__init__(api, @@ -573,3 +573,224 @@ class SavePaperNowWebImporter(IngestWebResultImporter): return False return True + + +class IngestFilesetResultImporter(IngestFileResultImporter): + """ + Variant of IngestFileResultImporter for processing, eg, dataset ingest + results into fileset objects. + """ + + def __init__(self, api, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', None) or "Filesets crawled from web using sandcrawler ingest tool" + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFilesetResultImporter') + kwargs['do_updates'] = False + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + self.max_file_count = 300 + + def want_fileset(self, row): + + if not row.get('manifest') or len(row.get('manifest')) == 0: + self.counts['skip-empty-manifest'] += 1 + return False + + if len(row.get('manifest')) > self.max_file_count: + self.counts['skip-too-many-files'] += 1 + return False + + return True + + def want(self, row): + + if not self.want_ingest(row): + return False + + # fileset-specific filters + if row['request'].get('ingest_type') not in ['dataset',]: + self.counts['skip-ingest-type'] += 1 + return False + + if not self.want_fileset(row): + return False + + return True + + def parse_fileset_urls(self, row): + # XXX: create URLs and rel for dataset ingest + if not row.get('strategy'): + return [] + if row['strategy'].startswith('archiveorg') and row.get('archiveorg_item_name'): + return [ + fatcat_openapi_client.FilesetUrl( + url=f"https://archive.org/download/{row['archiveorg_item_name']}", + rel="archive", + ) + ] + elif row['strategy'].startswith('web') and row.get('web_base_url'): + return [ + fatcat_openapi_client.FilesetUrl( + url=f"https://web.archive.org/web/{row['web_base_url_dt']}/{row['web_base_url']}", + rel="webarchive", + ) + ] + elif row['strategy'] == 'web-file-bundle' and row.get('web_bundle_url'): + return [ + fatcat_openapi_client.FilesetUrl( + url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}", + rel="webarchive", + ) + ] + else: + return [] + + def parse_record(self, row): + + request = row['request'] + + # double check that want() filtered request correctly + if request.get('ingest_type') not in ["dataset",]: + self.counts['skip-ingest-type'] += 1 + return None + + # identify release by fatcat ident, or extid lookup + release_ident = self.parse_ingest_release_ident(row) + + if not release_ident: + self.counts['skip-release-not-found'] += 1 + return None + + entity_extra = dict() + edit_extra = self.parse_edit_extra(row) + edit_extra['ingest_strategy'] = row['ingest_strategy'] + if row.get('platform'): + edit_extra['platform'] = row['platform'] + if row.get('platform_id'): + edit_extra['platform_id'] = row['platform_id'] + + entity_urls = self.parse_fileset_urls(row) + if not entity_urls: + self.counts['skip-no-access-url'] += 1 + return None + + assert row['file_count'] == len(row['manifest']) + if row['file_count'] > self.max_file_count: + self.counts['skip-too-many-manifest-files'] += 1 + return None + + manifest = [] + for ingest_file in row['manifest']: + fsf = fatcat_openapi_client.FilesetFile( + path=ingest_file['path'], + size=ingest_file['size'], + md5=ingest_file['md5'], + sha1=ingest_file['sha1'], + sha256=ingest_file.get('sha256'), + extra=dict( + mimetype=ingest_file['mimetype'], + ), + ) + if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size): + self.counts['skip-partial-file-info'] += 1 + return None + if ingest_file.get('platform_url'): + # XXX: should we include this? + fsf.extra['original_url'] = ingest_file['platform_url'] + if ingest_file.get('terminal_url') and ingest_file.get('terminal_dt'): + fsf.extra['wayback_url'] = f"https://web.archive.org/web/{ingest_file['terminal_dt']}/{ingest_file['terminal_url']}" + manifest.append(fsf) + + fe = fatcat_openapi_client.FilesetEntity( + manifest=manifest, + urls=entity_urls, + release_ids=[release_ident], + ) + + if entity_extra: + fe.extra = entity_extra + if edit_extra: + fe.edit_extra = edit_extra + return fe + + def try_update(self, wc): + + # check for existing edits-in-progress with same URL + for other in self._entity_queue: + # XXX: how to duplicate check? + if other.original_url == wc.original_url: + self.counts['skip-in-queue'] += 1 + return False + + # lookup sha1, or create new entity (TODO: API doesn't support this yet) + #existing = None + + # NOTE: in lieu of existing checks (by lookup), only allow one fileset per release + release = self.api.get_release(wc.release_ids[0], expand="filesets") + if release.filesets: + # XXX: how to duplicate check filesets? + # check if this is an existing match, or just a similar hit + for other in release.filesets: + if wc.original_url == other.original_url: + # TODO: compare very similar timestamps of same time (different formats) + self.counts['exists'] += 1 + return False + self.counts['skip-release-has-fileset'] += 1 + return False + + return True + + def insert_batch(self, batch): + if self.submit_mode: + eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) + for fe in batch: + self.api.create_fileset(eg.editgroup_id, fe) + self.api.update_editgroup(eg.editgroup_id, eg, submit=True) + else: + self.api.create_fileset_auto_batch(fatcat_openapi_client.FilesetAutoBatch( + editgroup=fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra), + entity_list=batch)) + + +class SavePaperNowFilesetImporter(IngestFilesetResultImporter): + """ + Like SavePaperNowFileImporter, but for fileset/dataset ingest. + """ + + def __init__(self, api, submit_mode=True, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', None) or "Fileset crawled after a public 'Save Paper Now' request" + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowFilesetImporter') + kwargs['submit_mode'] = submit_mode + kwargs['do_updates'] = False + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def want(self, row): + + source = row['request'].get('ingest_request_source') + if not source: + self.counts['skip-ingest_request_source'] += 1 + return False + if not source.startswith('savepapernow'): + self.counts['skip-not-savepapernow'] += 1 + return False + + if row.get('hit') != True: + self.counts['skip-hit'] += 1 + return False + + if not self.want_fileset(row): + return False + + return True -- cgit v1.2.3