From 8774b17dbb9c0be8ca44846188f77403fae3e867 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 14 Nov 2019 00:42:27 -0800 Subject: update ingest-file batch size to 1 Was defaulting to 100, which I think was resulting in lots of consumer group timeouts, resulting in UNKNOWN_MEMBER_ID errors. Will probably switch back to batches of 10 or so, but multi-processing or some other concurrent dispatch/processing. --- python/sandcrawler/workers.py | 6 +++--- python/sandcrawler_worker.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index c1203e1..def890e 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -401,13 +401,13 @@ def make_kafka_consumer(hosts, consume_topic, group): 'bootstrap.servers': hosts, 'group.id': group, 'on_commit': fail_fast, - # messages don't have offset marked as stored until pushed to - # elastic, but we do auto-commit stored offsets to broker + # messages don't have offset marked as stored until processed, + # but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, 'enable.auto.commit': True, # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 5min) - 'max.poll.interval.ms': 120000, + 'max.poll.interval.ms': 300000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 1000228..81aef5b 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -43,7 +43,7 @@ def run_ingest_file(args): grobid_client = GrobidClient(host_url=args.grobid_host) worker = IngestFileWorker(grobid_client=grobid_client, sink=sink) pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, - consume_topic=consume_topic, group="ingest-file") + consume_topic=consume_topic, group="ingest-file", batch_size=1) pusher.run() def main(): -- cgit v1.2.3