aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-04-27 17:31:57 -0700
committerBryan Newbold <bnewbold@archive.org>2020-04-27 17:41:15 -0700
commit060f86888c8638e3b2be1bb005c29718842ab2e1 (patch)
tree645024b748ee7da6e7a17644dc4718d64b4c1c57
parenta8d76d29c23b9aaf32fe531e56244bb3422a23aa (diff)
downloadsandcrawler-060f86888c8638e3b2be1bb005c29718842ab2e1.tar.gz
sandcrawler-060f86888c8638e3b2be1bb005c29718842ab2e1.zip
worker timeout wrapper, and use for kafka
-rw-r--r--python/sandcrawler/workers.py42
1 files changed, 40 insertions, 2 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d5db7a5..9f1b55c 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import signal
import zipfile
import multiprocessing.pool
from collections import Counter
@@ -40,6 +41,43 @@ class SandcrawlerWorker(object):
print(json.dumps(result))
return result
+ def timeout_response(self, task):
+ """
+ This should be overridden by workers that want to return something
+ meaningful when there is a processing timeout. Eg, JSON vs some other
+ error message.
+ """
+ return None
+
+ def push_record_timeout(self, task, timeout=300):
+ """
+ A wrapper around self.push_record which sets a timeout.
+
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
+ """
+
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ else:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
+
def push_batch(self, tasks):
results = []
for task in tasks:
@@ -338,7 +376,6 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-
class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
@@ -398,7 +435,8 @@ class KafkaJsonPusher(RecordPusher):
done = False
while not done:
try:
- self.worker.push_record(record)
+ # use timeouts; don't want kafka itself to timeout
+ self.worker.push_record_timeout(record, timeout=300)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))