From f98f6226097ac34cf8a57ee09a4feea9171addfe Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 6 Oct 2021 18:02:41 -0700 Subject: progress on web ingest strategy --- python/sandcrawler/fileset_platforms.py | 21 ++++--- python/sandcrawler/fileset_strategies.py | 103 ++++++++++++++++++++++++++++++- python/sandcrawler/ingest_fileset.py | 9 ++- 3 files changed, 121 insertions(+), 12 deletions(-) diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index 58094c2..9232870 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -36,16 +36,16 @@ class DatasetPlatformHelper(): assert item.manifest total_size = sum([m.size for m in item.manifest]) largest_size = max([m.size for m in item.manifest]) - print(f" total_size={total_size} largest_size={largest_size}", file=sys.stderr) + #print(f" total_size={total_size} largest_size={largest_size}", file=sys.stderr) # XXX: while developing ArchiveorgFileset path - return IngestStrategy.ArchiveorgFileset + #return IngestStrategy.ArchiveorgFileset if len(item.manifest) == 1: - if total_size < 128*1024*1024: + if total_size < 64*1024*1024: return IngestStrategy.WebFile else: return IngestStrategy.ArchiveorgFile else: - if largest_size < 128*1024*1024 and total_size < 1*1024*1024*1024: + if largest_size < 64*1024*1024 and total_size < 128*1024*1024*1024: return IngestStrategy.WebFileset else: return IngestStrategy.ArchiveorgFileset @@ -139,6 +139,13 @@ class DataverseHelper(DatasetPlatformHelper): platform_url = f"https://{platform_domain}/api/access/datafile/:persistentId/?persistentId={df_persistent_id}" if df.get('originalFileName'): platform_url += '&format=original' + + extra = dict() + # TODO: always save the version field? + if row.get('version') != 1: + extra['version'] = row['version'] + if 'description' in df: + extra['description'] = df['description'] manifest.append(FilesetManifestFile( path=df.get('originalFileName') or df['filename'], size=df.get('originalFileSize') or df['filesize'], @@ -146,11 +153,7 @@ class DataverseHelper(DatasetPlatformHelper): # NOTE: don't get: sha1, sha256 mimetype=df['contentType'], platform_url=platform_url, - extra=dict( - # file-level - description=df.get('description'), - version=df.get('version'), - ), + extra=extra or None, )) platform_sub_id = platform_id.split('/')[-1] diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index c335ea6..5ee4cc9 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -11,7 +11,7 @@ from typing import Optional, Tuple, Any, Dict, List import internetarchive from sandcrawler.html_metadata import BiblioMetadata -from sandcrawler.ia import ResourceResult +from sandcrawler.ia import ResourceResult, WaybackClient, SavePageNowClient, fix_transfer_encoding from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path @@ -185,8 +185,109 @@ class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy): super().__init__() self.ingest_strategy = IngestStrategy.ArchiveorgFileset +class WebFilesetStrategy(FilesetIngestStrategy): + + def __init__(self, **kwargs): + self.ingest_strategy = IngestStrategy.WebFileset + self.wayback_client = WaybackClient() + self.try_spn2 = True + self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + + # XXX: this is copypasta + self.spn2_simple_get_domains = [ + # direct PDF links + "://arxiv.org/pdf/", + "://europepmc.org/backend/ptpmcrender.fcgi", + "://pdfs.semanticscholar.org/", + "://res.mdpi.com/", + + # platform sites + "://zenodo.org/", + "://figshare.org/", + "://springernature.figshare.com/", + + # popular simple cloud storage or direct links + "://s3-eu-west-1.amazonaws.com/", + ] + + def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult: + """ + For each manifest item individually, run 'fetch_resource' and record stats, terminal_url, terminal_dt + + TODO: + - full fetch_resource() method which can do SPN requests + """ + + for m in item.manifest: + fetch_url = m.platform_url + if not fetch_url: + raise NotImplementedError("require 'platform_url' for each file when doing Web fetching") + + via = "wayback" + resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype) + + + if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture')): + via = "spn2" + force_simple_get = 0 + for domain in self.spn2_simple_get_domains: + if domain in fetch_url: + force_simple_get = 1 + break + resource = self.spn_client.crawl_resource(fetch_url, self.wayback_client, force_simple_get=force_simple_get) + + print("[FETCH {:>6}] {} {}".format( + via, + (resource and resource.status), + (resource and resource.terminal_url) or url), + file=sys.stderr) + + m.terminal_url = resource.terminal_url + m.terminal_dt = resource.terminal_dt + m.status = resource.status + + if resource.status != 'success': + continue + else: + assert resource.terminal_status_code == 200 + + file_meta = gen_file_metadata(resource.body) + file_meta, html_resource = fix_transfer_encoding(file_meta, resource) + + if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']) or (m.sha1 and m.sha1 != file_meta['sha1hex']): + m.status = 'mismatch' + continue + + m.md5 = m.md5 or file_meta['md5hex'] + m.sha1 = m.sha1 or file_meta['md5hex'] + m.sha256 = m.sha256 or file_meta['sha256hex'] + m.mimetype = m.mimetype or file_meta['mimetype'] + + overall_status = "success" + for m in item.manifest: + if m.status != 'success': + overall_status = m.status + break + if not item.manifest: + overall_status = 'empty-manifest' + + result = ArchiveStrategyResult( + ingest_strategy=self.ingest_strategy, + status=overall_status, + manifest=item.manifest, + ) + return result + +class WebFileStrategy(WebFilesetStrategy): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.ingest_strategy = IngestStrategy.WebFile + FILESET_STRATEGY_HELPER_TABLE = { IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(), IngestStrategy.ArchiveorgFile: ArchiveorgFileStrategy(), + IngestStrategy.WebFileset: WebFilesetStrategy(), + IngestStrategy.WebFile: WebFileStrategy(), } diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index 3b55793..3e782ed 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -246,6 +246,8 @@ class IngestFilesetWorker(IngestFileWorker): ### END COPYPASTA ### + # XXX: html_guess_platform() + # determine platform platform_helper = None for (helper_name, helper) in self.dataset_platform_helpers.items(): @@ -279,6 +281,7 @@ class IngestFilesetWorker(IngestFileWorker): ingest_strategy = platform_helper.chose_strategy(dataset_meta) result['ingest_strategy'] = ingest_strategy + print(f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", file=sys.stderr) strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy) if not strategy_helper: @@ -296,16 +299,18 @@ class IngestFilesetWorker(IngestFileWorker): if result['status'].startswith('success'): result['hit'] = True - print("[SUCCESS {:>5}] file_count={} total_size={}".format( + print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( ingest_type, result['file_count'], result['total_size'], + ingest_strategy, ), file=sys.stderr) else: - print("[FAIL {:>5}] status={} file_count={} total_size={}".format( + print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( ingest_type, result['status'], result['file_count'], result['total_size'], + ingest_strategy, ), file=sys.stderr) return result -- cgit v1.2.3