-rw-r--r--   python/fatcat_tools/workers/changelog.py      | 183
-rw-r--r--   python/fatcat_tools/workers/elasticsearch.py  |  95
-rw-r--r--   python/fatcat_tools/workers/worker_common.py  |  77
3 files changed, 230 insertions(+), 125 deletions(-)
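
The diff below moves all three workers from pykafka to confluent-kafka. The recurring pattern is: disable automatic offset storage, produce with a delivery callback that raises on error, and only call store_offsets() for a message once its downstream produce() calls have been issued, so the background auto-commit never commits past unprocessed work. The following is a minimal sketch of that pattern, not fatcat code: the broker address, topic names, group id, and the 'index' key field are placeholders, and it assumes a recent confluent-kafka client that accepts 'auto.offset.reset' at the top level of the config.

    import json
    from confluent_kafka import Consumer, Producer, KafkaException

    kafka_conf = {'bootstrap.servers': 'localhost:9092'}   # placeholder broker

    def fail_fast(err, msg):
        # delivery report callback: raise instead of silently dropping messages
        if err is not None:
            raise KafkaException(err)

    consumer_conf = kafka_conf.copy()
    consumer_conf.update({
        'group.id': 'example-worker',            # placeholder group
        'enable.auto.offset.store': False,       # offsets are stored manually below
        'auto.offset.reset': 'latest',
    })
    consumer = Consumer(consumer_conf)
    producer = Producer(kafka_conf)
    consumer.subscribe(['example-input-topic'])  # placeholder topic

    while True:
        msg = consumer.poll(5.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        record = json.loads(msg.value().decode('utf-8'))
        producer.produce(
            'example-output-topic',              # placeholder topic
            json.dumps(record).encode('utf-8'),
            key=str(record.get('index')).encode('utf-8'),
            on_delivery=fail_fast)
        producer.poll(0)  # serve producer delivery callbacks
        # only mark this offset as "processed" after produce() has been issued;
        # the (default) auto-commit then commits stored offsets in the background
        consumer.store_offsets(msg)
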
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 789a97a4..4a54c649 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,7 @@
 import json
 import time
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, Producer, KafkaException
 
 from .worker_common import FatcatWorker, most_recent_message
 
 
@@ -21,38 +21,47 @@ class ChangelogWorker(FatcatWorker):
         self.offset = offset    # the fatcat changelog offset, not the kafka offset
 
     def run(self):
-        topic = self.kafka.topics[self.produce_topic]
+
         # On start, try to consume the most recent from the topic, and using
         # that as the starting offset. Note that this is a single-partition
         # topic
         if self.offset is None:
             print("Checking for most recent changelog offset...")
-            msg = most_recent_message(topic)
+            msg = most_recent_message(self.produce_topic, self.kafka_config)
             if msg:
                 self.offset = json.loads(msg.decode('utf-8'))['index']
             else:
-                self.offset = 1
-
-        with topic.get_producer(
-                max_request_size=self.produce_max_request_size,
-                ) as producer:
-            while True:
-                latest = int(self.api.get_changelog(limit=1)[0].index)
-                if latest > self.offset:
-                    print("Fetching changelogs from {} through {}".format(
-                        self.offset+1, latest))
-                for i in range(self.offset+1, latest+1):
-                    cle = self.api.get_changelog_entry(i)
-                    obj = self.api.api_client.sanitize_for_serialization(cle)
-                    producer.produce(
-                        message=json.dumps(obj).encode('utf-8'),
-                        partition_key=None,
-                        timestamp=None,
-                        #NOTE could be (???): timestamp=cle.timestamp,
-                    )
-                    self.offset = i
-                print("Sleeping {} seconds...".format(self.poll_interval))
-                time.sleep(self.poll_interval)
+                self.offset = 0
+            print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
+
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        producer = Producer(self.kafka_config)
+
+        while True:
+            latest = int(self.api.get_changelog(limit=1)[0].index)
+            if latest > self.offset:
+                print("Fetching changelogs from {} through {}".format(
+                    self.offset+1, latest))
+            for i in range(self.offset+1, latest+1):
+                cle = self.api.get_changelog_entry(i)
+                obj = self.api.api_client.sanitize_for_serialization(cle)
+                producer.produce(
+                    self.produce_topic,
+                    json.dumps(obj).encode('utf-8'),
+                    key=str(i),
+                    on_delivery=fail_fast,
+                    #NOTE timestamp could be timestamp=cle.timestamp (?)
+                )
+                self.offset = i
+            producer.poll(0)
+            print("Sleeping {} seconds...".format(self.poll_interval))
+            time.sleep(self.poll_interval)
 
 
 class EntityUpdatesWorker(FatcatWorker):
@@ -63,46 +72,83 @@ class EntityUpdatesWorker(FatcatWorker):
 
     For now, only release updates are published.
     """
 
-    def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic):
+    def __init__(self, api, kafka_hosts, consume_topic, release_topic, file_topic, container_topic, poll_interval=5.0):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic,
                          api=api)
         self.release_topic = release_topic
         self.file_topic = file_topic
         self.container_topic = container_topic
+        self.poll_interval = poll_interval
         self.consumer_group = "entity-updates"
 
     def run(self):
-        changelog_topic = self.kafka.topics[self.consume_topic]
-        release_topic = self.kafka.topics[self.release_topic]
-        file_topic = self.kafka.topics[self.file_topic]
-        container_topic = self.kafka.topics[self.container_topic]
-
-        consumer = changelog_topic.get_balanced_consumer(
-            consumer_group=self.consumer_group,
-            managed=True,
-            auto_offset_reset=OffsetType.LATEST,
-            reset_offset_on_start=False,
-            fetch_message_max_bytes=10000000, # up to ~10 MBytes
-            auto_commit_enable=True,
-            auto_commit_interval_ms=30000, # 30 seconds
-            compacted_topic=True,
-        )
-        # using a sync producer to try and avoid racey loss of delivery (aka,
-        # if consumer group updated but produce didn't stick)
-        release_producer = release_topic.get_sync_producer(
-            max_request_size=self.produce_max_request_size,
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        def on_commit(err, partitions):
+            if err is not None:
+                print("Kafka consumer commit error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+            for p in partitions:
+                # check for partition-specific commit errors
+                print(p)
+                if p.error:
+                    print("Kafka consumer commit error: {}".format(p.error))
+                    print("Bailing out...")
+                    # TODO: should it be sys.exit(-1)?
+                    raise KafkaException(err)
+            print("Kafka consumer commit successful")
+            pass
+
+        def on_rebalance(consumer, partitions):
+            for p in partitions:
+                if p.error:
+                    raise KafkaException(p.error)
+            print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions))
+
+        consumer_conf = self.kafka_config.copy()
+        consumer_conf.update({
+            'group.id': self.consumer_group,
+            'enable.auto.offset.store': False,
+            'default.topic.config': {
+                'auto.offset.reset': 'latest',
+            },
+        })
+        consumer = Consumer(consumer_conf)
+
+        producer_conf = self.kafka_config.copy()
+        producer_conf.update({
+            'default.topic.config': {
+                'request.required.acks': -1,
+            },
+        })
+        producer = Producer(producer_conf)
+
+        consumer.subscribe([self.consume_topic],
+            on_assign=on_rebalance,
+            on_revoke=on_rebalance,
         )
-        file_producer = file_topic.get_sync_producer(
-            max_request_size=self.produce_max_request_size,
-        )
-        container_producer = container_topic.get_sync_producer(
-            max_request_size=self.produce_max_request_size,
-        )
-
-        for msg in consumer:
-            cle = json.loads(msg.value.decode('utf-8'))
+        print("Kafka consuming {}".format(self.consume_topic))
+
+        while True:
+            msg = consumer.poll(self.poll_interval)
+            if not msg:
+                print("nothing new from kafka (interval:{})".format(self.poll_interval))
+                consumer.commit()
+                continue
+            if msg.error():
+                raise KafkaException(msg.error())
+
+            cle = json.loads(msg.value().decode('utf-8'))
             #print(cle)
             print("processing changelog index {}".format(cle['index']))
             release_ids = []
@@ -130,28 +176,31 @@ class EntityUpdatesWorker(FatcatWorker):
                 # releases for which list changed
                 release_ids.extend(file_entity.release_ids or [])
                 file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
-                file_producer.produce(
-                    message=json.dumps(file_dict).encode('utf-8'),
-                    partition_key=ident.encode('utf-8'),
-                    timestamp=None,
+                producer.produce(
+                    self.file_topic,
+                    json.dumps(file_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
                 )
             for ident in set(container_ids):
                 container = self.api.get_container(ident)
                 container_dict = self.api.api_client.sanitize_for_serialization(container)
-                container_producer.produce(
-                    message=json.dumps(container_dict).encode('utf-8'),
-                    partition_key=ident.encode('utf-8'),
-                    timestamp=None,
+                producer.produce(
+                    self.container_topic,
+                    json.dumps(container_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
                 )
             for ident in set(release_ids):
                 release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
                 work_ids.append(release.work_id)
                 release_dict = self.api.api_client.sanitize_for_serialization(release)
-                release_producer.produce(
-                    message=json.dumps(release_dict).encode('utf-8'),
-                    partition_key=ident.encode('utf-8'),
-                    timestamp=None,
+                producer.produce(
+                    self.release_topic,
+                    json.dumps(release_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
                 )
             # TODO: actually update works
+            consumer.store_offsets(msg)
 
-            #consumer.commit_offsets()
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 91224d98..547e270c 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -2,7 +2,7 @@
 import json
 import time
 import requests
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, Producer, KafkaException
 
 from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
 from fatcat_tools import *
@@ -18,10 +18,13 @@ class ElasticsearchReleaseWorker(FatcatWorker):
     """
 
     def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
+            batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates"
+        self.batch_size = batch_size
+        self.poll_interval = poll_interval
         self.elasticsearch_backend = elasticsearch_backend
         self.elasticsearch_index = elasticsearch_index
         self.entity_type = ReleaseEntity
@@ -29,52 +32,86 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         self.transform_func = release_to_elasticsearch
 
     def run(self):
-        consume_topic = self.kafka.topics[self.consume_topic]
         ac = ApiClient()
 
-        consumer = consume_topic.get_balanced_consumer(
-            consumer_group=self.consumer_group,
-            managed=True,
-            fetch_message_max_bytes=10000000, # up to ~10 MBytes
-            auto_commit_enable=True,
-            auto_commit_interval_ms=30000, # 30 seconds
-            compacted_topic=True,
+        def on_rebalance(consumer, partitions):
+            for p in partitions:
+                if p.error:
+                    raise KafkaException(p.error)
+            print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions))
+
+        consumer_conf = self.kafka_config.copy()
+        consumer_conf.update({
+            'group.id': self.consumer_group,
+            'enable.auto.offset.store': False,
+            'default.topic.config': {
+                'auto.offset.reset': 'latest',
+            },
+        })
+        consumer = Consumer(consumer_conf)
+        consumer.subscribe([self.consume_topic],
+            on_assign=on_rebalance,
+            on_revoke=on_rebalance,
         )
 
-        for msg in consumer:
-            json_str = msg.value.decode('utf-8')
-            # HACK: work around a bug where container entities got published to
-            # release_v03 topic
-            if self.elasticsearch_document_name == "release":
-                entity_dict = json.loads(json_str)
-                if entity_dict.get('name') and not entity_dict.get('title'):
-                    continue
-            entity = entity_from_json(json_str, self.entity_type, api_client=ac)
-            #print(entity)
-            elasticsearch_endpoint = "{}/{}/{}/{}".format(
+        while True:
+            batch = consumer.consume(
+                num_messages=self.batch_size,
+                timeout=self.poll_interval)
+            if not batch:
+                if not consumer.assignment():
+                    print("... no Kafka consumer partitions assigned yet")
+                print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval))
+                continue
+            print("... got {} kafka messages".format(len(batch)))
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            bulk_actions = []
+            for msg in batch:
+                json_str = msg.value().decode('utf-8')
+                entity = entity_from_json(json_str, ReleaseEntity, api_client=ac)
+                print("Upserting: release/{}".format(entity.ident))
+                bulk_actions.append(json.dumps({
+                    "index": { "_id": entity.ident, },
+                }))
+                bulk_actions.append(json.dumps(
+                    release_to_elasticsearch(entity)))
+            elasticsearch_endpoint = "{}/{}/release/_bulk".format(
                 self.elasticsearch_backend,
-                self.elasticsearch_index,
-                self.elasticsearch_document_name,
-                entity.ident)
-            print("Updating document: {}".format(elasticsearch_endpoint))
-            resp = requests.post(elasticsearch_endpoint, json=self.transform_func(entity))
+                self.elasticsearch_index)
+            resp = requests.post(elasticsearch_endpoint,
+                headers={"Content-Type": "application/x-ndjson"},
+                data="\n".join(bulk_actions) + "\n")
             resp.raise_for_status()
-            #consumer.commit_offsets()
+            if resp.json()['errors']:
+                desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint)
+                print(desc)
+                print(resp.content)
+                raise Exception(desc)
+            consumer.store_offsets(batch[-1])
+
 
 class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
 
     def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
+            batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic,
                          poll_interval=poll_interval,
                          offset=offset,
                          elasticsearch_backend=elasticsearch_backend,
-                         elasticsearch_index=elasticsearch_index)
+                         elasticsearch_index=elasticsearch_index,
+                         batch_size=batch_size)
         # previous group got corrupted (by pykafka library?)
         self.consumer_group = "elasticsearch-updates2"
         self.entity_type = ContainerEntity
         self.elasticsearch_document_name = "container"
         self.transform_func = container_to_elasticsearch
+
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index 9ffbe5fd..fb8cfc19 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -5,41 +5,56 @@
 import csv
 import json
 import itertools
 from itertools import islice
-from pykafka import KafkaClient
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, KafkaException, TopicPartition
 
 import fatcat_openapi_client
 from fatcat_openapi_client.rest import ApiException
 
 
-def most_recent_message(topic):
+def most_recent_message(topic, kafka_config):
     """
     Tries to fetch the most recent message from a given topic.
 
-    This only makes sense for single partition topics, though could be
-    extended with "last N" behavior.
-    Following "Consuming the last N messages from a topic"
-    from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns
+    This only makes sense for single partition topics (it works with only the
+    first partition), though could be extended with "last N" behavior.
     """
-    consumer = topic.get_simple_consumer(
-        auto_offset_reset=OffsetType.LATEST,
-        reset_offset_on_start=True)
-    offsets = [(p, op.last_offset_consumed - 1)
-                for p, op in consumer._partitions.items()]
-    offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]
-    if -2 in [o for p, o in offsets]:
-        consumer.stop()
+
+    print("Fetching most Kafka message from {}".format(topic))
+
+    conf = kafka_config.copy()
+    conf.update({
+        'group.id': 'worker-init-last-msg', # should never commit
+        'delivery.report.only.error': True,
+        'enable.auto.commit': False,
+        'default.topic.config': {
+            'request.required.acks': -1,
+            'auto.offset.reset': 'latest',
+        },
+    })
+
+    consumer = Consumer(conf)
+
+    hwm = consumer.get_watermark_offsets(
+        TopicPartition(topic, 0),
+        timeout=5.0,
+        cached=False)
+    if not hwm:
+        raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
+    print("High watermarks: {}".format(hwm))
+
+    if hwm[1] == 0:
+        print("topic is new; not 'most recent message'")
         return None
-    else:
-        consumer.reset_offsets(offsets)
-        msg = islice(consumer, 1)
-        if msg:
-            val = list(msg)[0].value
-            consumer.stop()
-            return val
-        else:
-            consumer.stop()
-            return None
+
+    consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
+    msg = consumer.poll(2.0)
+    consumer.close()
+    if not msg:
+        raise Exception("Failed to fetch most recent kafka message")
+    if msg.error():
+        raise KafkaException(msg.error())
+    return msg.value()
+
 
 class FatcatWorker:
     """
@@ -49,9 +64,13 @@ class FatcatWorker:
     def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
         if api:
             self.api = api
-        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+        self.kafka_config = {
+            'bootstrap.servers': kafka_hosts,
+            'delivery.report.only.error': True,
+            'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+            'default.topic.config': {
+                'request.required.acks': 'all',
+            },
+        }
         self.produce_topic = produce_topic
         self.consume_topic = consume_topic
-
-        # Kafka producer batch size tuning; also limit on size of single document
-        self.produce_max_request_size = 10000000  # 10 MByte-ish
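
For the Elasticsearch worker, the new code batches consumed messages and posts them as a single NDJSON request to the _bulk endpoint instead of one HTTP request per document, and then checks the response's 'errors' flag, since _bulk returns HTTP 200 even when individual actions fail. A rough sketch of that request shape is below; the index name, endpoint, and documents are placeholders rather than fatcat's schema, and it assumes an Elasticsearch version where the mapping type can be left out of the bulk URL.

    import json
    import requests

    # placeholder documents keyed by identifier
    docs = {
        "aaaaaaaaaaaaaaaaaaaaaaaaaa": {"title": "example record"},
        "bbbbbbbbbbbbbbbbbbbbbbbbbb": {"title": "another record"},
    }

    bulk_actions = []
    for ident, doc in docs.items():
        # NDJSON: one action line followed by one document line per record
        bulk_actions.append(json.dumps({"index": {"_id": ident}}))
        bulk_actions.append(json.dumps(doc))

    resp = requests.post(
        "http://localhost:9200/example-index/_bulk",   # placeholder endpoint
        headers={"Content-Type": "application/x-ndjson"},
        data="\n".join(bulk_actions) + "\n")  # body must end with a newline
    resp.raise_for_status()
    if resp.json().get('errors'):
        # _bulk can succeed at the HTTP level while individual actions fail,
        # so the per-response 'errors' flag has to be checked explicitly
        raise Exception("Elasticsearch bulk errors: {}".format(resp.content))
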
