aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py16
1 files changed, 15 insertions, 1 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 1e54a28..97079ee 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,12 +1,14 @@
import sys
import json
+import time
import zipfile
import multiprocessing.pool
from collections import Counter
from confluent_kafka import Consumer, Producer, KafkaException
from .misc import parse_cdx_line
+from .ia import SandcrawlerBackoffError
class SandcrawlerWorker(object):
@@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher):
for msg in batch:
self.counts['total'] += 1
record = json.loads(msg.value().decode('utf-8'))
- self.worker.push_record(record)
+ done = False
+ while not done:
+ try:
+ self.worker.push_record(record)
+ break
+ except SandcrawlerBackoffError as be:
+ print("Backing off for 200 seconds: {}".format(be))
+ self.consumer.pause()
+ for i in range(40):
+ empty_batch = self.consumer.poll(0)
+ assert not empty_batch
+ time.sleep(5)
+ self.consumer.resume()
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)