diff options
-rwxr-xr-x | python/fatcat_import.py | 5 | ||||
-rw-r--r-- | python/fatcat_tools/importers/common.py | 91 |
2 files changed, 75 insertions, 21 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index aea8c757..c612d605 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -12,7 +12,8 @@ def run_crossref(args): edit_batch_size=args.batch_size, bezerk_mode=args.bezerk_mode) if args.kafka_mode: - KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref", "fatcat-import").run() + KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref", + "fatcat-import", edit_batch_size=args.batch_size).run() else: JsonLinePusher(fci, args.json_file).run() @@ -93,7 +94,7 @@ def main(): default="localhost:9092", help="list of Kafka brokers (host/port) to use") parser.add_argument('--kafka-env', - default="qa", + default="dev", help="Kafka topic namespace to use (eg, prod, qa)") parser.add_argument('--batch-size', help="size of batch to send", diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index b89c3828..4648f6f3 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -7,7 +7,7 @@ import ftfy import itertools import subprocess from collections import Counter -import pykafka +from confluent_kafka import Consumer, KafkaException import fatcat_client from fatcat_client.rest import ApiException @@ -429,35 +429,88 @@ class KafkaJsonPusher(RecordPusher): topic_suffix, group, ) + self.edit_batch_size = kwargs.get('edit_batch_size', 100) def run(self): count = 0 - for msg in self.consumer: - if not msg: + while True: + batch = self.consumer.consume( + num_messages=self.edit_batch_size, + timeout=3.0) + print("... got {} kafka messages".format(len(batch))) + if not batch: + # TODO: could have some larger timeout here and + # self.importer.finish() if it's been more than, eg, a couple + # minutes continue - record = json.loads(msg.value.decode('utf-8')) - self.importer.push_record(record) - count += 1 - if count % 500 == 0: - print("Import counts: {}".format(self.importer.counts)) + # first check errors on entire batch... + for msg in batch: + if msg.error(): + raise KafkaException(msg.error()) + # ... then process + for msg in batch: + record = json.loads(msg.value().decode('utf-8')) + self.importer.push_record(record) + count += 1 + if count % 500 == 0: + print("Import counts: {}".format(self.importer.counts)) + # locally store the last processed message; will be auto-commited + # from this "stored" value + assert msg + self.consumer.store_offsets(msg) # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering counts = self.importer.finish() print(counts) + self.consumer.close() return counts def make_kafka_consumer(hosts, env, topic_suffix, group): - topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8') - client = pykafka.KafkaClient(hosts=hosts, broker_version="1.0.0") - consume_topic = client.topics[topic_name] - print("Consuming from kafka topic {}, group {}".format(topic_name, group)) - - consumer = consume_topic.get_balanced_consumer( - consumer_group=group.encode('utf-8'), - managed=True, - auto_commit_enable=True, - auto_commit_interval_ms=30000, # 30 seconds - compacted_topic=True, + topic_name = "fatcat-{}.{}".format(env, topic_suffix) + + def fail_fast(err, partitions): + if err is not None: + print("Kafka consumer commit error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + for p in partitions: + # check for partition-specific commit errors + print(p) + if p.error: + print("Kafka consumer commit error: {}".format(p.error)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + #print("Kafka consumer commit successful") + pass + + # previously, using pykafka + #auto_commit_enable=True, + #auto_commit_interval_ms=30000, # 30 seconds + conf = { + 'bootstrap.servers': hosts, + 'group.id': group.encode('utf-8'), + 'on_commit': fail_fast, + 'delivery.report.only.error': True, + 'enable.auto.offset.store': False, + 'default.topic.config': { + 'request.required.acks': -1, + 'auto.offset.reset': 'latest', + }, + } + + def on_rebalance(consumer, partitions): + print("Kafka partitions rebalanced: {} / {}".format( + consumer, partitions)) + + consumer = Consumer(conf) + # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 + # encoded) + consumer.subscribe([topic_name], + on_assign=on_rebalance, + on_revoke=on_rebalance, ) + print("Consuming from kafka topic {}, group {}".format(topic_name, group)) return consumer |