diff options
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 1e54a28..97079ee 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -1,12 +1,14 @@ import sys import json +import time import zipfile import multiprocessing.pool from collections import Counter from confluent_kafka import Consumer, Producer, KafkaException from .misc import parse_cdx_line +from .ia import SandcrawlerBackoffError class SandcrawlerWorker(object): @@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher): for msg in batch: self.counts['total'] += 1 record = json.loads(msg.value().decode('utf-8')) - self.worker.push_record(record) + done = False + while not done: + try: + self.worker.push_record(record) + break + except SandcrawlerBackoffError as be: + print("Backing off for 200 seconds: {}".format(be)) + self.consumer.pause() + for i in range(40): + empty_batch = self.consumer.poll(0) + assert not empty_batch + time.sleep(5) + self.consumer.resume() self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr) |