From ffdc901fa067db55fe6cfeb8d0c3807d29df092c Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 15 Dec 2021 13:55:16 -0800 Subject: fileset ingest: more requests timeouts, sessions --- python/sandcrawler/fileset_platforms.py | 21 +++++---- python/sandcrawler/fileset_strategies.py | 76 ++++++++++++++++++++------------ python/sandcrawler/ingest_fileset.py | 8 +++- 3 files changed, 68 insertions(+), 37 deletions(-) diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index fbe8066..81aad87 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -2,7 +2,6 @@ import urllib.parse from typing import Optional, Tuple import internetarchive -import requests from sandcrawler.fileset_types import ( FilesetManifestFile, @@ -13,6 +12,7 @@ from sandcrawler.fileset_types import ( ) from sandcrawler.html_metadata import BiblioMetadata from sandcrawler.ia import ResourceResult +from sandcrawler.misc import requests_retry_session class FilesetPlatformHelper: @@ -61,7 +61,7 @@ class DataverseHelper(FilesetPlatformHelper): def __init__(self): super().__init__() self.platform_name = "dataverse" - self.session = requests.Session() + self.session = requests_retry_session() @staticmethod def parse_dataverse_persistentid(pid: str) -> dict: @@ -200,7 +200,8 @@ class DataverseHelper(FilesetPlatformHelper): # 1b. if we didn't get a version number from URL, fetch it from API if not dataset_version: resp = self.session.get( - f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}" + f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}", + timeout=60.0, ) resp.raise_for_status() obj = resp.json() @@ -211,7 +212,8 @@ class DataverseHelper(FilesetPlatformHelper): # 2. API fetch resp = self.session.get( - f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}" + f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}", + timeout=60.0, ) resp.raise_for_status() obj = resp.json() @@ -350,7 +352,7 @@ class FigshareHelper(FilesetPlatformHelper): def __init__(self): super().__init__() self.platform_name = "figshare" - self.session = requests.Session() + self.session = requests_retry_session() @staticmethod def parse_figshare_url_path(path: str) -> Tuple[str, Optional[str]]: @@ -441,7 +443,8 @@ class FigshareHelper(FilesetPlatformHelper): # 2. API fetch resp = self.session.get( - f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}" + f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}", + timeout=60.0, ) resp.raise_for_status() obj = resp.json() @@ -537,7 +540,7 @@ class ZenodoHelper(FilesetPlatformHelper): def __init__(self): super().__init__() self.platform_name = "zenodo" - self.session = requests.Session() + self.session = requests_retry_session() def match_request( self, @@ -589,7 +592,7 @@ class ZenodoHelper(FilesetPlatformHelper): raise PlatformScopeError(f"unexpected zenodo.org domain: {platform_domain}") # 2. API fetch - resp = self.session.get(f"https://zenodo.org/api/records/{platform_id}") + resp = self.session.get(f"https://zenodo.org/api/records/{platform_id}", timeout=60.0) if resp.status_code == 410: raise PlatformRestrictedError("record deleted") resp.raise_for_status() @@ -764,7 +767,7 @@ class ArchiveOrgHelper(FilesetPlatformHelper): ) # print(f" archiveorg processing item={item_name}", file=sys.stderr) - item = self.session.get_item(item_name) + item = self.session.get_item(item_name, timeout=60.0) item_name = item.identifier item_collection = item.metadata["collection"] if type(item_collection) == list: diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index b0131f4..f83d1ce 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -4,6 +4,7 @@ import sys from typing import Optional import internetarchive +import requests from sandcrawler.fileset_types import ( ArchiveStrategyResult, @@ -12,7 +13,12 @@ from sandcrawler.fileset_types import ( PlatformScopeError, ) from sandcrawler.ia import SavePageNowClient, WaybackClient, fix_transfer_encoding -from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path +from sandcrawler.misc import ( + gen_file_metadata, + gen_file_metadata_path, + requests_retry_session, + sanitize_fs_path, +) class FilesetIngestStrategy: @@ -40,12 +46,15 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): except FileExistsError: pass - self.ia_session = internetarchive.get_session(config={ - 's3': { - 'access': os.environ.get("IA_ACCESS_KEY"), - 'secret': os.environ.get("IA_SECRET_KEY"), - }, - }) + self.http_session = requests_retry_session() + self.ia_session = internetarchive.get_session( + config={ + "s3": { + "access": os.environ.get("IA_ACCESS_KEY"), + "secret": os.environ.get("IA_SECRET_KEY"), + }, + } + ) def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]: """ @@ -119,22 +128,28 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): if not os.path.exists(os.path.dirname(local_path)): os.mkdir(os.path.dirname(local_path)) - if not os.path.exists(local_path): + if os.path.exists(local_path): + m.status = "exists-local" + else: print(f" downloading {m.path}", file=sys.stderr) # create any sub-directories for this path, if necessary if not os.path.exists(os.path.dirname(local_path)): os.mkdir(os.path.dirname(local_path)) - with self.ia_session.get( - m.platform_url, stream=True, allow_redirects=True - ) as r: - r.raise_for_status() - with open(local_path + ".partial", "wb") as f: - for chunk in r.iter_content(chunk_size=256 * 1024): - f.write(chunk) - os.rename(local_path + ".partial", local_path) - m.status = "downloaded-local" - else: - m.status = "exists-local" + try: + with self.http_session.get( + m.platform_url, + stream=True, + allow_redirects=True, + timeout=2 * 60 * 60, + ) as r: + r.raise_for_status() + with open(local_path + ".partial", "wb") as f: + for chunk in r.iter_content(chunk_size=256 * 1024): + f.write(chunk) + os.rename(local_path + ".partial", local_path) + m.status = "downloaded-local" + except requests.exceptions.RequestException: + m.status = "error-platform-download" print(f" verifying {m.path}", file=sys.stderr) file_meta = gen_file_metadata_path(local_path, allow_empty=True) @@ -190,14 +205,21 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", file=sys.stderr, ) - internetarchive.upload( - item.archiveorg_item_name, - files=item_files, - metadata=item.archiveorg_item_meta, - checksum=True, - queue_derive=False, - verify=True, - ) + try: + internetarchive.upload( + item.archiveorg_item_name, + files=item_files, + metadata=item.archiveorg_item_meta, + checksum=True, + queue_derive=False, + verify=True, + ) + except requests.exceptions.RequestException: + return ArchiveStrategyResult( + ingest_strategy=self.ingest_strategy, + manifest=item.manifest, + status="error-archiveorg-upload", + ) for m in item.manifest: m.status = "success" diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index 732a6ab..542dfbc 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -325,12 +325,18 @@ class IngestFilesetWorker(IngestFileWorker): result["error_message"] = str(e)[:1600] return result except requests.exceptions.HTTPError as e: + result["error_message"] = str(e)[:1600] if e.response.status_code == 404: result["status"] = "platform-404" result["error_message"] = str(e)[:1600] return result else: - raise e + result["status"] = "platform-http-error" + return result + except requests.exceptions.RequestException as e: + result["error_message"] = str(e)[:1600] + result["status"] = "platform-error" + return result # print(dataset_meta, file=sys.stderr) platform = dataset_meta.platform_name -- cgit v1.2.3