author     Bryan Newbold <bnewbold@archive.org>  2021-12-15 13:55:16 -0800
committer  Bryan Newbold <bnewbold@archive.org>  2021-12-15 13:55:16 -0800
commit     ffdc901fa067db55fe6cfeb8d0c3807d29df092c (patch)
tree       8fbca44e2bbd1af30af360f84dbbf238c1c814c6 /python/sandcrawler
parent     100e5348c4203dd346993a185ba3749105e91541 (diff)
fileset ingest: more requests timeouts, sessions
Diffstat (limited to 'python/sandcrawler')
-rw-r--r--  python/sandcrawler/fileset_platforms.py  | 21
-rw-r--r--  python/sandcrawler/fileset_strategies.py | 76
-rw-r--r--  python/sandcrawler/ingest_fileset.py     |  8
3 files changed, 68 insertions(+), 37 deletions(-)
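
The diff below swaps bare requests.Session() objects for a requests_retry_session() helper from sandcrawler.misc, and adds explicit timeouts to each API call. The helper's implementation is not part of this commit; the following is a minimal sketch of the conventional shape such a function takes (a urllib3 Retry policy mounted on an HTTPAdapter), with illustrative parameter values rather than the project's actual defaults:

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    def requests_retry_session(
        retries: int = 3,
        backoff_factor: float = 1.0,
        status_forcelist: tuple = (500, 502, 503, 504),
    ) -> requests.Session:
        # hypothetical sketch; sandcrawler.misc's real defaults may differ
        session = requests.Session()
        retry = Retry(
            total=retries,
            connect=retries,
            read=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

Adapter-level retries cover connection failures and the listed 5xx statuses, but they do not bound how long a single attempt may hang, which is why the per-call timeout= arguments in the diff are still needed.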
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index fbe8066..81aad87 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -2,7 +2,6 @@ import urllib.parse
from typing import Optional, Tuple
import internetarchive
-import requests
from sandcrawler.fileset_types import (
FilesetManifestFile,
@@ -13,6 +12,7 @@ from sandcrawler.fileset_types import (
)
from sandcrawler.html_metadata import BiblioMetadata
from sandcrawler.ia import ResourceResult
+from sandcrawler.misc import requests_retry_session
class FilesetPlatformHelper:
@@ -61,7 +61,7 @@ class DataverseHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
self.platform_name = "dataverse"
- self.session = requests.Session()
+ self.session = requests_retry_session()
@staticmethod
def parse_dataverse_persistentid(pid: str) -> dict:
@@ -200,7 +200,8 @@ class DataverseHelper(FilesetPlatformHelper):
# 1b. if we didn't get a version number from URL, fetch it from API
if not dataset_version:
resp = self.session.get(
- f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}"
+ f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}",
+ timeout=60.0,
)
resp.raise_for_status()
obj = resp.json()
@@ -211,7 +212,8 @@ class DataverseHelper(FilesetPlatformHelper):
# 2. API fetch
resp = self.session.get(
- f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}"
+ f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}",
+ timeout=60.0,
)
resp.raise_for_status()
obj = resp.json()
@@ -350,7 +352,7 @@ class FigshareHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
self.platform_name = "figshare"
- self.session = requests.Session()
+ self.session = requests_retry_session()
@staticmethod
def parse_figshare_url_path(path: str) -> Tuple[str, Optional[str]]:
@@ -441,7 +443,8 @@ class FigshareHelper(FilesetPlatformHelper):
# 2. API fetch
resp = self.session.get(
- f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}"
+ f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}",
+ timeout=60.0,
)
resp.raise_for_status()
obj = resp.json()
@@ -537,7 +540,7 @@ class ZenodoHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
self.platform_name = "zenodo"
- self.session = requests.Session()
+ self.session = requests_retry_session()
def match_request(
self,
@@ -589,7 +592,7 @@ class ZenodoHelper(FilesetPlatformHelper):
raise PlatformScopeError(f"unexpected zenodo.org domain: {platform_domain}")
# 2. API fetch
- resp = self.session.get(f"https://zenodo.org/api/records/{platform_id}")
+ resp = self.session.get(f"https://zenodo.org/api/records/{platform_id}", timeout=60.0)
if resp.status_code == 410:
raise PlatformRestrictedError("record deleted")
resp.raise_for_status()
@@ -764,7 +767,7 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
)
# print(f" archiveorg processing item={item_name}", file=sys.stderr)
- item = self.session.get_item(item_name)
+ item = self.session.get_item(item_name, timeout=60.0)
item_name = item.identifier
item_collection = item.metadata["collection"]
if type(item_collection) == list:
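
A note on the timeout=60.0 values added above: in requests, timeout bounds each connect and read socket operation, not the total request. A hypothetical call using the (connect, read) tuple form, with a made-up record ID for illustration:

    # hypothetical usage; platform_id is an example value, not from the diff
    session = requests_retry_session()
    platform_id = "1234567"
    resp = session.get(
        f"https://zenodo.org/api/records/{platform_id}",
        timeout=(10.0, 60.0),  # 10s to establish connection, 60s per read
    )
    resp.raise_for_status()
    obj = resp.json()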
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index b0131f4..f83d1ce 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -4,6 +4,7 @@ import sys
from typing import Optional
import internetarchive
+import requests
from sandcrawler.fileset_types import (
ArchiveStrategyResult,
@@ -12,7 +13,12 @@ from sandcrawler.fileset_types import (
PlatformScopeError,
)
from sandcrawler.ia import SavePageNowClient, WaybackClient, fix_transfer_encoding
-from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path
+from sandcrawler.misc import (
+ gen_file_metadata,
+ gen_file_metadata_path,
+ requests_retry_session,
+ sanitize_fs_path,
+)
class FilesetIngestStrategy:
@@ -40,12 +46,15 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
except FileExistsError:
pass
- self.ia_session = internetarchive.get_session(config={
- 's3': {
- 'access': os.environ.get("IA_ACCESS_KEY"),
- 'secret': os.environ.get("IA_SECRET_KEY"),
- },
- })
+ self.http_session = requests_retry_session()
+ self.ia_session = internetarchive.get_session(
+ config={
+ "s3": {
+ "access": os.environ.get("IA_ACCESS_KEY"),
+ "secret": os.environ.get("IA_SECRET_KEY"),
+ },
+ }
+ )
def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
"""
@@ -119,22 +128,28 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
if not os.path.exists(os.path.dirname(local_path)):
os.mkdir(os.path.dirname(local_path))
- if not os.path.exists(local_path):
+ if os.path.exists(local_path):
+ m.status = "exists-local"
+ else:
print(f" downloading {m.path}", file=sys.stderr)
# create any sub-directories for this path, if necessary
if not os.path.exists(os.path.dirname(local_path)):
os.mkdir(os.path.dirname(local_path))
- with self.ia_session.get(
- m.platform_url, stream=True, allow_redirects=True
- ) as r:
- r.raise_for_status()
- with open(local_path + ".partial", "wb") as f:
- for chunk in r.iter_content(chunk_size=256 * 1024):
- f.write(chunk)
- os.rename(local_path + ".partial", local_path)
- m.status = "downloaded-local"
- else:
- m.status = "exists-local"
+ try:
+ with self.http_session.get(
+ m.platform_url,
+ stream=True,
+ allow_redirects=True,
+ timeout=2 * 60 * 60,
+ ) as r:
+ r.raise_for_status()
+ with open(local_path + ".partial", "wb") as f:
+ for chunk in r.iter_content(chunk_size=256 * 1024):
+ f.write(chunk)
+ os.rename(local_path + ".partial", local_path)
+ m.status = "downloaded-local"
+ except requests.exceptions.RequestException:
+ m.status = "error-platform-download"
print(f" verifying {m.path}", file=sys.stderr)
file_meta = gen_file_metadata_path(local_path, allow_empty=True)
@@ -190,14 +205,21 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...",
file=sys.stderr,
)
- internetarchive.upload(
- item.archiveorg_item_name,
- files=item_files,
- metadata=item.archiveorg_item_meta,
- checksum=True,
- queue_derive=False,
- verify=True,
- )
+ try:
+ internetarchive.upload(
+ item.archiveorg_item_name,
+ files=item_files,
+ metadata=item.archiveorg_item_meta,
+ checksum=True,
+ queue_derive=False,
+ verify=True,
+ )
+ except requests.exceptions.RequestException:
+ return ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ manifest=item.manifest,
+ status="error-archiveorg-upload",
+ )
for m in item.manifest:
m.status = "success"
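
The download loop above streams the response into a ".partial" sidecar file and renames it into place only after the transfer completes, so an interrupted download never leaves a truncated file at the final path. A self-contained sketch of that pattern, with the 2-hour timeout and status strings taken from the diff and the surrounding logic simplified:

    import os
    import requests

    def download_atomic(session: requests.Session, url: str, local_path: str) -> str:
        # stream the response body to a ".partial" sidecar file
        try:
            with session.get(url, stream=True, allow_redirects=True, timeout=2 * 60 * 60) as r:
                r.raise_for_status()
                with open(local_path + ".partial", "wb") as f:
                    for chunk in r.iter_content(chunk_size=256 * 1024):
                        f.write(chunk)
        except requests.exceptions.RequestException:
            return "error-platform-download"
        # os.rename() is atomic within a filesystem: readers see either no
        # file at local_path or the complete file, never a partial one
        os.rename(local_path + ".partial", local_path)
        return "downloaded-local"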
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 732a6ab..542dfbc 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -325,12 +325,18 @@ class IngestFilesetWorker(IngestFileWorker):
result["error_message"] = str(e)[:1600]
return result
except requests.exceptions.HTTPError as e:
+ result["error_message"] = str(e)[:1600]
if e.response.status_code == 404:
result["status"] = "platform-404"
result["error_message"] = str(e)[:1600]
return result
else:
- raise e
+ result["status"] = "platform-http-error"
+ return result
+ except requests.exceptions.RequestException as e:
+ result["error_message"] = str(e)[:1600]
+ result["status"] = "platform-error"
+ return result
# print(dataset_meta, file=sys.stderr)
platform = dataset_meta.platform_name
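
The ordering of the except clauses above matters: requests.exceptions.HTTPError is a subclass of requests.exceptions.RequestException, so the narrow platform-404/platform-http-error handler must come first, leaving the broad platform-error clause to catch timeouts, connection failures, and exhausted retries. A quick check of the hierarchy:

    import requests

    # HTTPError (raised by raise_for_status) is the more specific type...
    assert issubclass(requests.exceptions.HTTPError, requests.exceptions.RequestException)
    # ...while timeouts and connection errors fall under the broad clause
    assert issubclass(requests.exceptions.ConnectTimeout, requests.exceptions.RequestException)
    assert issubclass(requests.exceptions.ReadTimeout, requests.exceptions.RequestException)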