diff options
author | Bryan Newbold <bnewbold@archive.org> | 2021-10-27 14:33:09 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2021-10-27 18:25:58 -0700 |
commit | b4d0bfb643842e6070a4ee36cfc52e2292e3b5ba (patch) | |
tree | 1988743c61f5c053148d66f6939ff32f398ae06e /fatcat_scholar/kafka.py | |
parent | 956e64f47f7d47f2539cd6575c25ec0b6a33e567 (diff) | |
download | fatcat-scholar-b4d0bfb643842e6070a4ee36cfc52e2292e3b5ba.tar.gz fatcat-scholar-b4d0bfb643842e6070a4ee36cfc52e2292e3b5ba.zip |
make fmt (black 21.9b0)
Diffstat (limited to 'fatcat_scholar/kafka.py')
-rw-r--r-- | fatcat_scholar/kafka.py | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py index 71067c1..9fd43cf 100644 --- a/fatcat_scholar/kafka.py +++ b/fatcat_scholar/kafka.py @@ -103,7 +103,8 @@ class KafkaWorker: # check for partition-specific commit errors if p.error: print( - f"Kafka consumer commit error: {p.error}", file=sys.stderr, + f"Kafka consumer commit error: {p.error}", + file=sys.stderr, ) raise KafkaException(p.error) @@ -118,12 +119,16 @@ class KafkaWorker: # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 6min) "max.poll.interval.ms": 360000, - "default.topic.config": {"auto.offset.reset": "latest",}, + "default.topic.config": { + "auto.offset.reset": "latest", + }, } consumer = Consumer(config) consumer.subscribe( - consume_topics, on_assign=_on_rebalance, on_revoke=_on_rebalance, + consume_topics, + on_assign=_on_rebalance, + on_revoke=_on_rebalance, ) print( f"Consuming from kafka topics {consume_topics}, group {consumer_group}", @@ -161,7 +166,8 @@ class KafkaWorker: while True: batch = self.consumer.consume( - num_messages=self.batch_size, timeout=self.poll_interval_sec, + num_messages=self.batch_size, + timeout=self.poll_interval_sec, ) print( |