diff options
author | Bryan Newbold <bnewbold@archive.org> | 2019-11-14 00:42:27 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2019-11-14 00:42:29 -0800 |
commit | 8774b17dbb9c0be8ca44846188f77403fae3e867 (patch) | |
tree | 81af1a410b1f7e5c9eed54468adec7f8b9701d42 | |
parent | 18473bd57f9255ba2cd7fe9a75881abf601df7b1 (diff) | |
download | sandcrawler-8774b17dbb9c0be8ca44846188f77403fae3e867.tar.gz sandcrawler-8774b17dbb9c0be8ca44846188f77403fae3e867.zip |
update ingest-file batch size to 1
Was defaulting to 100, which I think was resulting in lots of consumer
group timeouts, resulting in UNKNOWN_MEMBER_ID errors.
Will probably switch back to batches of 10 or so, but multi-processing
or some other concurrent dispatch/processing.
-rw-r--r-- | python/sandcrawler/workers.py | 6 | ||||
-rwxr-xr-x | python/sandcrawler_worker.py | 2 |
2 files changed, 4 insertions, 4 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index c1203e1..def890e 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -401,13 +401,13 @@ def make_kafka_consumer(hosts, consume_topic, group): 'bootstrap.servers': hosts, 'group.id': group, 'on_commit': fail_fast, - # messages don't have offset marked as stored until pushed to - # elastic, but we do auto-commit stored offsets to broker + # messages don't have offset marked as stored until processed, + # but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, 'enable.auto.commit': True, # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 5min) - 'max.poll.interval.ms': 120000, + 'max.poll.interval.ms': 300000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 1000228..81aef5b 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -43,7 +43,7 @@ def run_ingest_file(args): grobid_client = GrobidClient(host_url=args.grobid_host) worker = IngestFileWorker(grobid_client=grobid_client, sink=sink) pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, - consume_topic=consume_topic, group="ingest-file") + consume_topic=consume_topic, group="ingest-file", batch_size=1) pusher.run() def main(): |