From 031f51752e79dbdde47bbc95fe6b3600c9ec711a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 22 Oct 2020 23:08:15 -0700 Subject: reimplement worker timeout with multiprocessing --- python/sandcrawler/workers.py | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 37e3d7a..b6cec29 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -2,7 +2,6 @@ import sys import json import time -import signal import zipfile import requests import multiprocessing.pool @@ -57,30 +56,25 @@ class SandcrawlerWorker(object): """ A wrapper around self.push_record which sets a timeout. - Note that this uses signals and *will behave wrong/weirdly* with - multithreading or if signal-based timeouts are used elsewhere in the - same process. + Current implementation uses multiprocessing instead of signal. """ - def timeout_handler(signum, frame): - raise TimeoutError("timeout processing record") - signal.signal(signal.SIGALRM, timeout_handler) - resp = None - signal.alarm(int(timeout)) - try: - resp = self.push_record(task, key=key) - except TimeoutError: - self.counts['timeout'] += 1 - resp = self.timeout_response(task) # pylint: disable=assignment-from-none - # TODO: what if it is this push_record() itself that is timing out? - if resp and self.sink: - self.sink.push_record(resp) - self.counts['pushed'] += 1 - elif resp: - print(json.dumps(resp)) - finally: - signal.alarm(0) - return resp + with multiprocessing.pool.Pool(1) as p: + proc = p.apply_async(self._push_record_helper, (task, key)) + try: + resp = proc.get(timeout=timeout) + except TimeoutError: + self.counts['timeout'] += 1 + resp = self.timeout_response(task) # pylint: disable=assignment-from-none + # TODO: what if it is this push_record() itself that is timing out? + if resp and self.sink: + self.sink.push_record(resp) + self.counts['pushed'] += 1 + elif resp: + print(json.dumps(resp)) + raise TimeoutError("timeout processing record") + else: + return resp def push_batch(self, tasks): results = [] -- cgit v1.2.3