diff options
Diffstat (limited to 'fatcat_scholar/kafka.py')
-rw-r--r-- | fatcat_scholar/kafka.py | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py index 71067c1..9fd43cf 100644 --- a/fatcat_scholar/kafka.py +++ b/fatcat_scholar/kafka.py @@ -103,7 +103,8 @@ class KafkaWorker: # check for partition-specific commit errors if p.error: print( - f"Kafka consumer commit error: {p.error}", file=sys.stderr, + f"Kafka consumer commit error: {p.error}", + file=sys.stderr, ) raise KafkaException(p.error) @@ -118,12 +119,16 @@ class KafkaWorker: # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 6min) "max.poll.interval.ms": 360000, - "default.topic.config": {"auto.offset.reset": "latest",}, + "default.topic.config": { + "auto.offset.reset": "latest", + }, } consumer = Consumer(config) consumer.subscribe( - consume_topics, on_assign=_on_rebalance, on_revoke=_on_rebalance, + consume_topics, + on_assign=_on_rebalance, + on_revoke=_on_rebalance, ) print( f"Consuming from kafka topics {consume_topics}, group {consumer_group}", @@ -161,7 +166,8 @@ class KafkaWorker: while True: batch = self.consumer.consume( - num_messages=self.batch_size, timeout=self.poll_interval_sec, + num_messages=self.batch_size, + timeout=self.poll_interval_sec, ) print( |