diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-02-03 21:50:24 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-02-03 21:51:00 -0800 |
commit | c723d12dcc8c19d0302f0dfbcdd1414d834c6d73 (patch) | |
tree | 0d346d3b3376ae7220be178d5796ea64323b34e8 /python/sandcrawler/workers.py | |
parent | 15c7e430ebccbdab88355c5c1f1914c3aca99c8a (diff) | |
download | sandcrawler-c723d12dcc8c19d0302f0dfbcdd1414d834c6d73.tar.gz sandcrawler-c723d12dcc8c19d0302f0dfbcdd1414d834c6d73.zip |
improvements to reliability from prod testing
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 97079ee..c290421 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher): for msg in batch: self.counts['total'] += 1 record = json.loads(msg.value().decode('utf-8')) + # This complex bit of code implements backoff/backpressure + # in a way that will not cause this Kafka consumer to lose + # partition assignments (resulting in a rebalance). This + # was needed for the ingest workers. There is probably a + # better way to structure this concurrency. done = False while not done: try: @@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher): break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) - self.consumer.pause() + self.consumer.pause(self.consumer.assignment()) for i in range(40): + # Beware this poll which should not be + # receiving any messages because we are paused! empty_batch = self.consumer.poll(0) assert not empty_batch time.sleep(5) - self.consumer.resume() + self.consumer.resume(self.consumer.assignment()) self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr) |