From 91cd930e4295989ed7001f1a216d33c8331571ee Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 3 Dec 2018 18:54:57 -0800 Subject: ah, right, it's more like extract/3sec, not 30sec --- python/kafka_grobid.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py index df222d8..17908e5 100755 --- a/python/kafka_grobid.py +++ b/python/kafka_grobid.py @@ -251,9 +251,9 @@ class KafkaGrobidWorker: with produce_topic.get_producer(sync=False, compression=pykafka.common.CompressionType.GZIP, retry_backoff_ms=250, - max_queued_messages=20, - min_queued_messages=3, - linger_ms=2000, + max_queued_messages=50, + min_queued_messages=10, + linger_ms=5000, max_request_size=self.produce_max_request_size) as producer: print("Producing to: {}".format(self.produce_topic)) consumer = consume_topic.get_balanced_consumer( @@ -263,7 +263,7 @@ class KafkaGrobidWorker: auto_commit_interval_ms=30000, # 30 seconds # LATEST because best to miss processing than waste time re-process auto_offset_reset=pykafka.common.OffsetType.LATEST, - queued_max_messages=20, + queued_max_messages=50, compacted_topic=True) print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group)) sys.stdout.flush() -- cgit v1.2.3