aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-02-03 20:42:15 -0800
committerBryan Newbold <bnewbold@archive.org>2020-02-03 21:51:00 -0800
commit15c7e430ebccbdab88355c5c1f1914c3aca99c8a (patch)
tree6ce8d01b11f6e2c6792138046219c7e624aa2d0d /python
parent5f9e7fd4c89df98ed90be9629d3dc6c201b42a02 (diff)
downloadsandcrawler-15c7e430ebccbdab88355c5c1f1914c3aca99c8a.tar.gz
sandcrawler-15c7e430ebccbdab88355c5c1f1914c3aca99c8a.zip
hack-y backoff ingest attempt
The goal here is to have SPNv2 requests backoff when we get back-pressure (usually caused by some sessions taking too long). Lack of proper back-pressure is making it hard to turn up parallelism. This is a hack because we still timeout and drop the slow request. A better way is probably to have a background thread run, while the KafkaPusher thread does polling. Maybe with timeouts to detect slow processing (greater than 30 seconds?) and only pause/resume in that case. This would also make taking batches easier. Unlike the existing code, however, the parallelism needs to happen at the Pusher level to do the polling (Kafka) and "await" (for all worker threads to complete) correctly.
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/ia.py13
-rw-r--r--python/sandcrawler/workers.py16
2 files changed, 26 insertions, 3 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index de5654c..14716fc 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -20,6 +20,8 @@ from gwb.loader import CDXLoaderFactory
from .misc import b32_hex, requests_retry_session, gen_file_metadata
+class SandcrawlerBackoffError(Exception):
+ pass
ResourceResult = namedtuple("ResourceResult", [
"start_url",
@@ -626,6 +628,9 @@ class WaybackClient:
class SavePageNowError(Exception):
pass
+class SavePageNowBackoffError(Exception):
+ pass
+
SavePageNowResult = namedtuple('SavePageNowResult', [
'success',
'status',
@@ -696,11 +701,15 @@ class SavePageNowClient:
'if_not_archived_within': '1d',
},
)
- if resp.status_code != 200:
+ if resp.status_code == 429:
+ raise SavePaperNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ elif resp.status_code != 200:
raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
resp_json = resp.json()
- if not resp_json or 'job_id' not in resp_json:
+ if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
+ raise SavePaperNowBackoffError(resp_json['message'])
+ elif not resp_json or 'job_id' not in resp_json:
raise SavePageNowError(
"Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json))
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 1e54a28..97079ee 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,12 +1,14 @@
import sys
import json
+import time
import zipfile
import multiprocessing.pool
from collections import Counter
from confluent_kafka import Consumer, Producer, KafkaException
from .misc import parse_cdx_line
+from .ia import SandcrawlerBackoffError
class SandcrawlerWorker(object):
@@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher):
for msg in batch:
self.counts['total'] += 1
record = json.loads(msg.value().decode('utf-8'))
- self.worker.push_record(record)
+ done = False
+ while not done:
+ try:
+ self.worker.push_record(record)
+ break
+ except SandcrawlerBackoffError as be:
+ print("Backing off for 200 seconds: {}".format(be))
+ self.consumer.pause()
+ for i in range(40):
+ empty_batch = self.consumer.poll(0)
+ assert not empty_batch
+ time.sleep(5)
+ self.consumer.resume()
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)