Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/importers/common.py | 90
1 file changed, 71 insertions(+), 19 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 093569e1..11ea6880 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -8,9 +8,9 @@ import sqlite3
 import subprocess
 import unicodedata
 from collections import Counter
+from confluent_kafka import Consumer, KafkaException
 import xml.etree.ElementTree as ET
 
-import pykafka
 from bs4 import BeautifulSoup
 
 import fatcat_openapi_client
@@ -706,32 +706,84 @@ class KafkaJsonPusher(RecordPusher):
 
     def run(self):
         count = 0
-        for msg in self.consumer:
-            if not msg:
+        while True:
+            batch = self.consumer.consume(
+                num_messages=self.edit_batch_size,
+                timeout=3.0)
+            print("... got {} kafka messages".format(len(batch)))
+            if not batch:
+                # TODO: could have some larger timeout here and
+                # self.importer.finish() if it's been more than, eg, a couple
+                # minutes
                 continue
-            record = json.loads(msg.value.decode('utf-8'))
-            self.importer.push_record(record)
-            count += 1
-            if count % 500 == 0:
-                print("Import counts: {}".format(self.importer.counts))
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            for msg in batch:
+                record = json.loads(msg.value().decode('utf-8'))
+                self.importer.push_record(record)
+                count += 1
+                if count % 500 == 0:
+                    print("Import counts: {}".format(self.importer.counts))
+            # locally store the last processed message; offsets will be
+            # auto-committed from this "stored" value
+            assert msg
+            self.consumer.store_offsets(msg)
         # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
         # commit the current batch if it has been lingering
         counts = self.importer.finish()
         print(counts)
+        self.consumer.close()
         return counts
 
 
 def make_kafka_consumer(hosts, env, topic_suffix, group):
-    topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8')
-    client = pykafka.KafkaClient(hosts=hosts, broker_version="1.0.0")
-    consume_topic = client.topics[topic_name]
-    print("Consuming from kafka topic {}, group {}".format(topic_name, group))
-
-    consumer = consume_topic.get_balanced_consumer(
-        consumer_group=group.encode('utf-8'),
-        managed=True,
-        auto_commit_enable=True,
-        auto_commit_interval_ms=30000, # 30 seconds
-        compacted_topic=True,
+    topic_name = "fatcat-{}.{}".format(env, topic_suffix)
+
+    def fail_fast(err, partitions):
+        if err is not None:
+            print("Kafka consumer commit error: {}".format(err))
+            print("Bailing out...")
+            # TODO: should it be sys.exit(-1)?
+            raise KafkaException(err)
+        for p in partitions:
+            # check for partition-specific commit errors
+            print(p)
+            if p.error:
+                print("Kafka consumer commit error: {}".format(p.error))
+                print("Bailing out...")
+                # TODO: should it be sys.exit(-1)?
+                raise KafkaException(p.error)
+        #print("Kafka consumer commit successful")
+        pass
+
+    # previously, using pykafka
+    #auto_commit_enable=True,
+    #auto_commit_interval_ms=30000, # 30 seconds
+    conf = {
+        'bootstrap.servers': hosts,
+        'group.id': group.encode('utf-8'),
+        'on_commit': fail_fast,
+        'delivery.report.only.error': True,
+        'enable.auto.offset.store': False,
+        'default.topic.config': {
+            'request.required.acks': -1,
+            'auto.offset.reset': 'latest',
+        },
+    }
+
+    def on_rebalance(consumer, partitions):
+        print("Kafka partitions rebalanced: {} / {}".format(
+            consumer, partitions))
+
+    consumer = Consumer(conf)
+    # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
+    # encoded)
+    consumer.subscribe([topic_name],
+        on_assign=on_rebalance,
+        on_revoke=on_rebalance,
    )
+    print("Consuming from kafka topic {}, group {}".format(topic_name, group))
    return consumer
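
For context, the heart of the new run() loop is confluent-kafka's manual offset-store pattern: with 'enable.auto.offset.store' disabled, an offset is only stored after its batch has actually been processed, and the background auto-commit thread then commits from that stored value, giving at-least-once delivery. A minimal standalone sketch of the same pattern (the broker address, topic, and group name below are placeholders, not values from this commit):

import json

from confluent_kafka import Consumer, KafkaException

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'group.id': 'example-group',            # placeholder group
    # don't auto-store offsets on consume; store_offsets() below marks
    # progress only after a batch has been fully processed
    'enable.auto.offset.store': False,
})
consumer.subscribe(['example-topic'])       # placeholder topic

try:
    while True:
        # fetch up to 100 messages, waiting at most 3 seconds
        batch = consumer.consume(num_messages=100, timeout=3.0)
        if not batch:
            continue
        # fail fast if any message in the batch carries a broker error
        for msg in batch:
            if msg.error():
                raise KafkaException(msg.error())
        for msg in batch:
            record = json.loads(msg.value().decode('utf-8'))
            print(record)  # stand-in for real record processing
        # mark the last message as processed; the auto-commit thread
        # (enable.auto.commit defaults to true) commits from this value
        consumer.store_offsets(batch[-1])
finally:
    consumer.close()

If the process dies between processing a batch and the next auto-commit, that batch is redelivered on restart, so downstream processing should be idempotent.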
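
The TODO in run() about catching UNIX signals is not addressed by this commit. One possible approach, sketched here under the same placeholder broker/topic/group assumptions and not part of the commit itself, is to trap SIGHUP/SIGTERM, let the in-flight batch finish, then commit stored offsets synchronously before closing:

import signal

from confluent_kafka import Consumer

shutdown_requested = False

def request_shutdown(signum, frame):
    # the flag is checked at the top of each batch, so the batch
    # currently being processed always completes
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGHUP, request_shutdown)
signal.signal(signal.SIGTERM, request_shutdown)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'group.id': 'example-group',            # placeholder group
    'enable.auto.offset.store': False,
})
consumer.subscribe(['example-topic'])       # placeholder topic

last_msg = None
while not shutdown_requested:
    batch = consumer.consume(num_messages=100, timeout=3.0)
    if not batch:
        continue
    # ... error-check and process the batch as in the sketch above ...
    consumer.store_offsets(batch[-1])
    last_msg = batch[-1]

if last_msg is not None:
    # commit whatever was stored, synchronously, before leaving the group
    consumer.commit(asynchronous=False)
consumer.close()

With enable.auto.commit left at its default, close() should also flush a final commit of stored offsets; the explicit synchronous commit mainly guards against relying on that during a long auto-commit interval.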
