aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-02-03 21:50:24 -0800
committerBryan Newbold <bnewbold@archive.org>2020-02-03 21:51:00 -0800
commitc723d12dcc8c19d0302f0dfbcdd1414d834c6d73 (patch)
tree0d346d3b3376ae7220be178d5796ea64323b34e8 /python/sandcrawler/workers.py
parent15c7e430ebccbdab88355c5c1f1914c3aca99c8a (diff)
downloadsandcrawler-c723d12dcc8c19d0302f0dfbcdd1414d834c6d73.tar.gz
sandcrawler-c723d12dcc8c19d0302f0dfbcdd1414d834c6d73.zip
improvements to reliability from prod testing
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py11
1 files changed, 9 insertions, 2 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 97079ee..c290421 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher):
for msg in batch:
self.counts['total'] += 1
record = json.loads(msg.value().decode('utf-8'))
+ # This complex bit of code implements backoff/backpressure
+ # in a way that will not cause this Kafka consumer to lose
+ # partition assignments (resulting in a rebalance). This
+ # was needed for the ingest workers. There is probably a
+ # better way to structure this concurrency.
done = False
while not done:
try:
@@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher):
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
- self.consumer.pause()
+ self.consumer.pause(self.consumer.assignment())
for i in range(40):
+ # Beware this poll which should not be
+ # receiving any messages because we are paused!
empty_batch = self.consumer.poll(0)
assert not empty_batch
time.sleep(5)
- self.consumer.resume()
+ self.consumer.resume(self.consumer.assignment())
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)