author    Bryan Newbold <bnewbold@archive.org>  2020-02-03 20:42:15 -0800
committer Bryan Newbold <bnewbold@archive.org>  2020-02-03 21:51:00 -0800
commit    15c7e430ebccbdab88355c5c1f1914c3aca99c8a (patch)
tree      6ce8d01b11f6e2c6792138046219c7e624aa2d0d /python/sandcrawler/workers.py
parent    5f9e7fd4c89df98ed90be9629d3dc6c201b42a02 (diff)
hack-y backoff ingest attempt
The goal here is to have SPNv2 requests back off when we get back-pressure (usually caused by some sessions taking too long). The lack of proper back-pressure handling is making it hard to turn up parallelism.

This is a hack because we still time out and drop the slow request. A better approach would probably be to run the worker in a background thread while the KafkaPusher thread does the polling, perhaps with timeouts to detect slow processing (greater than 30 seconds?) and only pause/resume in that case. That would also make taking batches easier. Unlike the existing code, however, the parallelism would need to happen at the Pusher level so that the polling (Kafka) and the "await" (for all worker threads to complete) are handled correctly.
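The background-thread design suggested above could be sketched roughly as follows. This is a hypothetical illustration, not part of this commit: `poll`, `process`, `pause`, and `resume` are stand-ins for the Kafka consumer and worker calls, and the bounded queue's `put` timeout plays the role of the "slow processing" detector.

```python
import queue
import threading

_SHUTDOWN = object()  # sentinel to stop the background worker

def pusher_loop(poll, process, pause, resume, work_timeout=30.0, maxsize=4):
    """Sketch: a background thread processes records while this thread
    keeps polling. If handing off a record blocks longer than
    work_timeout (the worker is too slow), pause the consumer, wait
    for the backlog to drain, then resume."""
    q = queue.Queue(maxsize=maxsize)

    def worker():
        while True:
            rec = q.get()
            if rec is _SHUTDOWN:
                q.task_done()
                return
            process(rec)
            q.task_done()

    t = threading.Thread(target=worker, daemon=True)
    t.start()

    while True:
        msg = poll(1.0)
        if msg is None:
            break  # in this sketch, stop when the stub poll is exhausted
        try:
            q.put(msg, timeout=work_timeout)
        except queue.Full:
            # worker is slow: stop fetching until the backlog drains
            pause()
            q.join()
            q.put(msg)
            resume()

    q.put(_SHUTDOWN)
    t.join()
```

With a real consumer, `pause`/`resume` would wrap `consumer.pause(...)`/`consumer.resume(...)` and the poll loop would keep the consumer alive in its group while the worker catches up.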
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--  python/sandcrawler/workers.py | 16
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 1e54a28..97079ee 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,12 +1,14 @@
import sys
import json
+import time
import zipfile
import multiprocessing.pool
from collections import Counter
from confluent_kafka import Consumer, Producer, KafkaException
from .misc import parse_cdx_line
+from .ia import SandcrawlerBackoffError
class SandcrawlerWorker(object):
@@ -367,7 +369,19 @@ class KafkaJsonPusher(RecordPusher):
for msg in batch:
self.counts['total'] += 1
record = json.loads(msg.value().decode('utf-8'))
- self.worker.push_record(record)
+ done = False
+ while not done:
+ try:
+ self.worker.push_record(record)
+ break
+ except SandcrawlerBackoffError as be:
+ print("Backing off for 200 seconds: {}".format(be))
+ self.consumer.pause()
+ for i in range(40):
+ empty_batch = self.consumer.poll(0)
+ assert not empty_batch
+ time.sleep(5)
+ self.consumer.resume()
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
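The retry-on-backoff logic in the hunk above can be factored into a small helper, sketched here with injected `push`, `pause`, `resume`, and `poll` callables standing in for the worker and consumer methods (hypothetical names, not from this commit). A stub `SandcrawlerBackoffError` stands in for the one imported from `.ia`. Note that confluent_kafka's `Consumer.pause()`/`Consumer.resume()` expect an explicit partition list (e.g. `consumer.assignment()`); the helper leaves that to its callers.

```python
import time

class SandcrawlerBackoffError(Exception):
    """Stand-in for sandcrawler.ia.SandcrawlerBackoffError."""

def push_with_backoff(push, record, pause, resume, poll,
                      backoff_sec=200, poll_interval=5, sleep=time.sleep):
    """Retry push(record), pausing consumption on back-pressure.

    Mirrors the loop in this commit: on SandcrawlerBackoffError, pause
    the consumer, keep polling with a zero timeout so the consumer
    stays alive in its group (expecting no messages while paused),
    then resume and retry."""
    while True:
        try:
            push(record)
            return
        except SandcrawlerBackoffError as be:
            print("Backing off for {} seconds: {}".format(backoff_sec, be))
            pause()
            for _ in range(backoff_sec // poll_interval):
                empty_batch = poll(0)
                assert not empty_batch
                sleep(poll_interval)
            resume()
```

Unlike the inline version, the `done` flag disappears: returning on success makes the `while True` / `break` pattern unnecessary.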