From 15c7e430ebccbdab88355c5c1f1914c3aca99c8a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 3 Feb 2020 20:42:15 -0800 Subject: hack-y backoff ingest attempt The goal here is to have SPNv2 requests backoff when we get back-pressure (usually caused by some sessions taking too long). Lack of proper back-pressure is making it hard to turn up parallelism. This is a hack because we still timeout and drop the slow request. A better way is probably to have a background thread run, while the KafkaPusher thread does polling. Maybe with timeouts to detect slow processing (greater than 30 seconds?) and only pause/resume in that case. This would also make taking batches easier. Unlike the existing code, however, the parallelism needs to happen at the Pusher level to do the polling (Kafka) and "await" (for all worker threads to complete) correctly. --- python/sandcrawler/workers.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'python/sandcrawler/workers.py') diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 1e54a28..97079ee 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -1,12 +1,14 @@ import sys import json +import time import zipfile import multiprocessing.pool from collections import Counter from confluent_kafka import Consumer, Producer, KafkaException from .misc import parse_cdx_line +from .ia import SandcrawlerBackoffError class SandcrawlerWorker(object): @@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher): for msg in batch: self.counts['total'] += 1 record = json.loads(msg.value().decode('utf-8')) - self.worker.push_record(record) + done = False + while not done: + try: + self.worker.push_record(record) + break + except SandcrawlerBackoffError as be: + print("Backing off for 200 seconds: {}".format(be)) + self.consumer.pause() + for i in range(40): + empty_batch = self.consumer.poll(0) + assert not empty_batch + time.sleep(5) + self.consumer.resume() self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr) -- cgit v1.2.3