diff options
Diffstat (limited to 'python/fatcat_tools/workers/worker_common.py')
-rw-r--r-- | python/fatcat_tools/workers/worker_common.py | 32 |
1 files changed, 15 insertions, 17 deletions
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index 8c2936be..baec44f4 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -1,4 +1,3 @@ - from confluent_kafka import Consumer, KafkaException, TopicPartition @@ -13,22 +12,21 @@ def most_recent_message(topic, kafka_config): print("Fetching most Kafka message from {}".format(topic)) conf = kafka_config.copy() - conf.update({ - 'group.id': 'worker-init-last-msg', # should never commit - 'delivery.report.only.error': True, - 'enable.auto.commit': False, - 'default.topic.config': { - 'request.required.acks': -1, - 'auto.offset.reset': 'latest', - }, - }) + conf.update( + { + "group.id": "worker-init-last-msg", # should never commit + "delivery.report.only.error": True, + "enable.auto.commit": False, + "default.topic.config": { + "request.required.acks": -1, + "auto.offset.reset": "latest", + }, + } + ) consumer = Consumer(conf) - hwm = consumer.get_watermark_offsets( - TopicPartition(topic, 0), - timeout=5.0, - cached=False) + hwm = consumer.get_watermark_offsets(TopicPartition(topic, 0), timeout=5.0, cached=False) if not hwm: raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic)) print("High watermarks: {}".format(hwm)) @@ -37,7 +35,7 @@ def most_recent_message(topic, kafka_config): print("topic is new; not 'most recent message'") return None - consumer.assign([TopicPartition(topic, 0, hwm[1]-1)]) + consumer.assign([TopicPartition(topic, 0, hwm[1] - 1)]) msg = consumer.poll(2.0) consumer.close() if not msg: @@ -56,8 +54,8 @@ class FatcatWorker: if api: self.api = api self.kafka_config = { - 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes + "bootstrap.servers": kafka_hosts, + "message.max.bytes": 20000000, # ~20 MBytes; broker-side max is ~50 MBytes } self.produce_topic = produce_topic self.consume_topic = consume_topic |