Diffstat (limited to 'python')
-rw-r--r--   python/fatcat_tools/workers/changelog.py       | 176
-rw-r--r--   python/fatcat_tools/workers/elasticsearch.py    |  83
-rw-r--r--   python/fatcat_tools/workers/worker_common.py    |  77
3 files changed, 227 insertions, 109 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 6319d55a..4108012e 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,7 +1,7 @@
 
 import json
 import time
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, Producer, KafkaException
 
 from .worker_common import FatcatWorker, most_recent_message
 
@@ -12,7 +12,7 @@ class ChangelogWorker(FatcatWorker):
     found, fetch them and push (as JSON) into a Kafka topic.
     """
 
-    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=5.0, offset=None):
         # TODO: should be offset=0
         super().__init__(kafka_hosts=kafka_hosts,
                          produce_topic=produce_topic,
@@ -21,38 +21,47 @@ class ChangelogWorker(FatcatWorker):
         self.offset = offset    # the fatcat changelog offset, not the kafka offset
 
     def run(self):
-        topic = self.kafka.topics[self.produce_topic]
+
         # On start, try to consume the most recent from the topic, and using
         # that as the starting offset. Note that this is a single-partition
         # topic
         if self.offset is None:
             print("Checking for most recent changelog offset...")
-            msg = most_recent_message(topic)
+            msg = most_recent_message(self.produce_topic, self.kafka_config)
             if msg:
                 self.offset = json.loads(msg.decode('utf-8'))['index']
             else:
-                self.offset = 1
-
-        with topic.get_producer(
-                max_request_size=self.produce_max_request_size,
-                ) as producer:
-            while True:
-                latest = int(self.api.get_changelog(limit=1)[0].index)
-                if latest > self.offset:
-                    print("Fetching changelogs from {} through {}".format(
-                        self.offset+1, latest))
-                for i in range(self.offset+1, latest+1):
-                    cle = self.api.get_changelog_entry(i)
-                    obj = self.api.api_client.sanitize_for_serialization(cle)
-                    producer.produce(
-                        message=json.dumps(obj).encode('utf-8'),
-                        partition_key=None,
-                        timestamp=None,
-                        #NOTE could be (???): timestamp=cle.timestamp,
-                    )
-                    self.offset = i
-                print("Sleeping {} seconds...".format(self.poll_interval))
-                time.sleep(self.poll_interval)
+                self.offset = 0
+            print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
+
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        producer = Producer(self.kafka_config)
+
+        while True:
+            latest = int(self.api.get_changelog(limit=1)[0].index)
+            if latest > self.offset:
+                print("Fetching changelogs from {} through {}".format(
+                    self.offset+1, latest))
+            for i in range(self.offset+1, latest+1):
+                cle = self.api.get_changelog_entry(i)
+                obj = self.api.api_client.sanitize_for_serialization(cle)
+                producer.produce(
+                    self.produce_topic,
+                    json.dumps(obj).encode('utf-8'),
+                    key=str(i),
+                    on_delivery=fail_fast,
+                    #NOTE timestamp could be timestamp=cle.timestamp (?)
+                )
+                self.offset = i
+            producer.poll(0)
+            print("Sleeping {} seconds...".format(self.poll_interval))
+            time.sleep(self.poll_interval)
 
 
 class EntityUpdatesWorker(FatcatWorker):
@@ -63,45 +72,94 @@ class EntityUpdatesWorker(FatcatWorker):
     For now, only release updates are published.
     """
 
-    def __init__(self, api, kafka_hosts, consume_topic, release_topic):
+    def __init__(self, api, kafka_hosts, consume_topic, release_topic, poll_interval=5.0):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic,
                          api=api)
         self.release_topic = release_topic
+        self.poll_interval = poll_interval
         self.consumer_group = "entity-updates"
 
     def run(self):
