diff options
Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 32bb210a..42fe38aa 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -703,14 +703,23 @@ class KafkaJsonPusher(RecordPusher): topic_suffix, group, ) + self.poll_interval = kwargs.get('poll_interval', 5.0) + self.consume_batch_size = kwargs.get('consume_batch_size', 100) def run(self): count = 0 while True: + # TODO: this is batch-oriented, because underlying importer is + # often batch-oriented, but this doesn't confirm that entire batch + # has been pushed to fatcat before commiting offset. Eg, consider + # case where there there is one update and thousands of creates; + # update would be lingering in importer, and if importer crashed + # never created. Not great. batch = self.consumer.consume( - num_messages=self.edit_batch_size, - timeout=3.0) - print("... got {} kafka messages".format(len(batch))) + num_messages=self.consume_batch_size, + timeout=self.poll_interval) + print("... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval)) if not batch: # TODO: could have some larger timeout here and # self.importer.finish() if it's been more than, eg, a couple @@ -727,10 +736,11 @@ class KafkaJsonPusher(RecordPusher): count += 1 if count % 500 == 0: print("Import counts: {}".format(self.importer.counts)) - # locally store the last processed message; will be auto-commited - # from this "stored" value - assert msg - self.consumer.store_offsets(msg) + for msg in batch: + # locally store offsets of processed messages; will be + # auto-commited by librdkafka from this "stored" value + self.consumer.store_offsets(message=msg) + # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering counts = self.importer.finish() @@ -750,7 +760,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): raise KafkaException(err) for p in partitions: # check for partition-specific commit errors - print(p) if p.error: print("Kafka consumer commit error: {}".format(p.error)) print("Bailing out...") @@ -764,12 +773,17 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): #auto_commit_interval_ms=30000, # 30 seconds conf = { 'bootstrap.servers': hosts, - 'group.id': group.encode('utf-8'), + 'group.id': group, 'on_commit': fail_fast, 'delivery.report.only.error': True, + # messages don't have offset marked as stored until pushed to + # elastic, but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, + 'enable.auto.commit': True, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 5min) + 'max.poll.interval.ms': 120000, 'default.topic.config': { - 'request.required.acks': -1, 'auto.offset.reset': 'latest', }, } |