diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-10-11 16:51:06 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-10-14 18:11:12 -0700 |
commit | 75baf7d423a2cb119bd485672a00fd664e32537c (patch) | |
tree | 8655162548fb4c8befc580ef006b38bda4899167 | |
parent | c0c9d4da83b027b081eab364bfc7b807dbe9a2e5 (diff) | |
download | fatcat-75baf7d423a2cb119bd485672a00fd664e32537c.tar.gz fatcat-75baf7d423a2cb119bd485672a00fd664e32537c.zip |
initial implementation of fileset ingest importers
-rwxr-xr-x | python/fatcat_import.py | 74 | ||||
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/importers/ingest.py | 225 |
3 files changed, 298 insertions, 3 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 6f331aaa..41a51ad4 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -164,6 +164,27 @@ def run_ingest_web(args): else: JsonLinePusher(iwri, args.json_file).run() +def run_ingest_fileset(args): + ifri = IngestFilesetResultImporter(args.api, + editgroup_description=args.editgroup_description_override, + skip_source_allowlist=args.skip_source_allowlist, + do_updates=args.do_updates, + default_link_rel=args.default_link_rel, + edit_batch_size=args.batch_size) + if args.kafka_mode: + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-fileset-results", + "fatcat-{}-ingest-fileset-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + force_flush=True, + ).run() + else: + JsonLinePusher(ifri, args.json_file).run() + def run_savepapernow_file(args): ifri = SavePaperNowFileImporter(args.api, editgroup_description=args.editgroup_description_override, @@ -200,6 +221,24 @@ def run_savepapernow_web(args): else: JsonLinePusher(ifri, args.json_file).run() +def run_savepapernow_fileset(args): + ifri = SavePaperNowFilesetImporter(args.api, + editgroup_description=args.editgroup_description_override, + edit_batch_size=args.batch_size) + if args.kafka_mode: + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-savepapernow-fileset-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + force_flush=True, + ).run() + else: + JsonLinePusher(ifri, args.json_file).run() + def run_grobid_metadata(args): fmi = GrobidMetadataImporter(args.api, edit_batch_size=args.batch_size, @@ -569,6 +608,28 @@ def main(): default="web", help="default URL rel for matches (eg, 'publisher', 'web')") + sub_ingest_fileset = subparsers.add_parser('ingest-fileset-results', + help="add/update fileset entities linked to releases based on sandcrawler ingest results") + sub_ingest_fileset.set_defaults( + func=run_ingest_fileset, + auth_var="FATCAT_AUTH_WORKER_CRAWL", + ) + sub_ingest_fileset.add_argument('json_file', + help="ingest_fileset JSON file to import from", + default=sys.stdin, type=argparse.FileType('r')) + sub_ingest_fileset.add_argument('--skip-source-allowlist', + action='store_true', + help="don't filter import based on request source allowlist") + sub_ingest_fileset.add_argument('--kafka-mode', + action='store_true', + help="consume from kafka topic (not stdin)") + sub_ingest_fileset.add_argument('--do-updates', + action='store_true', + help="update pre-existing fileset entities if new match (instead of skipping)") + sub_ingest_fileset.add_argument('--default-link-rel', + default="fileset", + help="default URL rel for matches (eg, 'publisher', 'web')") + sub_savepapernow_file = subparsers.add_parser('savepapernow-file-results', help="add file entities crawled due to async Save Paper Now request") sub_savepapernow_file.set_defaults( @@ -595,6 +656,19 @@ def main(): action='store_true', help="consume from kafka topic (not stdin)") + sub_savepapernow_fileset = subparsers.add_parser('savepapernow-fileset-results', + help="add fileset entities crawled due to async Save Paper Now request") + sub_savepapernow_fileset.set_defaults( + func=run_savepapernow_fileset, + auth_var="FATCAT_AUTH_WORKER_SAVEPAPERNOW", + ) + sub_savepapernow_fileset.add_argument('json_file', + help="ingest-file JSON file to import from", + default=sys.stdin, type=argparse.FileType('r')) + sub_savepapernow_fileset.add_argument('--kafka-mode', + action='store_true', + help="consume from kafka topic (not stdin)") + sub_grobid_metadata = subparsers.add_parser('grobid-metadata', help="create release and file entities based on GROBID PDF metadata extraction") sub_grobid_metadata.set_defaults( diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 5da669e1..a2224081 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -27,7 +27,7 @@ from .orcid import OrcidImporter from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE from .wayback_static import auto_wayback_static from .cdl_dash_dat import auto_cdl_dash_dat -from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter +from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter, IngestFilesetResultImporter, SavePaperNowFilesetImporter from .shadow import ShadowLibraryImporter from .file_meta import FileMetaImporter from .doaj_article import DoajArticleImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index bc759219..38639297 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -336,7 +336,7 @@ class SavePaperNowFileImporter(IngestFileResultImporter): eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled after a public 'Save Paper Now' request" eg_extra = kwargs.pop('editgroup_extra', dict()) - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowFileImporter') kwargs['submit_mode'] = submit_mode kwargs['require_grobid'] = False kwargs['do_updates'] = False @@ -533,7 +533,7 @@ class SavePaperNowWebImporter(IngestWebResultImporter): eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled after a public 'Save Paper Now' request" eg_extra = kwargs.pop('editgroup_extra', dict()) - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebSavePaperNow') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowWebImporter') kwargs['submit_mode'] = submit_mode kwargs['do_updates'] = False super().__init__(api, @@ -573,3 +573,224 @@ class SavePaperNowWebImporter(IngestWebResultImporter): return False return True + + +class IngestFilesetResultImporter(IngestFileResultImporter): + """ + Variant of IngestFileResultImporter for processing, eg, dataset ingest + results into fileset objects. + """ + + def __init__(self, api, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', None) or "Filesets crawled from web using sandcrawler ingest tool" + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFilesetResultImporter') + kwargs['do_updates'] = False + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + self.max_file_count = 300 + + def want_fileset(self, row): + + if not row.get('manifest') or len(row.get('manifest')) == 0: + self.counts['skip-empty-manifest'] += 1 + return False + + if len(row.get('manifest')) > self.max_file_count: + self.counts['skip-too-many-files'] += 1 + return False + + return True + + def want(self, row): + + if not self.want_ingest(row): + return False + + # fileset-specific filters + if row['request'].get('ingest_type') not in ['dataset',]: + self.counts['skip-ingest-type'] += 1 + return False + + if not self.want_fileset(row): + return False + + return True + + def parse_fileset_urls(self, row): + # XXX: create URLs and rel for dataset ingest + if not row.get('strategy'): + return [] + if row['strategy'].startswith('archiveorg') and row.get('archiveorg_item_name'): + return [ + fatcat_openapi_client.FilesetUrl( + url=f"https://archive.org/download/{row['archiveorg_item_name']}", + rel="archive", + ) + ] + elif row['strategy'].startswith('web') and row.get('web_base_url'): + return [ + fatcat_openapi_client.FilesetUrl( + url=f"https://web.archive.org/web/{row['web_base_url_dt']}/{row['web_base_url']}", + rel="webarchive", + ) + ] + elif row['strategy'] == 'web-file-bundle' and row.get('web_bundle_url'): + return [ + fatcat_openapi_client.FilesetUrl( + url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}", + rel="webarchive", + ) + ] + else: + return [] + + def parse_record(self, row): + + request = row['request'] + + # double check that want() filtered request correctly + if request.get('ingest_type') not in ["dataset",]: + self.counts['skip-ingest-type'] += 1 + return None + + # identify release by fatcat ident, or extid lookup + release_ident = self.parse_ingest_release_ident(row) + + if not release_ident: + self.counts['skip-release-not-found'] += 1 + return None + + entity_extra = dict() + edit_extra = self.parse_edit_extra(row) + edit_extra['ingest_strategy'] = row['ingest_strategy'] + if row.get('platform'): + edit_extra['platform'] = row['platform'] + if row.get('platform_id'): + edit_extra['platform_id'] = row['platform_id'] + + entity_urls = self.parse_fileset_urls(row) + if not entity_urls: + self.counts['skip-no-access-url'] += 1 + return None + + assert row['file_count'] == len(row['manifest']) + if row['file_count'] > self.max_file_count: + self.counts['skip-too-many-manifest-files'] += 1 + return None + + manifest = [] + for ingest_file in row['manifest']: + fsf = fatcat_openapi_client.FilesetFile( + path=ingest_file['path'], + size=ingest_file['size'], + md5=ingest_file['md5'], + sha1=ingest_file['sha1'], + sha256=ingest_file.get('sha256'), + extra=dict( + mimetype=ingest_file['mimetype'], + ), + ) + if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size): + self.counts['skip-partial-file-info'] += 1 + return None + if ingest_file.get('platform_url'): + # XXX: should we include this? + fsf.extra['original_url'] = ingest_file['platform_url'] + if ingest_file.get('terminal_url') and ingest_file.get('terminal_dt'): + fsf.extra['wayback_url'] = f"https://web.archive.org/web/{ingest_file['terminal_dt']}/{ingest_file['terminal_url']}" + manifest.append(fsf) + + fe = fatcat_openapi_client.FilesetEntity( + manifest=manifest, + urls=entity_urls, + release_ids=[release_ident], + ) + + if entity_extra: + fe.extra = entity_extra + if edit_extra: + fe.edit_extra = edit_extra + return fe + + def try_update(self, wc): + + # check for existing edits-in-progress with same URL + for other in self._entity_queue: + # XXX: how to duplicate check? + if other.original_url == wc.original_url: + self.counts['skip-in-queue'] += 1 + return False + + # lookup sha1, or create new entity (TODO: API doesn't support this yet) + #existing = None + + # NOTE: in lieu of existing checks (by lookup), only allow one fileset per release + release = self.api.get_release(wc.release_ids[0], expand="filesets") + if release.filesets: + # XXX: how to duplicate check filesets? + # check if this is an existing match, or just a similar hit + for other in release.filesets: + if wc.original_url == other.original_url: + # TODO: compare very similar timestamps of same time (different formats) + self.counts['exists'] += 1 + return False + self.counts['skip-release-has-fileset'] += 1 + return False + + return True + + def insert_batch(self, batch): + if self.submit_mode: + eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) + for fe in batch: + self.api.create_fileset(eg.editgroup_id, fe) + self.api.update_editgroup(eg.editgroup_id, eg, submit=True) + else: + self.api.create_fileset_auto_batch(fatcat_openapi_client.FilesetAutoBatch( + editgroup=fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra), + entity_list=batch)) + + +class SavePaperNowFilesetImporter(IngestFilesetResultImporter): + """ + Like SavePaperNowFileImporter, but for fileset/dataset ingest. + """ + + def __init__(self, api, submit_mode=True, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', None) or "Fileset crawled after a public 'Save Paper Now' request" + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowFilesetImporter') + kwargs['submit_mode'] = submit_mode + kwargs['do_updates'] = False + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def want(self, row): + + source = row['request'].get('ingest_request_source') + if not source: + self.counts['skip-ingest_request_source'] += 1 + return False + if not source.startswith('savepapernow'): + self.counts['skip-not-savepapernow'] += 1 + return False + + if row.get('hit') != True: + self.counts['skip-hit'] += 1 + return False + + if not self.want_fileset(row): + return False + + return True |