diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2019-04-08 21:11:25 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-04-08 21:11:25 -0700 |
commit | 49c754e10e55376881038d306d70ac7623fe3d90 (patch) | |
tree | cfd6dd0262555cf9fa4203b00f6f2f79b8435f15 /python/fatcat_tools/workers | |
parent | aac6727d2fdf59e09c1f42da19a9a894304acc00 (diff) | |
download | fatcat-49c754e10e55376881038d306d70ac7623fe3d90.tar.gz fatcat-49c754e10e55376881038d306d70ac7623fe3d90.zip |
convert pipeline workers from pykafka to confluent-kafka
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 176 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 83 | ||||
-rw-r--r-- | python/fatcat_tools/workers/worker_common.py | 77 |
3 files changed, 227 insertions, 109 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 6319d55a..4108012e 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -1,7 +1,7 @@ import json import time -from pykafka.common import OffsetType +from confluent_kafka import Consumer, Producer, KafkaException from .worker_common import FatcatWorker, most_recent_message @@ -12,7 +12,7 @@ class ChangelogWorker(FatcatWorker): found, fetch them and push (as JSON) into a Kafka topic. """ - def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): + def __init__(self, api, kafka_hosts, produce_topic, poll_interval=5.0, offset=None): # TODO: should be offset=0 super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, @@ -21,38 +21,47 @@ class ChangelogWorker(FatcatWorker): self.offset = offset # the fatcat changelog offset, not the kafka offset def run(self): - topic = self.kafka.topics[self.produce_topic] + # On start, try to consume the most recent from the topic, and using # that as the starting offset. Note that this is a single-partition # topic if self.offset is None: print("Checking for most recent changelog offset...") - msg = most_recent_message(topic) + msg = most_recent_message(self.produce_topic, self.kafka_config) if msg: self.offset = json.loads(msg.decode('utf-8'))['index'] else: - self.offset = 1 - - with topic.get_producer( - max_request_size=self.produce_max_request_size, - ) as producer: - while True: - latest = int(self.api.get_changelog(limit=1)[0].index) - if latest > self.offset: - print("Fetching changelogs from {} through {}".format( - self.offset+1, latest)) - for i in range(self.offset+1, latest+1): - cle = self.api.get_changelog_entry(i) - obj = self.api.api_client.sanitize_for_serialization(cle) - producer.produce( - message=json.dumps(obj).encode('utf-8'), - partition_key=None, - timestamp=None, - #NOTE could be (???): timestamp=cle.timestamp, - ) - self.offset = i - print("Sleeping {} seconds...".format(self.poll_interval)) - time.sleep(self.poll_interval) + self.offset = 0 + print("Most recent changelog index in Kafka seems to be {}".format(self.offset)) + + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + + producer = Producer(self.kafka_config) + + while True: + latest = int(self.api.get_changelog(limit=1)[0].index) + if latest > self.offset: + print("Fetching changelogs from {} through {}".format( + self.offset+1, latest)) + for i in range(self.offset+1, latest+1): + cle = self.api.get_changelog_entry(i) + obj = self.api.api_client.sanitize_for_serialization(cle) + producer.produce( + self.produce_topic, + json.dumps(obj).encode('utf-8'), + key=str(i), + on_delivery=fail_fast, + #NOTE timestamp could be timestamp=cle.timestamp (?) + ) + self.offset = i + producer.poll(0) + print("Sleeping {} seconds...".format(self.poll_interval)) + time.sleep(self.poll_interval) class EntityUpdatesWorker(FatcatWorker): @@ -63,45 +72,94 @@ class EntityUpdatesWorker(FatcatWorker): For now, only release updates are published. """ - def __init__(self, api, kafka_hosts, consume_topic, release_topic): + def __init__(self, api, kafka_hosts, consume_topic, release_topic, poll_interval=5.0): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api) self.release_topic = release_topic + self.poll_interval = poll_interval self.consumer_group = "entity-updates" def run(self): - changelog_topic = self.kafka.topics[self.consume_topic] - release_topic = self.kafka.topics[self.release_topic] - - consumer = changelog_topic.get_balanced_consumer( - consumer_group=self.consumer_group, - managed=True, - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=False, - fetch_message_max_bytes=4000000, # up to ~4MBytes - auto_commit_enable=True, - auto_commit_interval_ms=30000, # 30 seconds - compacted_topic=True, + + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + + def on_commit(err, partitions): + if err is not None: + print("Kafka consumer commit error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + for p in partitions: + # check for partition-specific commit errors + print(p) + if p.error: + print("Kafka consumer commit error: {}".format(p.error)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + print("Kafka consumer commit successful") + pass + + def on_rebalance(consumer, partitions): + for p in partitions: + if p.error: + raise KafkaException(p.error) + print("Kafka partitions rebalanced: {} / {}".format( + consumer, partitions)) + + consumer_conf = self.kafka_config.copy() + consumer_conf.update({ + 'group.id': self.consumer_group, + 'enable.auto.offset.store': False, + 'default.topic.config': { + 'auto.offset.reset': 'latest', + }, + }) + consumer = Consumer(consumer_conf) + + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'default.topic.config': { + 'request.required.acks': -1, + }, + }) + producer = Producer(producer_conf) + + consumer.subscribe([self.consume_topic], + on_assign=on_rebalance, + on_revoke=on_rebalance, ) + print("Kafka consuming {}".format(self.consume_topic)) + + while True: + msg = consumer.poll(self.poll_interval) + if not msg: + print("nothing new from kafka (interval:{})".format(self.poll_interval)) + consumer.commit() + continue + if msg.error(): + raise KafkaException(msg.error()) + + cle = json.loads(msg.value().decode('utf-8')) + #print(cle) + print("processing changelog index {}".format(cle['index'])) + release_edits = cle['editgroup']['edits']['releases'] + for re in release_edits: + ident = re['ident'] + release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") + # TODO: use .to_json() helper + release_dict = self.api.api_client.sanitize_for_serialization(release) + producer.produce( + self.release_topic, + json.dumps(release_dict).encode('utf-8'), + key=ident.encode('utf-8'), + on_delivery=fail_fast, + ) + consumer.store_offsets(msg) - # using a sync producer to try and avoid racey loss of delivery (aka, - # if consumer group updated but produce didn't stick) - with release_topic.get_sync_producer( - max_request_size=self.produce_max_request_size, - ) as producer: - for msg in consumer: - cle = json.loads(msg.value.decode('utf-8')) - #print(cle) - print("processing changelog index {}".format(cle['index'])) - release_edits = cle['editgroup']['edits']['releases'] - for re in release_edits: - ident = re['ident'] - release = self.api.get_release(ident, expand="files,filesets,webcaptures,container") - release_dict = self.api.api_client.sanitize_for_serialization(release) - producer.produce( - message=json.dumps(release_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - #consumer.commit_offsets() diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 83310284..bb7f0cfb 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -2,7 +2,7 @@ import json import time import requests -from pykafka.common import OffsetType +from confluent_kafka import Consumer, Producer, KafkaException from fatcat_client import ReleaseEntity, ApiClient from fatcat_tools import * @@ -17,36 +17,77 @@ class ElasticsearchReleaseWorker(FatcatWorker): Uses a consumer group to manage offset. """ - def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, - elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"): + def __init__(self, kafka_hosts, consume_topic, poll_interval=5.0, offset=None, + elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", + batch_size=200): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates" + self.batch_size = batch_size + self.poll_interval = poll_interval self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index def run(self): - consume_topic = self.kafka.topics[self.consume_topic] ac = ApiClient() - consumer = consume_topic.get_balanced_consumer( - consumer_group=self.consumer_group, - managed=True, - fetch_message_max_bytes=4000000, # up to ~4MBytes - auto_commit_enable=True, - auto_commit_interval_ms=30000, # 30 seconds - compacted_topic=True, + def on_rebalance(consumer, partitions): + for p in partitions: + if p.error: + raise KafkaException(p.error) + print("Kafka partitions rebalanced: {} / {}".format( + consumer, partitions)) + + consumer_conf = self.kafka_config.copy() + consumer_conf.update({ + 'group.id': self.consumer_group, + 'enable.auto.offset.store': False, + 'default.topic.config': { + 'auto.offset.reset': 'latest', + }, + }) + consumer = Consumer(consumer_conf) + consumer.subscribe([self.consume_topic], + on_assign=on_rebalance, + on_revoke=on_rebalance, ) - for msg in consumer: - json_str = msg.value.decode('utf-8') - release = entity_from_json(json_str, ReleaseEntity, api_client=ac) - #print(release) - elasticsearch_endpoint = "{}/{}/release/{}".format( + while True: + batch = consumer.consume( + num_messages=self.batch_size, + timeout=self.poll_interval) + if not batch: + if not consumer.assignment(): + print("... no Kafka consumer partitions assigned yet") + print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval)) + continue + print("... got {} kafka messages".format(len(batch))) + # first check errors on entire batch... + for msg in batch: + if msg.error(): + raise KafkaException(msg.error()) + # ... then process + bulk_actions = [] + for msg in batch: + json_str = msg.value().decode('utf-8') + entity = entity_from_json(json_str, ReleaseEntity, api_client=ac) + print("Upserting: release/{}".format(entity.ident)) + bulk_actions.append(json.dumps({ + "index": { "_id": entity.ident, }, + })) + bulk_actions.append(json.dumps( + release_to_elasticsearch(entity))) + elasticsearch_endpoint = "{}/{}/release/_bulk".format( self.elasticsearch_backend, - self.elasticsearch_index, - release.ident) - print("Updating document: {}".format(elasticsearch_endpoint)) - resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release)) + self.elasticsearch_index) + resp = requests.post(elasticsearch_endpoint, + headers={"Content-Type": "application/x-ndjson"}, + data="\n".join(bulk_actions) + "\n") resp.raise_for_status() - #consumer.commit_offsets() + if resp.json()['errors']: + desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint) + print(desc) + print(resp.content) + raise Exception(desc) + consumer.store_offsets(batch[-1]) + diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index cb4e5dab..1d465f58 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -5,41 +5,56 @@ import csv import json import itertools from itertools import islice -from pykafka import KafkaClient -from pykafka.common import OffsetType +from confluent_kafka import Consumer, KafkaException, TopicPartition import fatcat_client from fatcat_client.rest import ApiException -def most_recent_message(topic): +def most_recent_message(topic, kafka_config): """ Tries to fetch the most recent message from a given topic. - This only makes sense for single partition topics, though could be - extended with "last N" behavior. - Following "Consuming the last N messages from a topic" - from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns + This only makes sense for single partition topics (it works with only the + first partition), though could be extended with "last N" behavior. """ - consumer = topic.get_simple_consumer( - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=True) - offsets = [(p, op.last_offset_consumed - 1) - for p, op in consumer._partitions.items()] - offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] - if -2 in [o for p, o in offsets]: - consumer.stop() + + print("Fetching most Kafka message from {}".format(topic)) + + conf = kafka_config.copy() + conf.update({ + 'group.id': 'worker-init-last-msg', # should never commit + 'delivery.report.only.error': True, + 'enable.auto.commit': False, + 'default.topic.config': { + 'request.required.acks': -1, + 'auto.offset.reset': 'latest', + }, + }) + + consumer = Consumer(conf) + + hwm = consumer.get_watermark_offsets( + TopicPartition(topic, 0), + timeout=5.0, + cached=False) + if not hwm: + raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic)) + print("High watermarks: {}".format(hwm)) + + if hwm[1] == 0: + print("topic is new; not 'most recent message'") return None - else: - consumer.reset_offsets(offsets) - msg = islice(consumer, 1) - if msg: - val = list(msg)[0].value - consumer.stop() - return val - else: - consumer.stop() - return None + + consumer.assign([TopicPartition(topic, 0, hwm[1]-1)]) + msg = consumer.poll(2.0) + consumer.close() + if not msg: + raise Exception("Failed to fetch most recent kafka message") + if msg.error(): + raise KafkaException(msg.error()) + return msg.value() + class FatcatWorker: """ @@ -49,9 +64,13 @@ class FatcatWorker: def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None): if api: self.api = api - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'delivery.report.only.error': True, + 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + 'default.topic.config': { + 'request.required.acks': 'all', + }, + } self.produce_topic = produce_topic self.consume_topic = consume_topic - - # Kafka producer batch size tuning; also limit on size of single document - self.produce_max_request_size = 10000000 # 10 MByte-ish |