aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/worker_common.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:59 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:59 -0700
commit31d1a6a713d177990609767d508209ced19ca396 (patch)
treea628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/workers/worker_common.py
parent9dc891b8098542bb089c8c47098b60a8beb76a53 (diff)
downloadfatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz
fatcat-31d1a6a713d177990609767d508209ced19ca396.zip
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/workers/worker_common.py')
-rw-r--r--python/fatcat_tools/workers/worker_common.py32
1 files changed, 15 insertions, 17 deletions
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index 8c2936be..baec44f4 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,4 +1,3 @@
-
from confluent_kafka import Consumer, KafkaException, TopicPartition
@@ -13,22 +12,21 @@ def most_recent_message(topic, kafka_config):
print("Fetching most Kafka message from {}".format(topic))
conf = kafka_config.copy()
- conf.update({
- 'group.id': 'worker-init-last-msg', # should never commit
- 'delivery.report.only.error': True,
- 'enable.auto.commit': False,
- 'default.topic.config': {
- 'request.required.acks': -1,
- 'auto.offset.reset': 'latest',
- },
- })
+ conf.update(
+ {
+ "group.id": "worker-init-last-msg", # should never commit
+ "delivery.report.only.error": True,
+ "enable.auto.commit": False,
+ "default.topic.config": {
+ "request.required.acks": -1,
+ "auto.offset.reset": "latest",
+ },
+ }
+ )
consumer = Consumer(conf)
- hwm = consumer.get_watermark_offsets(
- TopicPartition(topic, 0),
- timeout=5.0,
- cached=False)
+ hwm = consumer.get_watermark_offsets(TopicPartition(topic, 0), timeout=5.0, cached=False)
if not hwm:
raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
print("High watermarks: {}".format(hwm))
@@ -37,7 +35,7 @@ def most_recent_message(topic, kafka_config):
print("topic is new; not 'most recent message'")
return None
- consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
+ consumer.assign([TopicPartition(topic, 0, hwm[1] - 1)])
msg = consumer.poll(2.0)
consumer.close()
if not msg:
@@ -56,8 +54,8 @@ class FatcatWorker:
if api:
self.api = api
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
}
self.produce_topic = produce_topic
self.consume_topic = consume_topic