-        changelog_topic = self.kafka.topics[self.consume_topic]
-        release_topic = self.kafka.topics[self.release_topic]
-
-        consumer = changelog_topic.get_balanced_consumer(
-            consumer_group=self.consumer_group,
-            managed=True,
-            auto_offset_reset=OffsetType.LATEST,
-            reset_offset_on_start=False,
-            fetch_message_max_bytes=4000000, # up to ~4MBytes
-            auto_commit_enable=True,
-            auto_commit_interval_ms=30000, # 30 seconds
-            compacted_topic=True,
+
+        def fail_fast(err, msg):
+            if err is not None:
+                print("Kafka producer delivery error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+
+        def on_commit(err, partitions):
+            if err is not None:
+                print("Kafka consumer commit error: {}".format(err))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(err)
+            for p in partitions:
+                # check for partition-specific commit errors
+                print(p)
+                if p.error:
+                    print("Kafka consumer commit error: {}".format(p.error))
+                    print("Bailing out...")
+                    # TODO: should it be sys.exit(-1)?
+                    raise KafkaException(err)
+            print("Kafka consumer commit successful")
+            pass
+
+        def on_rebalance(consumer, partitions):
+            for p in partitions:
+                if p.error:
+                    raise KafkaException(p.error)
+            print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions))
+
+        consumer_conf = self.kafka_config.copy()
+        consumer_conf.update({
+            'group.id': self.consumer_group,
+            'enable.auto.offset.store': False,
+            'default.topic.config': {
+                'auto.offset.reset': 'latest',
+            },
+        })
+        consumer = Consumer(consumer_conf)
+
+        producer_conf = self.kafka_config.copy()
+        producer_conf.update({
+            'default.topic.config': {
+                'request.required.acks': -1,
+            },
+        })
+        producer = Producer(producer_conf)
+
+        consumer.subscribe([self.consume_topic],
+            on_assign=on_rebalance,
+            on_revoke=on_rebalance,
         )
+        print("Kafka consuming {}".format(self.consume_topic))
+
+        while True:
+            msg = consumer.poll(self.poll_interval)
+            if not msg:
+                print("nothing new from kafka (interval:{})".format(self.poll_interval))
+                consumer.commit()
+                continue
+            if msg.error():
+                raise KafkaException(msg.error())
+
+            cle = json.loads(msg.value().decode('utf-8'))
+            #print(cle)
+            print("processing changelog index {}".format(cle['index']))
+            release_edits = cle['editgroup']['edits']['releases']
+            for re in release_edits:
+                ident = re['ident']
+                release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+                # TODO: use .to_json() helper
+                release_dict = self.api.api_client.sanitize_for_serialization(release)
+                producer.produce(
+                    self.release_topic,
+                    json.dumps(release_dict).encode('utf-8'),
+                    key=ident.encode('utf-8'),
+                    on_delivery=fail_fast,
+                )
+            consumer.store_offsets(msg)
 
-        # using a sync producer to try and avoid racey loss of delivery (aka,
-        # if consumer group updated but produce didn't stick)
-        with release_topic.get_sync_producer(
-                max_request_size=self.produce_max_request_size,
-                ) as producer:
-            for msg in consumer:
-                cle = json.loads(msg.value.decode('utf-8'))
-                #print(cle)
-                print("processing changelog index {}".format(cle['index']))
-                release_edits = cle['editgroup']['edits']['releases']
-                for re in release_edits:
-                    ident = re['ident']
-                    release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
-                    release_dict = self.api.api_client.sanitize_for_serialization(release)
-                    producer.produce(
-                        message=json.dumps(release_dict).encode('utf-8'),
-                        partition_key=ident.encode('utf-8'),
-                        timestamp=None,
-                    )
-                #consumer.commit_offsets()
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 83310284..bb7f0cfb 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -2,7 +2,7 @@
 import json
 import time
 import requests
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, Producer, KafkaException
 
 from fatcat_client import ReleaseEntity, ApiClient
 from fatcat_tools import *
