From 345b24d6a9efbffd0ff3fd3c65e22894b498a2c6 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Mon, 8 Apr 2019 21:11:25 -0700
Subject: convert pipeline workers from pykafka to confluent-kafka

---
 python/fatcat_tools/workers/changelog.py     | 183 +++++++++++++++++----------
 python/fatcat_tools/workers/elasticsearch.py |  95 +++++++++-----
 python/fatcat_tools/workers/worker_common.py |  77 ++++++-----
 3 files changed, 230 insertions(+), 125 deletions(-)

(limited to 'python/fatcat_tools/workers')

diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 789a97a4..4a54c649 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,7 @@
 import json
 import time
 
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, Producer, KafkaException
 
 from .worker_common import FatcatWorker, most_recent_message
 
@@ -21,38 +21,47 @@ class ChangelogWorker(FatcatWorker):
         self.offset = offset # the fatcat changelog offset, not the kafka offset
 
     def run(self):
-        topic = self.kafka.topics[self.produce_topic]
+
         # On start, try to consume the most recent from the topic, and using
         # that as the starting offset. Note that this is a single-partition
        # topic
         if self.offset is None:
             print("Checking for most recent changelog offset...")
-            msg = most_recent_message(topic)
+            msg = most_recent_message(self.produce_topic, self.kafka_config)
             if msg:
                 self.offset = json.loads(msg.decode('utf-8'))['index']
             else:
-                self.offset = 1
-
-        with topic.get_producer(
-                max_request_size=self.produce_max_request_size,
-                ) as producer:
-            while True:
-                latest = int(self.api.get_changelog(limit=1)[0].index)
-                if latest > self.offset:
-                    print("Fetching changelogs from {} through {}".format(
-                        self.offset+1, latest))
-                    for i in range(self.offset+1, latest+1):
-                        cle = self.api.get_changelog_entry(i)
-                        obj = self.api.api_client.sanitize_for_serialization(cle)
-                        producer.produce(
-                            message=json.dumps(obj).encode('utf-8'),
-                            partition_key=None,
-                            timestamp=None,
-                            #NOTE could be (???): timestamp=cle.timestamp,
-                        )
-                        self.offset = i
-                print("Sleeping {} seconds...".format(self.poll_interval))
-                time.sleep(self.poll_interval)
+                self.offset = 0
+            print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
+
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        producer = Producer(self.kafka_config)
+
+        while True:
+            latest = int(self.api.get_changelog(limit=1)[0].index)
+            if latest > self.offset:
+                print("Fetching changelogs from {} through {}".format(
+                    self.offset+1, latest))
+                for i in range(self.offset+1, latest+1):
+                    cle = self.api.get_changelog_entry(i)
+                    obj = self.api.api_client.sanitize_for_serialization(cle)
+                    producer.produce(
+                        self.produce_topic,
+                        json.dumps(obj).encode('utf-8'),
+                        key=str(i),
+                        on_delivery=fail_fast,
+                        #NOTE timestamp could be timestamp=cle.timestamp (?)
+                    )
+                    self.offset = i
+            producer.poll(0)
+            print("Sleeping {} seconds...".format(self.poll_interval))
+            time.sleep(self.poll_interval)
 
 
 class EntityUpdatesWorker(FatcatWorker):
@@ -63,46 +72,83 @@ class EntityUpdatesWorker(FatcatWorker):
     For now, only release updates are published.
     """
 
-    def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic):
+    def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic,
                          api=api)
         self.release_topic = release_topic
         self.file_topic = file_topic
         self.container_topic = container_topic
+        self.poll_interval = poll_interval
         self.consumer_group = "entity-updates"
 
     def run(self):
-        changelog_topic = self.kafka.topics[self.consume_topic]
-        release_topic = self.kafka.topics[self.release_topic]
-        file_topic = self.kafka.topics[self.file_topic]
-        container_topic = self.kafka.topics[self.container_topic]
-
-        consumer = changelog_topic.get_balanced_consumer(
-            consumer_group=self.consumer_group,
-            managed=True,
-            auto_offset_reset=OffsetType.LATEST,
-            reset_offset_on_start=False,
-            fetch_message_max_bytes=10000000, # up to ~10 MBytes
-            auto_commit_enable=True,
-            auto_commit_interval_ms=30000, # 30 seconds
-            compacted_topic=True,
-        )
-        # using a sync producer to try and avoid racey loss of delivery (aka,
-        # if consumer group updated but produce didn't stick)
-        release_producer = release_topic.get_sync_producer(
-            max_request_size=self.produce_max_request_size,
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        def on_commit(err, partitions):
+            if err is not None:
+                print("Kafka consumer commit error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+            for p in partitions:
+                # check for partition-specific commit errors
+                print(p)
+                if p.error:
+                    print("Kafka consumer commit error: {}".format(p.error))
+                    print("Bailing out...")
+                    # TODO: should it be sys.exit(-1)?
+                    raise KafkaException(err)
+            print("Kafka consumer commit successful")
+            pass
+
+        def on_rebalance(consumer, partitions):
+            for p in partitions:
+                if p.error:
+                    raise KafkaException(p.error)
+            print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions))
+
+        consumer_conf = self.kafka_config.copy()
+        consumer_conf.update({
+            'group.id': self.consumer_group,
+            'enable.auto.offset.store': False,
+            'default.topic.config': {
+                'auto.offset.reset': 'latest',
+            },
+        })
+        consumer = Consumer(consumer_conf)
+
+        producer_conf = self.kafka_config.copy()
+        producer_conf.update({
+            'default.topic.config': {
+                'request.required.acks': -1,
+            },
+        })
+        producer = Producer(producer_conf)
+
+        consumer.subscribe([self.consume_topic],
+            on_assign=on_rebalance,
+            on_revoke=on_rebalance,
         )
-        file_producer = file_topic.get_sync_producer(
-            max_request_size=self.produce_max_request_size,
-        )
-        container_producer = container_topic.get_sync_producer(
-            max_request_size=self.produce_max_request_size,
-        )
-
-        for msg in consumer:
-            cle = json.loads(msg.value.decode('utf-8'))
+        print("Kafka consuming {}".format(self.consume_topic))
+
+        while True:
+            msg = consumer.poll(self.poll_interval)
+            if not msg:
+                print("nothing new from kafka (interval:{})".format(self.poll_interval))
+                consumer.commit()
+                continue
+            if msg.error():
+                raise KafkaException(msg.error())
+
+            cle = json.loads(msg.value().decode('utf-8'))
             #print(cle)
             print("processing changelog index {}".format(cle['index']))
             release_ids = []
@@ -130,28 +176,31 @@ class EntityUpdatesWorker(FatcatWorker):
                 # releases for which list changed
                 release_ids.extend(file_entity.release_ids or [])
                 file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
-                file_producer.produce(
-                    message=json.dumps(file_dict).encode('utf-8'),
-                    partition_key=ident.encode('utf-8'),
-                    timestamp=None,
+                producer.produce(
+                    self.file_topic,
+                    json.dumps(file_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
                 )
             for ident in set(container_ids):
                 container = self.api.get_container(ident)
                 container_dict = self.api.api_client.sanitize_for_serialization(container)
-                container_producer.produce(
-                    message=json.dumps(container_dict).encode('utf-8'),
-                    partition_key=ident.encode('utf-8'),
-                    timestamp=None,
+                producer.produce(
+                    self.container_topic,
+                    json.dumps(container_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
                 )
             for ident in set(release_ids):
                 release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
                 work_ids.append(release.work_id)
                 release_dict = self.api.api_client.sanitize_for_serialization(release)
-                release_producer.produce(
-                    message=json.dumps(release_dict).encode('utf-8'),
-                    partition_key=ident.encode('utf-8'),
-                    timestamp=None,
+                producer.produce(
+                    self.release_topic,
+                    json.dumps(release_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
                )
             # TODO: actually update works
+            consumer.store_offsets(msg)
-            #consumer.commit_offsets()
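
Not part of the patch: the new ChangelogWorker and EntityUpdatesWorker code above replaces pykafka's sync producers with confluent-kafka's asynchronous Producer plus a per-message delivery callback. A minimal standalone sketch of that pattern, under the assumption of a local broker; the broker address, topic name, and record are illustrative placeholders, not values taken from the patch:

    # sketch only; placeholder broker/topic/record, not fatcat code
    import json

    from confluent_kafka import KafkaException, Producer


    def fail_fast(err, msg):
        # delivery callback: a failed delivery aborts the process instead of
        # being silently dropped
        if err is not None:
            raise KafkaException(err)


    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    record = {'index': 1234}
    producer.produce(
        'example-changelog-topic',
        json.dumps(record).encode('utf-8'),
        key=str(record['index']),
        on_delivery=fail_fast,
    )
    producer.poll(0)    # serve any pending delivery callbacks without blocking
    producer.flush()    # block until outstanding messages are delivered (or fail)

The worker above calls producer.poll(0) once per outer loop iteration for the same reason: delivery callbacks only fire when the client is polled.
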
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 91224d98..547e270c 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -2,7 +2,7 @@
 import json
 import time
 import requests
 
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, Producer, KafkaException
 from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
 from fatcat_tools import *
@@ -18,10 +18,13 @@ class ElasticsearchReleaseWorker(FatcatWorker):
     """
 
     def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
+            batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates"
+        self.batch_size = batch_size
+        self.poll_interval = poll_interval
         self.elasticsearch_backend = elasticsearch_backend
         self.elasticsearch_index = elasticsearch_index
         self.entity_type = ReleaseEntity
@@ -29,52 +32,86 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         self.transform_func = release_to_elasticsearch
 
     def run(self):
-        consume_topic = self.kafka.topics[self.consume_topic]
         ac = ApiClient()
 
-        consumer = consume_topic.get_balanced_consumer(
-            consumer_group=self.consumer_group,
-            managed=True,
-            fetch_message_max_bytes=10000000, # up to ~10 MBytes
-            auto_commit_enable=True,
-            auto_commit_interval_ms=30000, # 30 seconds
-            compacted_topic=True,
+        def on_rebalance(consumer, partitions):
+            for p in partitions:
+                if p.error:
+                    raise KafkaException(p.error)
+            print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions))
+
+        consumer_conf = self.kafka_config.copy()
+        consumer_conf.update({
+            'group.id': self.consumer_group,
+            'enable.auto.offset.store': False,
+            'default.topic.config': {
+                'auto.offset.reset': 'latest',
+            },
+        })
+        consumer = Consumer(consumer_conf)
+        consumer.subscribe([self.consume_topic],
+            on_assign=on_rebalance,
+            on_revoke=on_rebalance,
         )
 
-        for msg in consumer:
-            json_str = msg.value.decode('utf-8')
-            # HACK: work around a bug where container entities got published to
-            # release_v03 topic
-            if self.elasticsearch_document_name == "release":
-                entity_dict = json.loads(json_str)
-                if entity_dict.get('name') and not entity_dict.get('title'):
-                    continue
-            entity = entity_from_json(json_str, self.entity_type, api_client=ac)
-            #print(entity)
-            elasticsearch_endpoint = "{}/{}/{}/{}".format(
+        while True:
+            batch = consumer.consume(
+                num_messages=self.batch_size,
+                timeout=self.poll_interval)
+            if not batch:
+                if not consumer.assignment():
+                    print("... no Kafka consumer partitions assigned yet")
+                print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval))
+                continue
+            print("... got {} kafka messages".format(len(batch)))
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            bulk_actions = []
+            for msg in batch:
+                json_str = msg.value().decode('utf-8')
+                entity = entity_from_json(json_str, ReleaseEntity, api_client=ac)
+                print("Upserting: release/{}".format(entity.ident))
+                bulk_actions.append(json.dumps({
+                    "index": { "_id": entity.ident, },
+                }))
+                bulk_actions.append(json.dumps(
+                    release_to_elasticsearch(entity)))
+            elasticsearch_endpoint = "{}/{}/release/_bulk".format(
                 self.elasticsearch_backend,
-                self.elasticsearch_index,
-                self.elasticsearch_document_name,
-                entity.ident)
-            print("Updating document: {}".format(elasticsearch_endpoint))
-            resp = requests.post(elasticsearch_endpoint, json=self.transform_func(entity))
+                self.elasticsearch_index)
+            resp = requests.post(elasticsearch_endpoint,
+                headers={"Content-Type": "application/x-ndjson"},
+                data="\n".join(bulk_actions) + "\n")
             resp.raise_for_status()
-            #consumer.commit_offsets()
+            if resp.json()['errors']:
+                desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint)
+                print(desc)
+                print(resp.content)
+                raise Exception(desc)
+            consumer.store_offsets(batch[-1])
+
 
 
 class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
 
     def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
+            batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic,
                          poll_interval=poll_interval,
                          offset=offset,
                          elasticsearch_backend=elasticsearch_backend,
-                         elasticsearch_index=elasticsearch_index)
+                         elasticsearch_index=elasticsearch_index,
+                         batch_size=batch_size)
         # previous group got corrupted (by pykafka library?)
         self.consumer_group = "elasticsearch-updates2"
         self.entity_type = ContainerEntity
         self.elasticsearch_document_name = "container"
         self.transform_func = container_to_elasticsearch
+
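
Not part of the patch: the reworked run() loop above switches from one HTTP request per entity to the Elasticsearch bulk API, whose request body is newline-delimited JSON (an "index" action line followed by the document, for each entity, with a trailing newline). A rough sketch of that body construction; the host, index name, and documents are placeholders for illustration:

    # sketch only; placeholder host, index, and documents
    import json

    import requests

    docs = [
        {"ident": "release-aaaa", "title": "Example Release A"},
        {"ident": "release-bbbb", "title": "Example Release B"},
    ]

    bulk_actions = []
    for doc in docs:
        # one "index" action line per document, followed by the document itself
        bulk_actions.append(json.dumps({"index": {"_id": doc["ident"]}}))
        bulk_actions.append(json.dumps(doc))

    resp = requests.post(
        "http://localhost:9200/fatcat/release/_bulk",
        headers={"Content-Type": "application/x-ndjson"},
        data="\n".join(bulk_actions) + "\n")
    resp.raise_for_status()
    # _bulk returns HTTP 200 even when individual actions fail, so check 'errors'
    if resp.json()['errors']:
        raise Exception("partial failure during bulk indexing")

That last check mirrors the worker above: a 200 response alone is not sufficient to treat the batch as indexed.
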
""" - consumer = topic.get_simple_consumer( - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=True) - offsets = [(p, op.last_offset_consumed - 1) - for p, op in consumer._partitions.items()] - offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] - if -2 in [o for p, o in offsets]: - consumer.stop() + + print("Fetching most Kafka message from {}".format(topic)) + + conf = kafka_config.copy() + conf.update({ + 'group.id': 'worker-init-last-msg', # should never commit + 'delivery.report.only.error': True, + 'enable.auto.commit': False, + 'default.topic.config': { + 'request.required.acks': -1, + 'auto.offset.reset': 'latest', + }, + }) + + consumer = Consumer(conf) + + hwm = consumer.get_watermark_offsets( + TopicPartition(topic, 0), + timeout=5.0, + cached=False) + if not hwm: + raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic)) + print("High watermarks: {}".format(hwm)) + + if hwm[1] == 0: + print("topic is new; not 'most recent message'") return None - else: - consumer.reset_offsets(offsets) - msg = islice(consumer, 1) - if msg: - val = list(msg)[0].value - consumer.stop() - return val - else: - consumer.stop() - return None + + consumer.assign([TopicPartition(topic, 0, hwm[1]-1)]) + msg = consumer.poll(2.0) + consumer.close() + if not msg: + raise Exception("Failed to fetch most recent kafka message") + if msg.error(): + raise KafkaException(msg.error()) + return msg.value() + class FatcatWorker: """ @@ -49,9 +64,13 @@ class FatcatWorker: def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None): if api: self.api = api - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'delivery.report.only.error': True, + 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + 'default.topic.config': { + 'request.required.acks': 'all', + }, + } self.produce_topic = produce_topic self.consume_topic = consume_topic - - # Kafka producer batch size tuning; also limit on size of single document - self.produce_max_request_size = 10000000 # 10 MByte-ish -- cgit v1.2.3