diff options
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 183 |
1 files changed, 116 insertions, 67 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 789a97a4..4a54c649 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -1,7 +1,7 @@ import json import time -from pykafka.common import OffsetType +from confluent_kafka import Consumer, Producer, KafkaException from .worker_common import FatcatWorker, most_recent_message @@ -21,38 +21,47 @@ class ChangelogWorker(FatcatWorker): self.offset = offset # the fatcat changelog offset, not the kafka offset def run(self): - topic = self.kafka.topics[self.produce_topic] + # On start, try to consume the most recent from the topic, and using # that as the starting offset. Note that this is a single-partition # topic if self.offset is None: print("Checking for most recent changelog offset...") - msg = most_recent_message(topic) + msg = most_recent_message(self.produce_topic, self.kafka_config) if msg: self.offset = json.loads(msg.decode('utf-8'))['index'] else: - self.offset = 1 - - with topic.get_producer( - max_request_size=self.produce_max_request_size, - ) as producer: - while True: - latest = int(self.api.get_changelog(limit=1)[0].index) - if latest > self.offset: - print("Fetching changelogs from {} through {}".format( - self.offset+1, latest)) - for i in range(self.offset+1, latest+1): - cle = self.api.get_changelog_entry(i) - obj = self.api.api_client.sanitize_for_serialization(cle) - producer.produce( - message=json.dumps(obj).encode('utf-8'), - partition_key=None, - timestamp=None, - #NOTE could be (???): timestamp=cle.timestamp, - ) - self.offset = i - print("Sleeping {} seconds...".format(self.poll_interval)) - time.sleep(self.poll_interval) + self.offset = 0 + print("Most recent changelog index in Kafka seems to be {}".format(self.offset)) + + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + + producer = Producer(self.kafka_config) + + while True: + latest = int(self.api.get_changelog(limit=1)[0].index) + if latest > self.offset: + print("Fetching changelogs from {} through {}".format( + self.offset+1, latest)) + for i in range(self.offset+1, latest+1): + cle = self.api.get_changelog_entry(i) + obj = self.api.api_client.sanitize_for_serialization(cle) + producer.produce( + self.produce_topic, + json.dumps(obj).encode('utf-8'), + key=str(i), + on_delivery=fail_fast, + #NOTE timestamp could be timestamp=cle.timestamp (?) + ) + self.offset = i + producer.poll(0) + print("Sleeping {} seconds...".format(self.poll_interval)) + time.sleep(self.poll_interval) class EntityUpdatesWorker(FatcatWorker): @@ -63,46 +72,83 @@ class EntityUpdatesWorker(FatcatWorker): For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic self.file_topic = file_topic self.container_topic = container_topic + self.poll_interval = poll_interval self.consumer_group = "entity-updates" def run(self): - changelog_topic = self.kafka.topics[self.consume_topic] - release_topic = self.kafka.topics[self.release_topic] - file_topic = self.kafka.topics[self.file_topic] - container_topic = self.kafka.topics[self.container_topic] - - consumer = changelog_topic.get_balanced_consumer( - consumer_group=self.consumer_group, - managed=True, - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=False, - fetch_message_max_bytes=10000000, # up to ~10 MBytes - auto_commit_enable=True, - auto_commit_interval_ms=30000, # 30 seconds - compacted_topic=True, - ) - # using a sync producer to try and avoid racey loss of delivery (aka, - # if consumer group updated but produce didn't stick) - release_producer = release_topic.get_sync_producer( - max_request_size=self.produce_max_request_size, + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + + def on_commit(err, partitions): + if err is not None: + print("Kafka consumer commit error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + for p in partitions: + # check for partition-specific commit errors + print(p) + if p.error: + print("Kafka consumer commit error: {}".format(p.error)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + print("Kafka consumer commit successful") + pass + + def on_rebalance(consumer, partitions): + for p in partitions: + if p.error: + raise KafkaException(p.error) + print("Kafka partitions rebalanced: {} / {}".format( + consumer, partitions)) + + consumer_conf = self.kafka_config.copy() + consumer_conf.update({ + 'group.id': self.consumer_group, + 'enable.auto.offset.store': False, + 'default.topic.config': { + 'auto.offset.reset': 'latest', + }, + }) + consumer = Consumer(consumer_conf) + + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'default.topic.config': { + 'request.required.acks': -1, + }, + }) + producer = Producer(producer_conf) + + consumer.subscribe([self.consume_topic], + on_assign=on_rebalance, + on_revoke=on_rebalance, ) - file_producer = file_topic.get_sync_producer( - max_request_size=self.produce_max_request_size, - ) - container_producer = container_topic.get_sync_producer( - max_request_size=self.produce_max_request_size, - ) - - for msg in consumer: - cle = json.loads(msg.value.decode('utf-8')) + print("Kafka consuming {}".format(self.consume_topic)) + + while True: + msg = consumer.poll(self.poll_interval) + if not msg: + print("nothing new from kafka (interval:{})".format(self.poll_interval)) + consumer.commit() + continue + if msg.error(): + raise KafkaException(msg.error()) + + cle = json.loads(msg.value().decode('utf-8')) #print(cle) print("processing changelog index {}".format(cle['index'])) release_ids = [] @@ -130,28 +176,31 @@ class EntityUpdatesWorker(FatcatWorker): # releases for which list changed release_ids.extend(file_entity.release_ids or []) file_dict = self.api.api_client.sanitize_for_serialization(file_entity) - file_producer.produce( - message=json.dumps(file_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, + producer.produce( + self.file_topic, + json.dumps(file_dict).encode('utf-8'), + key=ident.encode('utf-8'), + on_delivery=fail_fast, ) for ident in set(container_ids): container = self.api.get_container(ident) container_dict = self.api.api_client.sanitize_for_serialization(container) - container_producer.produce( - message=json.dumps(container_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, + producer.produce( + self.container_topic, + json.dumps(container_dict).encode('utf-8'), + key=ident.encode('utf-8'), + on_delivery=fail_fast, ) for ident in set(release_ids): release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") work_ids.append(release.work_id) release_dict = self.api.api_client.sanitize_for_serialization(release) - release_producer.produce( - message=json.dumps(release_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, + producer.produce( + self.release_topic, + json.dumps(release_dict).encode('utf-8'), + key=ident.encode('utf-8'), + on_delivery=fail_fast, ) # TODO: actually update works + consumer.store_offsets(msg) - #consumer.commit_offsets() |