aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py11
1 files changed, 9 insertions, 2 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 97079ee..c290421 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -369,6 +369,11 @@ class KafkaJsonPusher(RecordPusher):
for msg in batch:
self.counts['total'] += 1
record = json.loads(msg.value().decode('utf-8'))
+ # This complex bit of code implements backoff/backpressure
+ # in a way that will not cause this Kafka consumer to lose
+ # partition assignments (resulting in a rebalance). This
+ # was needed for the ingest workers. There is probably a
+ # better way to structure this concurrency.
done = False
while not done:
try:
@@ -376,12 +381,14 @@ class KafkaJsonPusher(RecordPusher):
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
- self.consumer.pause()
+ self.consumer.pause(self.consumer.assignment())
for i in range(40):
+ # Beware this poll which should not be
+ # receiving any messages because we are paused!
empty_batch = self.consumer.poll(0)
assert not empty_batch
time.sleep(5)
- self.consumer.resume()
+ self.consumer.resume(self.consumer.assignment())
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)