diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-02-03 20:42:15 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-02-03 21:51:00 -0800 |
commit | 15c7e430ebccbdab88355c5c1f1914c3aca99c8a (patch) | |
tree | 6ce8d01b11f6e2c6792138046219c7e624aa2d0d /python/sandcrawler/workers.py | |
parent | 5f9e7fd4c89df98ed90be9629d3dc6c201b42a02 (diff) | |
download | sandcrawler-15c7e430ebccbdab88355c5c1f1914c3aca99c8a.tar.gz sandcrawler-15c7e430ebccbdab88355c5c1f1914c3aca99c8a.zip |
hack-y backoff ingest attempt
The goal here is to have SPNv2 requests back off when we get
back-pressure (usually caused by some sessions taking too long). Lack of
proper back-pressure is making it hard to turn up parallelism.
This is a hack because we still time out and drop the slow request. A
better way is probably to have a background thread run, while the
KafkaPusher thread does polling. Maybe with timeouts to detect slow
processing (greater than 30 seconds?) and only pause/resume in that
case. This would also make taking batches easier. Unlike the existing
code, however, the parallelism needs to happen at the Pusher level to do
the polling (Kafka) and "await" (for all worker threads to complete)
correctly.
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 1e54a28..97079ee 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -1,12 +1,14 @@ import sys import json +import time import zipfile import multiprocessing.pool from collections import Counter from confluent_kafka import Consumer, Producer, KafkaException from .misc import parse_cdx_line +from .ia import SandcrawlerBackoffError class SandcrawlerWorker(object): @@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher): for msg in batch: self.counts['total'] += 1 record = json.loads(msg.value().decode('utf-8')) - self.worker.push_record(record) + done = False + while not done: + try: + self.worker.push_record(record) + break + except SandcrawlerBackoffError as be: + print("Backing off for 200 seconds: {}".format(be)) + self.consumer.pause() + for i in range(40): + empty_batch = self.consumer.poll(0) + assert not empty_batch + time.sleep(5) + self.consumer.resume() self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr) |