From c723d12dcc8c19d0302f0dfbcdd1414d834c6d73 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 3 Feb 2020 21:50:24 -0800 Subject: improvements to reliability from prod testing --- python/sandcrawler/workers.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'python/sandcrawler/workers.py') diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 97079ee..c290421 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher): for msg in batch: self.counts['total'] += 1 record = json.loads(msg.value().decode('utf-8')) + # This complex bit of code implements backoff/backpressure + # in a way that will not cause this Kafka consumer to lose + # partition assignments (resulting in a rebalance). This + # was needed for the ingest workers. There is probably a + # better way to structure this concurrency. done = False while not done: try: @@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher): break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) - self.consumer.pause() + self.consumer.pause(self.consumer.assignment()) for i in range(40): + # Beware this poll which should not be + # receiving any messages because we are paused! empty_batch = self.consumer.poll(0) assert not empty_batch time.sleep(5) - self.consumer.resume() + self.consumer.resume(self.consumer.assignment()) self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr) -- cgit v1.2.3