Diffstat (limited to 'python/fatcat_tools/workers/worker_common.py')
-rw-r--r--  python/fatcat_tools/workers/worker_common.py | 77
1 file changed, 48 insertions(+), 29 deletions(-)
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index cb4e5dab..1d465f58 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -5,41 +5,56 @@ import csv
 import json
 import itertools
 from itertools import islice

-from pykafka import KafkaClient
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, KafkaException, TopicPartition

 import fatcat_client
 from fatcat_client.rest import ApiException
-def most_recent_message(topic):
+def most_recent_message(topic, kafka_config):
     """
     Tries to fetch the most recent message from a given topic.

-    This only makes sense for single partition topics, though could be
-    extended with "last N" behavior.
-
-    Following "Consuming the last N messages from a topic"
-    from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns
+    This only makes sense for single partition topics (it works with only the
+    first partition), though could be extended with "last N" behavior.
     """
-    consumer = topic.get_simple_consumer(
-        auto_offset_reset=OffsetType.LATEST,
-        reset_offset_on_start=True)
-    offsets = [(p, op.last_offset_consumed - 1)
-               for p, op in consumer._partitions.items()]
-    offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]
-    if -2 in [o for p, o in offsets]:
-        consumer.stop()
+
+    print("Fetching most recent Kafka message from {}".format(topic))
+
+    conf = kafka_config.copy()
+    conf.update({
+        'group.id': 'worker-init-last-msg',  # should never commit offsets
+        'delivery.report.only.error': True,
+        'enable.auto.commit': False,
+        'default.topic.config': {
+            'request.required.acks': -1,
+            'auto.offset.reset': 'latest',
+        },
+    })
+
+    consumer = Consumer(conf)
+
+    hwm = consumer.get_watermark_offsets(
+        TopicPartition(topic, 0),
+        timeout=5.0,
+        cached=False)
+    if not hwm:
+        raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
+    print("High watermarks: {}".format(hwm))
+
+    if hwm[1] == 0:
+        print("topic is new; there is no 'most recent message'")
         return None
-    else:
-        consumer.reset_offsets(offsets)
-        msg = islice(consumer, 1)
-        if msg:
-            val = list(msg)[0].value
-            consumer.stop()
-            return val
-        else:
-            consumer.stop()
-            return None
+
+    consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
+    msg = consumer.poll(2.0)
+    consumer.close()
+    if not msg:
+        raise Exception("Failed to fetch most recent Kafka message")
+    if msg.error():
+        raise KafkaException(msg.error())
+    return msg.value()


 class FatcatWorker:
     """
@@ -49,9 +64,13 @@ class FatcatWorker:
     def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
         if api:
             self.api = api
-        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+        self.kafka_config = {
+            'bootstrap.servers': kafka_hosts,
+            'delivery.report.only.error': True,
+            'message.max.bytes': 20000000,  # ~20 MBytes; broker is ~50 MBytes
+            'default.topic.config': {
+                'request.required.acks': 'all',
+            },
+        }
         self.produce_topic = produce_topic
         self.consume_topic = consume_topic
-
-        # Kafka producer batch size tuning; also limit on size of single document
-        self.produce_max_request_size = 10000000  # 10 MByte-ish
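
The shared kafka_config dict is presumably what worker subclasses hand to their Producer and Consumer instances; a hypothetical usage sketch follows (the broker address and topic name are placeholders, and most_recent_message is the function patched above).

# Hypothetical usage sketch; broker and topic names are placeholders.
from confluent_kafka import Producer

worker = FatcatWorker(kafka_hosts="localhost:9092",
                      produce_topic="example-topic")

# Producer side: the shared config applies as-is.
producer = Producer(worker.kafka_config)
producer.produce(worker.produce_topic, b"some message body")
producer.flush()

# Consumer side: most_recent_message() copies the same config and layers
# consumer-specific settings (group.id, auto-commit) on top.
last = most_recent_message(worker.produce_topic, worker.kafka_config)
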