diff options
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 97079ee..c290421 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher): for msg in batch: self.counts['total'] += 1 record = json.loads(msg.value().decode('utf-8')) + # This complex bit of code implements backoff/backpressure + # in a way that will not cause this Kafka consumer to lose + # partition assignments (resulting in a rebalance). This + # was needed for the ingest workers. There is probably a + # better way to structure this concurrency. done = False while not done: try: @@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher): break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) - self.consumer.pause() + self.consumer.pause(self.consumer.assignment()) for i in range(40): + # Beware this poll which should not be + # receiving any messages because we are paused! empty_batch = self.consumer.poll(0) assert not empty_batch time.sleep(5) - self.consumer.resume() + self.consumer.resume(self.consumer.assignment()) self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr) |