From 800860ecd25346ff4a638e9d42fa905396b8fa1b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 29 Jun 2020 13:19:17 -0700 Subject: customize timeout per worker; 120sec for pdf-extract This is a stab-in-the-dark attempt to resolve long timeouts with this worker in prod. --- python/sandcrawler/pdfextract.py | 2 +- python/sandcrawler/workers.py | 3 ++- python/sandcrawler_worker.py | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py index 5f9b898..ac5f6ac 100644 --- a/python/sandcrawler/pdfextract.py +++ b/python/sandcrawler/pdfextract.py @@ -278,7 +278,7 @@ class PdfExtractWorker(SandcrawlerFetchWorker): default_key = task['sha1hex'] return dict( status="error-timeout", - error_msg="internal GROBID worker timeout", + error_msg="internal pdf-extract worker timeout", source=task, sha1hex=default_key, ) diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index f6693bb..814cbf3 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -488,6 +488,7 @@ class KafkaJsonPusher(RecordPusher): if self.batch_size in (0, 1): self.batch_size = 1 self.batch_worker = kwargs.get('batch_worker', False) + self.process_timeout_sec = kwargs.get('process_timeout_sec', 300) def run(self): while True: @@ -539,7 +540,7 @@ class KafkaJsonPusher(RecordPusher): while not done: try: # use timeouts; don't want kafka itself to timeout - self.worker.push_record_timeout(record, key=msg.key(), timeout=300) + self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec) break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 0fd0194..833b9c4 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -75,6 +75,7 @@ def run_pdf_extract(args): consume_topic=consume_topic, group="pdf-extract", batch_size=1, + push_timeout_sec=120, ) pusher.run() -- cgit v1.2.3