aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/ia.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-02-03 20:42:15 -0800
committerBryan Newbold <bnewbold@archive.org>2020-02-03 21:51:00 -0800
commit15c7e430ebccbdab88355c5c1f1914c3aca99c8a (patch)
tree6ce8d01b11f6e2c6792138046219c7e624aa2d0d /python/sandcrawler/ia.py
parent5f9e7fd4c89df98ed90be9629d3dc6c201b42a02 (diff)
downloadsandcrawler-15c7e430ebccbdab88355c5c1f1914c3aca99c8a.tar.gz
sandcrawler-15c7e430ebccbdab88355c5c1f1914c3aca99c8a.zip
hack-y backoff ingest attempt
The goal here is to have SPNv2 requests backoff when we get back-pressure (usually caused by some sessions taking too long). Lack of proper back-pressure is making it hard to turn up parallelism. This is a hack because we still timeout and drop the slow request. A better way is probably to have a background thread run, while the KafkaPusher thread does polling. Maybe with timeouts to detect slow processing (greater than 30 seconds?) and only pause/resume in that case. This would also make taking batches easier. Unlike the existing code, however, the parallelism needs to happen at the Pusher level to do the polling (Kafka) and "await" (for all worker threads to complete) correctly.
Diffstat (limited to 'python/sandcrawler/ia.py')
-rw-r--r--python/sandcrawler/ia.py13
1 files changed, 11 insertions, 2 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index de5654c..14716fc 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -20,6 +20,8 @@ from gwb.loader import CDXLoaderFactory
from .misc import b32_hex, requests_retry_session, gen_file_metadata
+class SandcrawlerBackoffError(Exception):
+ pass
ResourceResult = namedtuple("ResourceResult", [
"start_url",
@@ -626,6 +628,9 @@ class WaybackClient:
class SavePageNowError(Exception):
pass
+class SavePageNowBackoffError(Exception):
+ pass
+
SavePageNowResult = namedtuple('SavePageNowResult', [
'success',
'status',
@@ -696,11 +701,15 @@ class SavePageNowClient:
'if_not_archived_within': '1d',
},
)
- if resp.status_code != 200:
+ if resp.status_code == 429:
+ raise SavePaperNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ elif resp.status_code != 200:
raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
resp_json = resp.json()
- if not resp_json or 'job_id' not in resp_json:
+ if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
+ raise SavePaperNowBackoffError(resp_json['message'])
+ elif not resp_json or 'job_id' not in resp_json:
raise SavePageNowError(
"Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json))