diff options
 python/sandcrawler/ia.py      | 13 ++++++++++++-
 python/sandcrawler/workers.py | 16 ++++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index de5654c..14716fc 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -20,6 +20,8 @@ from gwb.loader import CDXLoaderFactory from .misc import b32_hex, requests_retry_session, gen_file_metadata +class SandcrawlerBackoffError(Exception): + pass ResourceResult = namedtuple("ResourceResult", [ "start_url", @@ -626,6 +628,9 @@ class WaybackClient: class SavePageNowError(Exception): pass +class SavePageNowBackoffError(SandcrawlerBackoffError): + pass + SavePageNowResult = namedtuple('SavePageNowResult', [ 'success', 'status', @@ -696,11 +701,15 @@ class SavePageNowClient: 'if_not_archived_within': '1d', }, ) - if resp.status_code != 200: + if resp.status_code == 429: + raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url)) + elif resp.status_code != 200: raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url)) resp_json = resp.json() - if not resp_json or 'job_id' not in resp_json: + if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']: + raise SavePageNowBackoffError(resp_json['message']) + elif not resp_json or 'job_id' not in resp_json: raise SavePageNowError( "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json)) diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 1e54a28..97079ee 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -1,12 +1,14 @@ import sys import json +import time import zipfile import multiprocessing.pool from collections import Counter from confluent_kafka import Consumer, Producer, KafkaException from .misc import parse_cdx_line +from .ia import SandcrawlerBackoffError class SandcrawlerWorker(object): @@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher): for msg in batch: self.counts['total'] += 1 record = 
json.loads(msg.value().decode('utf-8')) - self.worker.push_record(record) + done = False + while not done: + try: + self.worker.push_record(record) + break + except SandcrawlerBackoffError as be: + print("Backing off for 200 seconds: {}".format(be)) + self.consumer.pause() + for i in range(40): + empty_batch = self.consumer.poll(0) + assert not empty_batch + time.sleep(5) + self.consumer.resume() self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr)