about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--python/sandcrawler/workers.py40
1 file changed, 17 insertions, 23 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 37e3d7a..b6cec29 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,7 +2,6 @@
import sys
import json
import time
-import signal
import zipfile
import requests
import multiprocessing.pool
@@ -57,30 +56,25 @@ class SandcrawlerWorker(object):
"""
A wrapper around self.push_record which sets a timeout.
- Note that this uses signals and *will behave wrong/weirdly* with
- multithreading or if signal-based timeouts are used elsewhere in the
- same process.
+ Current implementation uses multiprocessing instead of signal.
"""
- def timeout_handler(signum, frame):
- raise TimeoutError("timeout processing record")
- signal.signal(signal.SIGALRM, timeout_handler)
- resp = None
- signal.alarm(int(timeout))
- try:
- resp = self.push_record(task, key=key)
- except TimeoutError:
- self.counts['timeout'] += 1
- resp = self.timeout_response(task) # pylint: disable=assignment-from-none
- # TODO: what if it is this push_record() itself that is timing out?
- if resp and self.sink:
- self.sink.push_record(resp)
- self.counts['pushed'] += 1
- elif resp:
- print(json.dumps(resp))
- finally:
- signal.alarm(0)
- return resp
+ with multiprocessing.pool.Pool(1) as p:
+ proc = p.apply_async(self._push_record_helper, (task, key))
+ try:
+ resp = proc.get(timeout=timeout)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ raise TimeoutError("timeout processing record")
+ else:
+ return resp
def push_batch(self, tasks):
results = []