aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-10-22 23:40:27 -0700
committerBryan Newbold <bnewbold@archive.org>2020-10-22 23:40:27 -0700
commit2b6f6c98598442fff04e76c658e8eb331fba4c9f (patch)
treea3c04f842e24d02129c947c3175fe67c9bcff9b5 /python
parent532808ddd438d4fe266956d5e65427ea3a2815a6 (diff)
downloadsandcrawler-2b6f6c98598442fff04e76c658e8eb331fba4c9f.tar.gz
sandcrawler-2b6f6c98598442fff04e76c658e8eb331fba4c9f.zip
Revert "reimplement worker timeout with multiprocessing"
This reverts commit 031f51752e79dbdde47bbc95fe6b3600c9ec711a. Didn't actually work when testing; can't pickle the Kafka Producer object (and probably other objects)
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/workers.py40
1 files changed, 23 insertions, 17 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index b6cec29..37e3d7a 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import signal
import zipfile
import requests
import multiprocessing.pool
@@ -56,25 +57,30 @@ class SandcrawlerWorker(object):
"""
A wrapper around self.push_record which sets a timeout.
- Current implementation uses multiprocessing instead of signal.
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
"""
- with multiprocessing.pool.Pool(1) as p:
- proc = p.apply_async(self._push_record_helper, (task, key))
- try:
- resp = proc.get(timeout=timeout)
- except TimeoutError:
- self.counts['timeout'] += 1
- resp = self.timeout_response(task) # pylint: disable=assignment-from-none
- # TODO: what if it is this push_record() itself that is timing out?
- if resp and self.sink:
- self.sink.push_record(resp)
- self.counts['pushed'] += 1
- elif resp:
- print(json.dumps(resp))
- raise TimeoutError("timeout processing record")
- else:
- return resp
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task, key=key)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
def push_batch(self, tasks):
results = []