aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/sandcrawler/ia.py13
-rw-r--r--python/sandcrawler/workers.py16
2 files changed, 26 insertions, 3 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index de5654c..14716fc 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -20,6 +20,8 @@ from gwb.loader import CDXLoaderFactory
from .misc import b32_hex, requests_retry_session, gen_file_metadata
+class SandcrawlerBackoffError(Exception):
+ pass
ResourceResult = namedtuple("ResourceResult", [
"start_url",
@@ -626,6 +628,9 @@ class WaybackClient:
class SavePageNowError(Exception):
pass
+class SavePageNowBackoffError(Exception):
+ pass
+
SavePageNowResult = namedtuple('SavePageNowResult', [
'success',
'status',
@@ -696,11 +701,15 @@ class SavePageNowClient:
'if_not_archived_within': '1d',
},
)
- if resp.status_code != 200:
+ if resp.status_code == 429:
+ raise SavePaperNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ elif resp.status_code != 200:
raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
resp_json = resp.json()
- if not resp_json or 'job_id' not in resp_json:
+ if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
+ raise SavePaperNowBackoffError(resp_json['message'])
+ elif not resp_json or 'job_id' not in resp_json:
raise SavePageNowError(
"Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json))
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 1e54a28..97079ee 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,12 +1,14 @@
import sys
import json
+import time
import zipfile
import multiprocessing.pool
from collections import Counter
from confluent_kafka import Consumer, Producer, KafkaException
from .misc import parse_cdx_line
+from .ia import SandcrawlerBackoffError
class SandcrawlerWorker(object):
@@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher):
for msg in batch:
self.counts['total'] += 1
record = json.loads(msg.value().decode('utf-8'))
- self.worker.push_record(record)
+ done = False
+ while not done:
+ try:
+ self.worker.push_record(record)
+ break
+ except SandcrawlerBackoffError as be:
+ print("Backing off for 200 seconds: {}".format(be))
+ self.consumer.pause()
+ for i in range(40):
+ empty_batch = self.consumer.poll(0)
+ assert not empty_batch
+ time.sleep(5)
+ self.consumer.resume()
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)