diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2020-02-03 21:50:24 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2020-02-03 21:51:00 -0800 | 
| commit | c723d12dcc8c19d0302f0dfbcdd1414d834c6d73 (patch) | |
| tree | 0d346d3b3376ae7220be178d5796ea64323b34e8 | |
| parent | 15c7e430ebccbdab88355c5c1f1914c3aca99c8a (diff) | |
| download | sandcrawler-c723d12dcc8c19d0302f0dfbcdd1414d834c6d73.tar.gz sandcrawler-c723d12dcc8c19d0302f0dfbcdd1414d834c6d73.zip  | |
improvements to reliability from prod testing
| -rw-r--r-- | python/sandcrawler/ia.py | 16 | ||||
| -rw-r--r-- | python/sandcrawler/workers.py | 11 | 
2 files changed, 20 insertions, 7 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index 14716fc..3be1a79 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -21,6 +21,12 @@ from gwb.loader import CDXLoaderFactory  from .misc import b32_hex, requests_retry_session, gen_file_metadata  class SandcrawlerBackoffError(Exception): +    """ +    A set of Exceptions which are raised through multiple abstraction layers to +    indicate backpressure. For example, SPNv2 back-pressure sometimes needs to +    be passed up through any timeout/retry code and become an actual long pause +    or crash. +    """      pass  ResourceResult = namedtuple("ResourceResult", [ @@ -628,7 +634,7 @@ class WaybackClient:  class SavePageNowError(Exception):      pass -class SavePageNowBackoffError(Exception): +class SavePageNowBackoffError(SandcrawlerBackoffError):      pass  SavePageNowResult = namedtuple('SavePageNowResult', [ @@ -702,13 +708,13 @@ class SavePageNowClient:              },          )          if resp.status_code == 429: -            raise SavePaperNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url)) +            raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))          elif resp.status_code != 200:              raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))          resp_json = resp.json()          if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']: -            raise SavePaperNowBackoffError(resp_json['message']) +            raise SavePageNowBackoffError(resp_json['message'])          elif not resp_json or 'job_id' not in resp_json:              raise SavePageNowError(                  "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json)) @@ -785,13 +791,13 @@ class SavePageNowClient:              elif status == "error:no-access":                  status = "forbidden"              elif status == "error:user-session-limit": -                raise Exception("SPNv2 user-session-limit, need to backoff") +                raise SavePageNowBackoffError("SPNv2 user-session-limit")              elif status.startswith("error:"):                  status = "spn2-" + status              return ResourceResult(                  start_url=start_url,                  hit=False, -                status=spn_result.status, +                status=status,                  terminal_url=spn_result.terminal_url,                  terminal_dt=spn_result.terminal_dt,                  terminal_status_code=None, diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 97079ee..c290421 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher):                  for msg in batch:                      self.counts['total'] += 1                      record = json.loads(msg.value().decode('utf-8')) +                    # This complex bit of code implements backoff/backpressure +                    # in a way that will not cause this Kafka consumer to lose +                    # partition assignments (resulting in a rebalance). This +                    # was needed for the ingest workers. There is probably a +                    # better way to structure this concurrency.                      done = False                      while not done:                          try: @@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher):                              break                          except SandcrawlerBackoffError as be:                              print("Backing off for 200 seconds: {}".format(be)) -                            self.consumer.pause() +                            self.consumer.pause(self.consumer.assignment())                              for i in range(40): +                                # Beware this poll which should not be +                                # receiving any messages because we are paused!                                  empty_batch = self.consumer.poll(0)                                  assert not empty_batch                                  time.sleep(5) -                            self.consumer.resume() +                            self.consumer.resume(self.consumer.assignment())                      self.counts['pushed'] += 1                      if self.counts['total'] % 500 == 0:                          print("Import counts: {}".format(self.worker.counts), file=sys.stderr)  | 
