aboutsummaryrefslogtreecommitdiffstats
path: root/fatcat_scholar/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'fatcat_scholar/kafka.py')
-rw-r--r--fatcat_scholar/kafka.py14
1 files changed, 10 insertions, 4 deletions
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py
index 71067c1..9fd43cf 100644
--- a/fatcat_scholar/kafka.py
+++ b/fatcat_scholar/kafka.py
@@ -103,7 +103,8 @@ class KafkaWorker:
# check for partition-specific commit errors
if p.error:
print(
- f"Kafka consumer commit error: {p.error}", file=sys.stderr,
+ f"Kafka consumer commit error: {p.error}",
+ file=sys.stderr,
)
raise KafkaException(p.error)
@@ -118,12 +119,16 @@ class KafkaWorker:
# user code timeout; if no poll after this long, assume user code
# hung and rebalance (default: 6min)
"max.poll.interval.ms": 360000,
- "default.topic.config": {"auto.offset.reset": "latest",},
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
+ },
}
consumer = Consumer(config)
consumer.subscribe(
- consume_topics, on_assign=_on_rebalance, on_revoke=_on_rebalance,
+ consume_topics,
+ on_assign=_on_rebalance,
+ on_revoke=_on_rebalance,
)
print(
f"Consuming from kafka topics {consume_topics}, group {consumer_group}",
@@ -161,7 +166,8 @@ class KafkaWorker:
while True:
batch = self.consumer.consume(
- num_messages=self.batch_size, timeout=self.poll_interval_sec,
+ num_messages=self.batch_size,
+ timeout=self.poll_interval_sec,
)
print(