about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--python/sandcrawler/ia.py16
-rw-r--r--python/sandcrawler/workers.py11
2 files changed, 20 insertions, 7 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index 14716fc..3be1a79 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -21,6 +21,12 @@ from gwb.loader import CDXLoaderFactory
from .misc import b32_hex, requests_retry_session, gen_file_metadata
class SandcrawlerBackoffError(Exception):
+ """
+ A set of Exceptions which are raised through multiple abstraction layers to
+ indicate backpressure. For example, SPNv2 back-pressure sometimes needs to
+ be passed up through any timeout/retry code and become an actual long pause
+ or crash.
+ """
pass
ResourceResult = namedtuple("ResourceResult", [
@@ -628,7 +634,7 @@ class WaybackClient:
class SavePageNowError(Exception):
pass
-class SavePageNowBackoffError(Exception):
+class SavePageNowBackoffError(SandcrawlerBackoffError):
pass
SavePageNowResult = namedtuple('SavePageNowResult', [
@@ -702,13 +708,13 @@ class SavePageNowClient:
},
)
if resp.status_code == 429:
- raise SavePaperNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
elif resp.status_code != 200:
raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
resp_json = resp.json()
if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
- raise SavePaperNowBackoffError(resp_json['message'])
+ raise SavePageNowBackoffError(resp_json['message'])
elif not resp_json or 'job_id' not in resp_json:
raise SavePageNowError(
"Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json))
@@ -785,13 +791,13 @@ class SavePageNowClient:
elif status == "error:no-access":
status = "forbidden"
elif status == "error:user-session-limit":
- raise Exception("SPNv2 user-session-limit, need to backoff")
+ raise SavePageNowBackoffError("SPNv2 user-session-limit")
elif status.startswith("error:"):
status = "spn2-" + status
return ResourceResult(
start_url=start_url,
hit=False,
- status=spn_result.status,
+ status=status,
terminal_url=spn_result.terminal_url,
terminal_dt=spn_result.terminal_dt,
terminal_status_code=None,
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 97079ee..c290421 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher):
for msg in batch:
self.counts['total'] += 1
record = json.loads(msg.value().decode('utf-8'))
+ # This complex bit of code implements backoff/backpressure
+ # in a way that will not cause this Kafka consumer to lose
+ # partition assignments (resulting in a rebalance). This
+ # was needed for the ingest workers. There is probably a
+ # better way to structure this concurrency.
done = False
while not done:
try:
@@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher):
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
- self.consumer.pause()
+ self.consumer.pause(self.consumer.assignment())
for i in range(40):
+ # Beware this poll which should not be
+ # receiving any messages because we are paused!
empty_batch = self.consumer.poll(0)
assert not empty_batch
time.sleep(5)
- self.consumer.resume()
+ self.consumer.resume(self.consumer.assignment())
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)