@@ -17,36 +17,77 @@ class ElasticsearchReleaseWorker(FatcatWorker):
     Uses a consumer group to manage offset.
     """
 
-    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+    def __init__(self, kafka_hosts, consume_topic, poll_interval=5.0, offset=None,
+            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
+            batch_size=200):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates"
+        self.batch_size = batch_size
+        self.poll_interval = poll_interval
         self.elasticsearch_backend = elasticsearch_backend
         self.elasticsearch_index = elasticsearch_index
 
     def run(self):
-        consume_topic = self.kafka.topics[self.consume_topic]
         ac = ApiClient()
 
-        consumer = consume_topic.get_balanced_consumer(
-            consumer_group=self.consumer_group,
-            managed=True,
-            fetch_message_max_bytes=4000000, # up to ~4MBytes
-            auto_commit_enable=True,
-            auto_commit_interval_ms=30000, # 30 seconds
-            compacted_topic=True,
+        def on_rebalance(consumer, partitions):
+            for p in partitions:
+                if p.error:
+                    raise KafkaException(p.error)
+            print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions))
+
+        consumer_conf = self.kafka_config.copy()
+        consumer_conf.update({
+            'group.id': self.consumer_group,
+            'enable.auto.offset.store': False,
+            'default.topic.config': {
+                'auto.offset.reset': 'latest',
+            },
+        })
+        consumer = Consumer(consumer_conf)
+        consumer.subscribe([self.consume_topic],
+            on_assign=on_rebalance,
+            on_revoke=on_rebalance,
         )
 
-        for msg in consumer:
-            json_str = msg.value.decode('utf-8')
-            release = entity_from_json(json_str, ReleaseEntity, api_client=ac)
-            #print(release)
-            elasticsearch_endpoint = "{}/{}/release/{}".format(
+        while True:
+            batch = consumer.consume(
+                num_messages=self.batch_size,
+                timeout=self.poll_interval)
+            if not batch:
+                if not consumer.assignment():
+                    print("... no Kafka consumer partitions assigned yet")
+                print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval))
+                continue
+            print("... got {} kafka messages".format(len(batch)))
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            bulk_actions = []
+            for msg in batch:
+                json_str = msg.value().decode('utf-8')
+                entity = entity_from_json(json_str, ReleaseEntity, api_client=ac)
+                print("Upserting: release/{}".format(entity.ident))
+                bulk_actions.append(json.dumps({
+                    "index": { "_id": entity.ident, },
+                }))
+                bulk_actions.append(json.dumps(
+                    release_to_elasticsearch(entity)))
+            elasticsearch_endpoint = "{}/{}/release/_bulk".format(
                 self.elasticsearch_backend,
-                self.elasticsearch_index,
-                release.ident)
-            print("Updating document: {}".format(elasticsearch_endpoint))
-            resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release))
+                self.elasticsearch_index)
+            resp = requests.post(elasticsearch_endpoint,
+                headers={"Content-Type": "application/x-ndjson"},
+                data="\n".join(bulk_actions) + "\n")
             resp.raise_for_status()
-            #consumer.commit_offsets()
+            if resp.json()['errors']:
+                desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint)
+                print(desc)
+                print(resp.content)
+                raise Exception(desc)
+            consumer.store_offsets(batch[-1])
+
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index cb4e5dab..1d465f58 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -5,41 +5,56 @@ import csv
 import json
 import itertools
 from itertools import islice
-from pykafka import KafkaClient
-from pykafka.common import OffsetType
+from confluent_kafka import Consumer, KafkaException, TopicPartition
 
 import fatcat_client
 from fatcat_client.rest import ApiException
 
 
-def most_recent_message(topic):
+def most_recent_message(topic, kafka_config):
     """
     Tries to fetch the most recent message from a given topic.
 
-    This only makes sense for single partition topics, though could be
-    extended with "last N" behavior.
-    Following "Consuming the last N messages from a topic"
-    from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns
+    This only makes sense for single partition topics (it works with only the
+    first partition), though could be extended with "last N" behavior.
     """
-    consumer = topic.get_simple_consumer(
-        auto_offset_reset=OffsetType.LATEST,
-        reset_offset_on_start=True)
-    offsets = [(p, op.last_offset_consumed - 1)
-                for p, op in consumer._partitions.items()]
-    offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]
-    if -2 in [o for p, o in offsets]:
-        consumer.stop()
+
+    print("Fetching most Kafka message from {}".format(topic))
+
+    conf = kafka_config.copy()
+    conf.update({
+        'group.id': 'worker-init-last-msg', # should never commit
+        'delivery.report.only.error': True,
+        'enable.auto.commit': False,
+        'default.topic.config': {
+            'request.required.acks': -1,
+            'auto.offset.reset': 'latest',
+        },
+    })
+
+    consumer = Consumer(conf)
+
+    hwm = consumer.get_watermark_offsets(
+        TopicPartition(topic, 0),
+        timeout=5.0,
+        cached=False)
+    if not hwm:
+        raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
+    print("High watermarks: {}".format(hwm))
+
+    if hwm[1] == 0:
+        print("topic is new; not 'most recent message'")
         return None
-    else:
-        consumer.reset_offsets(offsets)
-        msg = islice(consumer, 1)
-        if msg:
-            val = list(msg)[0].value
-            consumer.stop()
-            return val
-        else:
-            consumer.stop()
-            return None
+
+    consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
+    msg = consumer.poll(2.0)
+    consumer.close()
+    if not msg:
+        raise Exception("Failed to fetch most recent kafka message")
+    if msg.error():
+        raise KafkaException(msg.error())
+    return msg.value()
+
 
 class FatcatWorker:
     """
@@ -49,9 +64,13 @@ class FatcatWorker:
     def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
         if api:
             self.api = api
-        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+        self.kafka_config = {
+            'bootstrap.servers': kafka_hosts,
+            'delivery.report.only.error': True,
+            'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+            'default.topic.config': {
+                'request.required.acks': 'all',
+            },
+        }
         self.produce_topic = produce_topic
         self.consume_topic = consume_topic
-
-        # Kafka producer batch size tuning; also limit on size of single document
-        self.produce_max_request_size = 10000000  # 10 MByte-ish
