diff options
-rw-r--r-- | python/sandcrawler/workers.py | 6 | ||||
-rwxr-xr-x | python/sandcrawler_worker.py | 2 |
2 files changed, 4 insertions, 4 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index c1203e1..def890e 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -401,13 +401,13 @@ def make_kafka_consumer(hosts, consume_topic, group): 'bootstrap.servers': hosts, 'group.id': group, 'on_commit': fail_fast, - # messages don't have offset marked as stored until pushed to - # elastic, but we do auto-commit stored offsets to broker + # messages don't have offset marked as stored until processed, + # but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, 'enable.auto.commit': True, # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 5min) - 'max.poll.interval.ms': 120000, + 'max.poll.interval.ms': 300000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 1000228..81aef5b 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -43,7 +43,7 @@ def run_ingest_file(args): grobid_client = GrobidClient(host_url=args.grobid_host) worker = IngestFileWorker(grobid_client=grobid_client, sink=sink) pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, - consume_topic=consume_topic, group="ingest-file") + consume_topic=consume_topic, group="ingest-file", batch_size=1) pusher.run() def main(): |