diff options
 python/sandcrawler/ia.py      | 16 ++++++++++++----
 python/sandcrawler/workers.py | 11 +++++++++--
 2 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index 14716fc..3be1a79 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -21,6 +21,12 @@ from gwb.loader import CDXLoaderFactory
 from .misc import b32_hex, requests_retry_session, gen_file_metadata

 class SandcrawlerBackoffError(Exception):
+    """
+    A set of Exceptions which are raised through multiple abstraction layers to
+    indicate backpressure. For example, SPNv2 back-pressure sometimes needs to
+    be passed up through any timeout/retry code and become an actual long pause
+    or crash.
+    """
     pass

 ResourceResult = namedtuple("ResourceResult", [
@@ -628,7 +634,7 @@ class WaybackClient:
 class SavePageNowError(Exception):
     pass

-class SavePageNowBackoffError(Exception):
+class SavePageNowBackoffError(SandcrawlerBackoffError):
     pass

 SavePageNowResult = namedtuple('SavePageNowResult', [
@@ -702,13 +708,13 @@ class SavePageNowClient:
             },
         )
         if resp.status_code == 429:
-            raise SavePaperNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+            raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
         elif resp.status_code != 200:
             raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
         resp_json = resp.json()

         if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
-            raise SavePaperNowBackoffError(resp_json['message'])
+            raise SavePageNowBackoffError(resp_json['message'])
         elif not resp_json or 'job_id' not in resp_json:
             raise SavePageNowError(
                 "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json))
@@ -785,13 +791,13 @@ class SavePageNowClient:
         elif status == "error:no-access":
             status = "forbidden"
         elif status == "error:user-session-limit":
-            raise Exception("SPNv2 user-session-limit, need to backoff")
+            raise SavePageNowBackoffError("SPNv2 user-session-limit")
         elif status.startswith("error:"):
             status = "spn2-" + status
         return ResourceResult(
             start_url=start_url,
             hit=False,
-            status=spn_result.status,
+            status=status,
             terminal_url=spn_result.terminal_url,
             terminal_dt=spn_result.terminal_dt,
             terminal_status_code=None,
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 97079ee..c290421 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher):
             for msg in batch:
                 self.counts['total'] += 1
                 record = json.loads(msg.value().decode('utf-8'))
+                # This complex bit of code implements backoff/backpressure
+                # in a way that will not cause this Kafka consumer to lose
+                # partition assignments (resulting in a rebalance). This
+                # was needed for the ingest workers. There is probably a
+                # better way to structure this concurrency.
                 done = False
                 while not done:
                     try:
@@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher):
                         break
                     except SandcrawlerBackoffError as be:
                         print("Backing off for 200 seconds: {}".format(be))
-                        self.consumer.pause()
+                        self.consumer.pause(self.consumer.assignment())
                         for i in range(40):
+                            # Beware this poll which should not be
+                            # receiving any messages because we are paused!
                             empty_batch = self.consumer.poll(0)
                             assert not empty_batch
                             time.sleep(5)
-                        self.consumer.resume()
+                        self.consumer.resume(self.consumer.assignment())
                 self.counts['pushed'] += 1
                 if self.counts['total'] % 500 == 0:
                     print("Import counts: {}".format(self.worker.counts), file=sys.stderr)