From 060f86888c8638e3b2be1bb005c29718842ab2e1 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 27 Apr 2020 17:31:57 -0700 Subject: worker timeout wrapper, and use for kafka --- python/sandcrawler/workers.py | 42 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index d5db7a5..9f1b55c 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -2,6 +2,7 @@ import sys import json import time +import signal import zipfile import multiprocessing.pool from collections import Counter @@ -40,6 +41,43 @@ class SandcrawlerWorker(object): print(json.dumps(result)) return result + def timeout_response(self, task): + """ + This should be overridden by workers that want to return something + meaningful when there is a processing timeout. Eg, JSON vs some other + error message. + """ + return None + + def push_record_timeout(self, task, timeout=300): + """ + A wrapper around self.push_record which sets a timeout. + + Note that this uses signals and *will behave wrong/weirdly* with + multithreading or if signal-based timeouts are used elsewhere in the + same process. + """ + + def timeout_handler(signum, frame): + raise TimeoutError("timeout processing record") + signal.signal(signal.SIGALRM, timeout_handler) + resp = None + signal.alarm(int(timeout)) + try: + resp = self.push_record(task) + except TimeoutError: + self.counts['timeout'] += 1 + resp = self.timeout_response(task) # pylint: disable=assignment-from-none + # TODO: what if it is this push_record() itself that is timing out? + if self.sink: + self.sink.push_record(resp) + self.counts['pushed'] += 1 + else: + print(json.dumps(resp)) + finally: + signal.alarm(0) + return resp + def push_batch(self, tasks): results = [] for task in tasks: @@ -338,7 +376,6 @@ class ZipfilePusher(RecordPusher): print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts - class KafkaJsonPusher(RecordPusher): def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs): @@ -398,7 +435,8 @@ class KafkaJsonPusher(RecordPusher): done = False while not done: try: - self.worker.push_record(record) + # use timeouts; don't want kafka itself to timeout + self.worker.push_record_timeout(record, timeout=300) break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) -- cgit v1.2.3 From f2177d5e30190dfc1e55f1b08fd21c2ce917ee86 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 27 Apr 2020 17:41:45 -0700 Subject: timeout message implementation for GROBID and ingest workers --- python/sandcrawler/grobid.py | 9 +++++++++ python/sandcrawler/ingest.py | 9 +++++++++ 2 files changed, 18 insertions(+) (limited to 'python') diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index 08e3a96..f329a73 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -87,6 +87,15 @@ class GrobidWorker(SandcrawlerWorker): self.sink = sink self.consolidate_mode = 2 + def timeout_response(self, task): + default_key = task['sha1hex'] + return dict( + status="error-timeout", + error_msg="internal GROBID worker timeout", + source=task, + key=default_key, + ) + def process(self, record): default_key = record['sha1hex'] if record.get('warc_path') and record.get('warc_offset'): diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 5cb3ef8..0be7653 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -229,6 +229,15 @@ class IngestFileWorker(SandcrawlerWorker): result.pop('key', None) return result + def timeout_response(self, task): + print("[TIMEOUT]", file=sys.stderr) + return dict( + request=task, + hit=False, + status="timeout", + error_message="ingest worker internal timeout", + ) + def process(self, request): # backwards compatibility -- cgit v1.2.3 From b306927e721349302d9a30511e8eb0c0676d4e04 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 29 Apr 2020 14:44:49 -0700 Subject: timeouts: don't push through None error messages --- python/sandcrawler/workers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 9f1b55c..96aef3f 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -69,10 +69,10 @@ class SandcrawlerWorker(object): self.counts['timeout'] += 1 resp = self.timeout_response(task) # pylint: disable=assignment-from-none # TODO: what if it is this push_record() itself that is timing out? - if self.sink: + if resp and self.sink: self.sink.push_record(resp) self.counts['pushed'] += 1 - else: + elif resp: print(json.dumps(resp)) finally: signal.alarm(0) -- cgit v1.2.3