aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2019-11-14 00:42:27 -0800
committerBryan Newbold <bnewbold@archive.org>2019-11-14 00:42:29 -0800
commit8774b17dbb9c0be8ca44846188f77403fae3e867 (patch)
tree81af1a410b1f7e5c9eed54468adec7f8b9701d42
parent18473bd57f9255ba2cd7fe9a75881abf601df7b1 (diff)
downloadsandcrawler-8774b17dbb9c0be8ca44846188f77403fae3e867.tar.gz
sandcrawler-8774b17dbb9c0be8ca44846188f77403fae3e867.zip
update ingest-file batch size to 1
Was defaulting to 100, which I think was resulting in lots of consumer group timeouts, resulting in UNKNOWN_MEMBER_ID errors. Will probably switch back to batches of 10 or so, but multi-processing or some other concurrent dispatch/processing.
-rw-r--r--python/sandcrawler/workers.py6
-rwxr-xr-xpython/sandcrawler_worker.py2
2 files changed, 4 insertions, 4 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index c1203e1..def890e 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -401,13 +401,13 @@ def make_kafka_consumer(hosts, consume_topic, group):
'bootstrap.servers': hosts,
'group.id': group,
'on_commit': fail_fast,
- # messages don't have offset marked as stored until pushed to
- # elastic, but we do auto-commit stored offsets to broker
+ # messages don't have offset marked as stored until processed,
+ # but we do auto-commit stored offsets to broker
'enable.auto.offset.store': False,
'enable.auto.commit': True,
# user code timeout; if no poll after this long, assume user code
# hung and rebalance (default: 5min)
- 'max.poll.interval.ms': 120000,
+ 'max.poll.interval.ms': 300000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 1000228..81aef5b 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -43,7 +43,7 @@ def run_ingest_file(args):
grobid_client = GrobidClient(host_url=args.grobid_host)
worker = IngestFileWorker(grobid_client=grobid_client, sink=sink)
pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
- consume_topic=consume_topic, group="ingest-file")
+ consume_topic=consume_topic, group="ingest-file", batch_size=1)
pusher.run()
def main():