diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-10-22 23:40:27 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-10-22 23:40:27 -0700 |
commit | 2b6f6c98598442fff04e76c658e8eb331fba4c9f (patch) | |
tree | a3c04f842e24d02129c947c3175fe67c9bcff9b5 /python | |
parent | 532808ddd438d4fe266956d5e65427ea3a2815a6 (diff) | |
download | sandcrawler-2b6f6c98598442fff04e76c658e8eb331fba4c9f.tar.gz sandcrawler-2b6f6c98598442fff04e76c658e8eb331fba4c9f.zip |
Revert "reimplement worker timeout with multiprocessing"
This reverts commit 031f51752e79dbdde47bbc95fe6b3600c9ec711a.
Didn't actually work when testing; can't pickle the Kafka Producer
object (and probably other objects)
Diffstat (limited to 'python')
-rw-r--r-- | python/sandcrawler/workers.py | 40 |
1 files changed, 23 insertions, 17 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index b6cec29..37e3d7a 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -2,6 +2,7 @@ import sys import json import time +import signal import zipfile import requests import multiprocessing.pool @@ -56,25 +57,30 @@ class SandcrawlerWorker(object): """ A wrapper around self.push_record which sets a timeout. - Current implementation uses multiprocessing instead of signal. + Note that this uses signals and *will behave wrong/weirdly* with + multithreading or if signal-based timeouts are used elsewhere in the + same process. """ - with multiprocessing.pool.Pool(1) as p: - proc = p.apply_async(self._push_record_helper, (task, key)) - try: - resp = proc.get(timeout=timeout) - except TimeoutError: - self.counts['timeout'] += 1 - resp = self.timeout_response(task) # pylint: disable=assignment-from-none - # TODO: what if it is this push_record() itself that is timing out? - if resp and self.sink: - self.sink.push_record(resp) - self.counts['pushed'] += 1 - elif resp: - print(json.dumps(resp)) - raise TimeoutError("timeout processing record") - else: - return resp + def timeout_handler(signum, frame): + raise TimeoutError("timeout processing record") + signal.signal(signal.SIGALRM, timeout_handler) + resp = None + signal.alarm(int(timeout)) + try: + resp = self.push_record(task, key=key) + except TimeoutError: + self.counts['timeout'] += 1 + resp = self.timeout_response(task) # pylint: disable=assignment-from-none + # TODO: what if it is this push_record() itself that is timing out? + if resp and self.sink: + self.sink.push_record(resp) + self.counts['pushed'] += 1 + elif resp: + print(json.dumps(resp)) + finally: + signal.alarm(0) + return resp def push_batch(self, tasks): results = [] |