aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py51
1 files changed, 49 insertions, 2 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d5db7a5..6425e99 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import signal
import zipfile
import multiprocessing.pool
from collections import Counter
@@ -26,6 +27,9 @@ class SandcrawlerWorker(object):
def push_record(self, task):
self.counts['total'] += 1
+ if not self.want(task):
+ self.counts['skip'] += 1
+ return
result = self.process(task)
if not result:
self.counts['failed'] += 1
@@ -40,6 +44,43 @@ class SandcrawlerWorker(object):
print(json.dumps(result))
return result
+ def timeout_response(self, task):
+ """
+ This should be overridden by workers that want to return something
+ meaningful when there is a processing timeout. Eg, JSON vs some other
+ error message.
+ """
+ return None
+
+ def push_record_timeout(self, task, timeout=300):
+ """
+ A wrapper around self.push_record which sets a timeout.
+
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
+ """
+
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
+
def push_batch(self, tasks):
results = []
for task in tasks:
@@ -52,6 +93,12 @@ class SandcrawlerWorker(object):
print("Worker: {}".format(self.counts), file=sys.stderr)
return self.counts
+ def want(self, task):
+ """
+ Optionally override this as a filter in implementations.
+ """
+ return True
+
def process(self, task):
"""
Derived workers need to implement business logic here.
@@ -338,7 +385,6 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-
class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
@@ -398,7 +444,8 @@ class KafkaJsonPusher(RecordPusher):
done = False
while not done:
try:
- self.worker.push_record(record)
+ # use timeouts; don't want kafka itself to timeout
+ self.worker.push_record_timeout(record, timeout=300)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))