summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/worker_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/worker_common.py')
-rw-r--r--python/fatcat_tools/workers/worker_common.py32
1 files changed, 15 insertions, 17 deletions
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index 8c2936be..baec44f4 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,4 +1,3 @@
-
from confluent_kafka import Consumer, KafkaException, TopicPartition
@@ -13,22 +12,21 @@ def most_recent_message(topic, kafka_config):
print("Fetching most Kafka message from {}".format(topic))
conf = kafka_config.copy()
- conf.update({
- 'group.id': 'worker-init-last-msg', # should never commit
- 'delivery.report.only.error': True,
- 'enable.auto.commit': False,
- 'default.topic.config': {
- 'request.required.acks': -1,
- 'auto.offset.reset': 'latest',
- },
- })
+ conf.update(
+ {
+ "group.id": "worker-init-last-msg", # should never commit
+ "delivery.report.only.error": True,
+ "enable.auto.commit": False,
+ "default.topic.config": {
+ "request.required.acks": -1,
+ "auto.offset.reset": "latest",
+ },
+ }
+ )
consumer = Consumer(conf)
- hwm = consumer.get_watermark_offsets(
- TopicPartition(topic, 0),
- timeout=5.0,
- cached=False)
+ hwm = consumer.get_watermark_offsets(TopicPartition(topic, 0), timeout=5.0, cached=False)
if not hwm:
raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
print("High watermarks: {}".format(hwm))
@@ -37,7 +35,7 @@ def most_recent_message(topic, kafka_config):
print("topic is new; not 'most recent message'")
return None
- consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
+ consumer.assign([TopicPartition(topic, 0, hwm[1] - 1)])
msg = consumer.poll(2.0)
consumer.close()
if not msg:
@@ -56,8 +54,8 @@ class FatcatWorker:
if api:
self.api = api
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
}
self.produce_topic = produce_topic
self.consume_topic = consume_topic