diff options
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 51 |
1 files changed, 49 insertions, 2 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index d5db7a5..6425e99 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -2,6 +2,7 @@ import sys import json import time +import signal import zipfile import multiprocessing.pool from collections import Counter @@ -26,6 +27,9 @@ class SandcrawlerWorker(object): def push_record(self, task): self.counts['total'] += 1 + if not self.want(task): + self.counts['skip'] += 1 + return result = self.process(task) if not result: self.counts['failed'] += 1 @@ -40,6 +44,43 @@ class SandcrawlerWorker(object): print(json.dumps(result)) return result + def timeout_response(self, task): + """ + This should be overridden by workers that want to return something + meaningful when there is a processing timeout. Eg, JSON vs some other + error message. + """ + return None + + def push_record_timeout(self, task, timeout=300): + """ + A wrapper around self.push_record which sets a timeout. + + Note that this uses signals and *will behave wrong/weirdly* with + multithreading or if signal-based timeouts are used elsewhere in the + same process. + """ + + def timeout_handler(signum, frame): + raise TimeoutError("timeout processing record") + signal.signal(signal.SIGALRM, timeout_handler) + resp = None + signal.alarm(int(timeout)) + try: + resp = self.push_record(task) + except TimeoutError: + self.counts['timeout'] += 1 + resp = self.timeout_response(task) # pylint: disable=assignment-from-none + # TODO: what if it is this push_record() itself that is timing out? + if resp and self.sink: + self.sink.push_record(resp) + self.counts['pushed'] += 1 + elif resp: + print(json.dumps(resp)) + finally: + signal.alarm(0) + return resp + def push_batch(self, tasks): results = [] for task in tasks: @@ -52,6 +93,12 @@ class SandcrawlerWorker(object): print("Worker: {}".format(self.counts), file=sys.stderr) return self.counts + def want(self, task): + """ + Optionally override this as a filter in implementations. + """ + return True + def process(self, task): """ Derived workers need to implement business logic here. @@ -338,7 +385,6 @@ class ZipfilePusher(RecordPusher): print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts - class KafkaJsonPusher(RecordPusher): def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs): @@ -398,7 +444,8 @@ class KafkaJsonPusher(RecordPusher): done = False while not done: try: - self.worker.push_record(record) + # use timeouts; don't want kafka itself to timeout + self.worker.push_record_timeout(record, timeout=300) break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) |