From d3901d21037ee740c0273cadf67d9783b420b029 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 3 Dec 2018 18:15:29 -0800 Subject: tweak kafka config significantly --- python/kafka_grobid.py | 19 ++++++++++++++++--- python/kafka_grobid_hbase.py | 2 ++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py index f3aaedf..fc4e461 100755 --- a/python/kafka_grobid.py +++ b/python/kafka_grobid.py @@ -69,6 +69,7 @@ class KafkaGrobidWorker: self.warc_uri_prefix = kwargs.get('warc_uri_prefix') self.mime_filter = ['application/pdf'] self.rstore = None + self.produce_max_request_size = 20000000 # Kafka producer batch size tuning; also limit on size of single extracted document def grobid_process_fulltext(self, content): r = requests.post(self.grobid_uri + "/api/processFulltextDocument", @@ -243,14 +244,26 @@ class KafkaGrobidWorker: consume_topic = kafka.topics[self.consume_topic] sequential_failures = 0 - with produce_topic.get_producer(compression=pykafka.common.CompressionType.GZIP, sync=False) as producer: + # Configure producer to basically *immediately* publish messages, + # one-at-a-time, but asynchronously (don't block). Fetch and GROBID + # process takes a while, and we want to send as soon as processing is + # done. + with produce_topic.get_producer(sync=False, + compression=pykafka.common.CompressionType.GZIP, + retry_backoff_ms=250, + max_queued_messages=20, + min_queued_messages=1, + linger_ms=0, + max_request_size=self.produce_max_request_size) as producer: print("Producing to: {}".format(self.produce_topic)) consumer = consume_topic.get_balanced_consumer( consumer_group=self.consumer_group, managed=True, - #fetch_message_max_bytes=100000, # only ~100kbytes at a time auto_commit_enable=True, - auto_commit_interval_ms=60000, # 60 seconds + auto_commit_interval_ms=30000, # 30 seconds + # LATEST because best to miss processing than waste time re-process + auto_offset_reset=pykafka.common.OffsetType.LATEST, + queued_max_messages=20, compacted_topic=True) print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group)) sys.stdout.flush() diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py index 7c5c4a2..466ccb6 100755 --- a/python/kafka_grobid_hbase.py +++ b/python/kafka_grobid_hbase.py @@ -145,6 +145,8 @@ class KafkaGrobidHbaseWorker: consumer_group=self.consumer_group, managed=True, auto_commit_enable=True, + # LATEST because best to miss processing than waste time re-process + auto_offset_reset=pykafka.common.OffsetType.LATEST, compacted_topic=True) print("Kafka consuming {} in group {}".format( self.consume_topic, -- cgit v1.2.3