author    | Bryan Newbold <bnewbold@robocracy.org> | 2019-04-08 21:11:25 -0700
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-04-08 21:11:25 -0700
commit    | 49c754e10e55376881038d306d70ac7623fe3d90 (patch)
tree      | cfd6dd0262555cf9fa4203b00f6f2f79b8435f15 /python/fatcat_tools/workers/changelog.py
parent    | aac6727d2fdf59e09c1f42da19a9a894304acc00 (diff)
convert pipeline workers from pykafka to confluent-kafka
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 176 |
1 file changed, 117 insertions, 59 deletions
```diff
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 6319d55a..4108012e 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,7 @@
 import json
 import time
 
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, Producer, KafkaException
 
 from .worker_common import FatcatWorker, most_recent_message
 
@@ -12,7 +12,7 @@ class ChangelogWorker(FatcatWorker):
     found, fetch them and push (as JSON) into a Kafka topic.
     """
 
-    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=5.0, offset=None):
         # TODO: should be offset=0
         super().__init__(kafka_hosts=kafka_hosts,
                          produce_topic=produce_topic,
@@ -21,38 +21,47 @@ class ChangelogWorker(FatcatWorker):
         self.offset = offset    # the fatcat changelog offset, not the kafka offset
 
     def run(self):
-        topic = self.kafka.topics[self.produce_topic]
+
         # On start, try to consume the most recent from the topic, and using
         # that as the starting offset. Note that this is a single-partition
         # topic
         if self.offset is None:
             print("Checking for most recent changelog offset...")
-            msg = most_recent_message(topic)
+            msg = most_recent_message(self.produce_topic, self.kafka_config)
             if msg:
                 self.offset = json.loads(msg.decode('utf-8'))['index']
             else:
-                self.offset = 1
-
-        with topic.get_producer(
-                max_request_size=self.produce_max_request_size,
-            ) as producer:
-            while True:
-                latest = int(self.api.get_changelog(limit=1)[0].index)
-                if latest > self.offset:
-                    print("Fetching changelogs from {} through {}".format(
-                        self.offset+1, latest))
-                    for i in range(self.offset+1, latest+1):
-                        cle = self.api.get_changelog_entry(i)
-                        obj = self.api.api_client.sanitize_for_serialization(cle)
-                        producer.produce(
-                            message=json.dumps(obj).encode('utf-8'),
-                            partition_key=None,
-                            timestamp=None,
-                            #NOTE could be (???): timestamp=cle.timestamp,
-                        )
-                        self.offset = i
-                print("Sleeping {} seconds...".format(self.poll_interval))
-                time.sleep(self.poll_interval)
+                self.offset = 0
+            print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
+
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        producer = Producer(self.kafka_config)
+
+        while True:
+            latest = int(self.api.get_changelog(limit=1)[0].index)
+            if latest > self.offset:
+                print("Fetching changelogs from {} through {}".format(
+                    self.offset+1, latest))
+                for i in range(self.offset+1, latest+1):
+                    cle = self.api.get_changelog_entry(i)
+                    obj = self.api.api_client.sanitize_for_serialization(cle)
+                    producer.produce(
+                        self.produce_topic,
+                        json.dumps(obj).encode('utf-8'),
+                        key=str(i),
+                        on_delivery=fail_fast,
+                        #NOTE timestamp could be timestamp=cle.timestamp (?)
+                    )
+                    self.offset = i
+            producer.poll(0)
+            print("Sleeping {} seconds...".format(self.poll_interval))
+            time.sleep(self.poll_interval)
 
 
 class EntityUpdatesWorker(FatcatWorker):
@@ -63,45 +72,94 @@ class EntityUpdatesWorker(FatcatWorker):
     For now, only release updates are published.
     """
 
-    def __init__(self, api, kafka_hosts, consume_topic, release_topic):
+    def __init__(self, api, kafka_hosts, consume_topic, release_topic, poll_interval=5.0):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic,
                          api=api)
         self.release_topic = release_topic
+        self.poll_interval = poll_interval
         self.consumer_group = "entity-updates"
 
     def run(self):
-        changelog_topic = self.kafka.topics[self.consume_topic]
-        release_topic = self.kafka.topics[self.release_topic]
-
-        consumer = changelog_topic.get_balanced_consumer(
-            consumer_group=self.consumer_group,
-            managed=True,
-            auto_offset_reset=OffsetType.LATEST,
-            reset_offset_on_start=False,
-            fetch_message_max_bytes=4000000, # up to ~4MBytes
-            auto_commit_enable=True,
-            auto_commit_interval_ms=30000, # 30 seconds
-            compacted_topic=True,
+
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        def on_commit(err, partitions):
+            if err is not None:
+                print("Kafka consumer commit error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+            for p in partitions:
+                # check for partition-specific commit errors
+                print(p)
+                if p.error:
+                    print("Kafka consumer commit error: {}".format(p.error))
+                    print("Bailing out...")
+                    # TODO: should it be sys.exit(-1)?
+                    raise KafkaException(err)
+            print("Kafka consumer commit successful")
+            pass
+
+        def on_rebalance(consumer, partitions):
+            for p in partitions:
+                if p.error:
+                    raise KafkaException(p.error)
+            print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions))
+
+        consumer_conf = self.kafka_config.copy()
+        consumer_conf.update({
+            'group.id': self.consumer_group,
+            'enable.auto.offset.store': False,
+            'default.topic.config': {
+                'auto.offset.reset': 'latest',
+            },
+        })
+        consumer = Consumer(consumer_conf)
+
+        producer_conf = self.kafka_config.copy()
+        producer_conf.update({
+            'default.topic.config': {
+                'request.required.acks': -1,
+            },
+        })
+        producer = Producer(producer_conf)
+
+        consumer.subscribe([self.consume_topic],
+            on_assign=on_rebalance,
+            on_revoke=on_rebalance,
         )
+        print("Kafka consuming {}".format(self.consume_topic))
+
+        while True:
+            msg = consumer.poll(self.poll_interval)
+            if not msg:
+                print("nothing new from kafka (interval:{})".format(self.poll_interval))
+                consumer.commit()
+                continue
+            if msg.error():
+                raise KafkaException(msg.error())
+
+            cle = json.loads(msg.value().decode('utf-8'))
+            #print(cle)
+            print("processing changelog index {}".format(cle['index']))
+            release_edits = cle['editgroup']['edits']['releases']
+            for re in release_edits:
+                ident = re['ident']
+                release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+                # TODO: use .to_json() helper
+                release_dict = self.api.api_client.sanitize_for_serialization(release)
+                producer.produce(
+                    self.release_topic,
+                    json.dumps(release_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
+                )
+            consumer.store_offsets(msg)
 
-        # using a sync producer to try and avoid racey loss of delivery (aka,
-        # if consumer group updated but produce didn't stick)
-        with release_topic.get_sync_producer(
-                max_request_size=self.produce_max_request_size,
-            ) as producer:
-            for msg in consumer:
-                cle = json.loads(msg.value.decode('utf-8'))
-                #print(cle)
-                print("processing changelog index {}".format(cle['index']))
-                release_edits = cle['editgroup']['edits']['releases']
-                for re in release_edits:
-                    ident = re['ident']
-                    release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
-                    release_dict = self.api.api_client.sanitize_for_serialization(release)
-                    producer.produce(
-                        message=json.dumps(release_dict).encode('utf-8'),
-                        partition_key=ident.encode('utf-8'),
-                        timestamp=None,
-                    )
-                    #consumer.commit_offsets()
```
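The converted workers lean on two confluent-kafka idioms: a producer delivery callback that fails fast on any produce error, and a consumer with `enable.auto.offset.store` disabled so offsets are stored only after a message has been fully processed. The sketch below is a minimal, standalone illustration of that consume-transform-produce loop, not fatcat code; the broker address, topic names, and group id are placeholder assumptions.

```python
import json

from confluent_kafka import Consumer, KafkaException, Producer

# Placeholder broker and topic names (assumptions, not fatcat config)
KAFKA_HOSTS = "localhost:9092"
CONSUME_TOPIC = "example.changelog"
PRODUCE_TOPIC = "example.release-updates"


def fail_fast(err, msg):
    # Delivery callback: raise instead of silently dropping a failed produce
    if err is not None:
        raise KafkaException(err)


producer = Producer({"bootstrap.servers": KAFKA_HOSTS})

consumer = Consumer({
    "bootstrap.servers": KAFKA_HOSTS,
    "group.id": "example-group",
    # store offsets explicitly, only after processing succeeds
    "enable.auto.offset.store": False,
    "auto.offset.reset": "latest",
})
consumer.subscribe([CONSUME_TOPIC])

while True:
    msg = consumer.poll(5.0)
    if msg is None:
        continue
    if msg.error():
        raise KafkaException(msg.error())

    record = json.loads(msg.value().decode("utf-8"))
    # ... expand/transform the record here ...
    producer.produce(
        PRODUCE_TOPIC,
        json.dumps(record).encode("utf-8"),
        key=str(record.get("index", "")).encode("utf-8"),
        on_delivery=fail_fast,
    )
    producer.poll(0)  # serve delivery callbacks
    consumer.store_offsets(msg)  # mark as processed; auto-commit flushes later
```

Because `enable.auto.commit` defaults to true, the stored offsets are committed in the background; an explicit `consumer.commit()`, as the EntityUpdatesWorker issues when a poll returns nothing, only forces that flush sooner.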