diff options
| -rwxr-xr-x | python/fatcat_import.py | 74 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/ingest.py | 225 | 
3 files changed, 298 insertions, 3 deletions
| diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 6f331aaa..41a51ad4 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -164,6 +164,27 @@ def run_ingest_web(args):      else:          JsonLinePusher(iwri, args.json_file).run() +def run_ingest_fileset(args): +    ifri = IngestFilesetResultImporter(args.api, +        editgroup_description=args.editgroup_description_override, +        skip_source_allowlist=args.skip_source_allowlist, +        do_updates=args.do_updates, +        default_link_rel=args.default_link_rel, +        edit_batch_size=args.batch_size) +    if args.kafka_mode: +        KafkaJsonPusher( +            ifri, +            args.kafka_hosts, +            args.kafka_env, +            "ingest-fileset-results", +            "fatcat-{}-ingest-fileset-result".format(args.kafka_env), +            kafka_namespace="sandcrawler", +            consume_batch_size=args.batch_size, +            force_flush=True, +        ).run() +    else: +        JsonLinePusher(ifri, args.json_file).run() +  def run_savepapernow_file(args):      ifri = SavePaperNowFileImporter(args.api,          editgroup_description=args.editgroup_description_override, @@ -200,6 +221,24 @@ def run_savepapernow_web(args):      else:          JsonLinePusher(ifri, args.json_file).run() +def run_savepapernow_fileset(args): +    ifri = SavePaperNowFilesetImporter(args.api, +        editgroup_description=args.editgroup_description_override, +        edit_batch_size=args.batch_size) +    if args.kafka_mode: +        KafkaJsonPusher( +            ifri, +            args.kafka_hosts, +            args.kafka_env, +            "ingest-file-results", +            "fatcat-{}-savepapernow-fileset-result".format(args.kafka_env), +            kafka_namespace="sandcrawler", +            consume_batch_size=args.batch_size, +            force_flush=True, +        ).run() +    else: +        JsonLinePusher(ifri, args.json_file).run() +  def run_grobid_metadata(args):      fmi = GrobidMetadataImporter(args.api,          edit_batch_size=args.batch_size, @@ -569,6 +608,28 @@ def main():          default="web",          help="default URL rel for matches (eg, 'publisher', 'web')") +    sub_ingest_fileset = subparsers.add_parser('ingest-fileset-results', +        help="add/update fileset entities linked to releases based on sandcrawler ingest results") +    sub_ingest_fileset.set_defaults( +        func=run_ingest_fileset, +        auth_var="FATCAT_AUTH_WORKER_CRAWL", +    ) +    sub_ingest_fileset.add_argument('json_file', +        help="ingest_fileset JSON file to import from", +        default=sys.stdin, type=argparse.FileType('r')) +    sub_ingest_fileset.add_argument('--skip-source-allowlist', +        action='store_true', +        help="don't filter import based on request source allowlist") +    sub_ingest_fileset.add_argument('--kafka-mode', +        action='store_true', +        help="consume from kafka topic (not stdin)") +    sub_ingest_fileset.add_argument('--do-updates', +        action='store_true', +        help="update pre-existing fileset entities if new match (instead of skipping)") +    sub_ingest_fileset.add_argument('--default-link-rel', +        default="fileset", +        help="default URL rel for matches (eg, 'publisher', 'web')") +      sub_savepapernow_file = subparsers.add_parser('savepapernow-file-results',          help="add file entities crawled due to async Save Paper Now request")      sub_savepapernow_file.set_defaults( @@ -595,6 +656,19 @@ def main():          action='store_true',          help="consume from kafka topic (not stdin)") +    sub_savepapernow_fileset = subparsers.add_parser('savepapernow-fileset-results', +        help="add fileset entities crawled due to async Save Paper Now request") +    sub_savepapernow_fileset.set_defaults( +        func=run_savepapernow_fileset, +        auth_var="FATCAT_AUTH_WORKER_SAVEPAPERNOW", +    ) +    sub_savepapernow_fileset.add_argument('json_file', +        help="ingest-file JSON file to import from", +        default=sys.stdin, type=argparse.FileType('r')) +    sub_savepapernow_fileset.add_argument('--kafka-mode', +        action='store_true', +        help="consume from kafka topic (not stdin)") +      sub_grobid_metadata = subparsers.add_parser('grobid-metadata',          help="create release and file entities based on GROBID PDF metadata extraction")      sub_grobid_metadata.set_defaults( diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 5da669e1..a2224081 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -27,7 +27,7 @@ from .orcid import OrcidImporter  from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE  from .wayback_static import auto_wayback_static  from .cdl_dash_dat import auto_cdl_dash_dat -from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter +from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter, IngestFilesetResultImporter, SavePaperNowFilesetImporter  from .shadow import ShadowLibraryImporter  from .file_meta import FileMetaImporter  from .doaj_article import DoajArticleImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index bc759219..38639297 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -336,7 +336,7 @@ class SavePaperNowFileImporter(IngestFileResultImporter):          eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled after a public 'Save Paper Now' request"          eg_extra = kwargs.pop('editgroup_extra', dict()) -        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow') +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowFileImporter')          kwargs['submit_mode'] = submit_mode          kwargs['require_grobid'] = False          kwargs['do_updates'] = False @@ -533,7 +533,7 @@ class SavePaperNowWebImporter(IngestWebResultImporter):          eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled after a public 'Save Paper Now' request"          eg_extra = kwargs.pop('editgroup_extra', dict()) -        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebSavePaperNow') +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowWebImporter')          kwargs['submit_mode'] = submit_mode          kwargs['do_updates'] = False          super().__init__(api, @@ -573,3 +573,224 @@ class SavePaperNowWebImporter(IngestWebResultImporter):              return False          return True + + +class IngestFilesetResultImporter(IngestFileResultImporter): +    """ +    Variant of IngestFileResultImporter for processing, eg, dataset ingest +    results into fileset objects. +    """ + +    def __init__(self, api, **kwargs): + +        eg_desc = kwargs.pop('editgroup_description', None) or "Filesets crawled from web using sandcrawler ingest tool" +        eg_extra = kwargs.pop('editgroup_extra', dict()) +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFilesetResultImporter') +        kwargs['do_updates'] = False +        super().__init__(api, +            editgroup_description=eg_desc, +            editgroup_extra=eg_extra, +            **kwargs) +        self.max_file_count = 300 + +    def want_fileset(self, row): + +        if not row.get('manifest') or len(row.get('manifest')) == 0: +            self.counts['skip-empty-manifest'] += 1 +            return False + +        if len(row.get('manifest')) > self.max_file_count: +            self.counts['skip-too-many-files'] += 1 +            return False + +        return True + +    def want(self, row): + +        if not self.want_ingest(row): +            return False + +        # fileset-specific filters +        if row['request'].get('ingest_type') not in ['dataset',]: +            self.counts['skip-ingest-type'] += 1 +            return False + +        if not self.want_fileset(row): +            return False + +        return True + +    def parse_fileset_urls(self, row): +        # XXX: create URLs and rel for dataset ingest +        if not row.get('strategy'): +            return [] +        if row['strategy'].startswith('archiveorg') and row.get('archiveorg_item_name'): +            return [ +                fatcat_openapi_client.FilesetUrl( +                    url=f"https://archive.org/download/{row['archiveorg_item_name']}", +                    rel="archive", +                ) +            ] +        elif row['strategy'].startswith('web') and row.get('web_base_url'): +            return [ +                fatcat_openapi_client.FilesetUrl( +                    url=f"https://web.archive.org/web/{row['web_base_url_dt']}/{row['web_base_url']}", +                    rel="webarchive", +                ) +            ] +        elif row['strategy'] == 'web-file-bundle' and row.get('web_bundle_url'): +            return [ +                fatcat_openapi_client.FilesetUrl( +                    url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}", +                    rel="webarchive", +                ) +            ] +        else: +            return [] + +    def parse_record(self, row): + +        request = row['request'] + +        # double check that want() filtered request correctly +        if request.get('ingest_type') not in ["dataset",]: +            self.counts['skip-ingest-type'] += 1 +            return None + +        # identify release by fatcat ident, or extid lookup +        release_ident = self.parse_ingest_release_ident(row) + +        if not release_ident: +            self.counts['skip-release-not-found'] += 1 +            return None + +        entity_extra = dict() +        edit_extra = self.parse_edit_extra(row) +        edit_extra['ingest_strategy'] = row['ingest_strategy'] +        if row.get('platform'): +            edit_extra['platform'] = row['platform'] +        if row.get('platform_id'): +            edit_extra['platform_id'] = row['platform_id'] + +        entity_urls = self.parse_fileset_urls(row) +        if not entity_urls: +            self.counts['skip-no-access-url'] += 1 +            return None + +        assert row['file_count'] == len(row['manifest']) +        if row['file_count'] > self.max_file_count: +            self.counts['skip-too-many-manifest-files'] += 1 +            return None + +        manifest = [] +        for ingest_file in row['manifest']: +            fsf = fatcat_openapi_client.FilesetFile( +                path=ingest_file['path'], +                size=ingest_file['size'], +                md5=ingest_file['md5'], +                sha1=ingest_file['sha1'], +                sha256=ingest_file.get('sha256'), +                extra=dict( +                    mimetype=ingest_file['mimetype'], +                ), +            ) +            if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size): +                self.counts['skip-partial-file-info'] += 1 +                return None +            if ingest_file.get('platform_url'): +                # XXX: should we include this? +                fsf.extra['original_url'] = ingest_file['platform_url'] +            if ingest_file.get('terminal_url') and ingest_file.get('terminal_dt'): +                fsf.extra['wayback_url'] = f"https://web.archive.org/web/{ingest_file['terminal_dt']}/{ingest_file['terminal_url']}" +            manifest.append(fsf) + +        fe = fatcat_openapi_client.FilesetEntity( +            manifest=manifest, +            urls=entity_urls, +            release_ids=[release_ident], +        ) + +        if entity_extra: +            fe.extra = entity_extra +        if edit_extra: +            fe.edit_extra = edit_extra +        return fe + +    def try_update(self, wc): + +        # check for existing edits-in-progress with same URL +        for other in self._entity_queue: +            # XXX: how to duplicate check? +            if other.original_url == wc.original_url: +                self.counts['skip-in-queue'] += 1 +                return False + +        # lookup sha1, or create new entity (TODO: API doesn't support this yet) +        #existing = None + +        # NOTE: in lieu of existing checks (by lookup), only allow one fileset per release +        release = self.api.get_release(wc.release_ids[0], expand="filesets") +        if release.filesets: +            # XXX: how to duplicate check filesets? +            # check if this is an existing match, or just a similar hit +            for other in release.filesets: +                if wc.original_url == other.original_url: +                    # TODO: compare very similar timestamps of same time (different formats) +                    self.counts['exists'] += 1 +                    return False +            self.counts['skip-release-has-fileset'] += 1 +            return False + +        return True + +    def insert_batch(self, batch): +        if self.submit_mode: +            eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup( +                description=self.editgroup_description, +                extra=self.editgroup_extra)) +            for fe in batch: +                self.api.create_fileset(eg.editgroup_id, fe) +            self.api.update_editgroup(eg.editgroup_id, eg, submit=True) +        else: +            self.api.create_fileset_auto_batch(fatcat_openapi_client.FilesetAutoBatch( +                editgroup=fatcat_openapi_client.Editgroup( +                    description=self.editgroup_description, +                    extra=self.editgroup_extra), +                entity_list=batch)) + + +class SavePaperNowFilesetImporter(IngestFilesetResultImporter): +    """ +    Like SavePaperNowFileImporter, but for fileset/dataset ingest. +    """ + +    def __init__(self, api, submit_mode=True, **kwargs): + +        eg_desc = kwargs.pop('editgroup_description', None) or "Fileset crawled after a public 'Save Paper Now' request" +        eg_extra = kwargs.pop('editgroup_extra', dict()) +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.SavePaperNowFilesetImporter') +        kwargs['submit_mode'] = submit_mode +        kwargs['do_updates'] = False +        super().__init__(api, +            editgroup_description=eg_desc, +            editgroup_extra=eg_extra, +            **kwargs) + +    def want(self, row): + +        source = row['request'].get('ingest_request_source') +        if not source: +            self.counts['skip-ingest_request_source'] += 1 +            return False +        if not source.startswith('savepapernow'): +            self.counts['skip-not-savepapernow'] += 1 +            return False + +        if row.get('hit') != True: +            self.counts['skip-hit'] += 1 +            return False + +        if not self.want_fileset(row): +            return False + +        return True | 
