aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/sandcrawler/grobid.py9
-rw-r--r--python/sandcrawler/ingest.py9
-rw-r--r--python/sandcrawler/workers.py42
3 files changed, 58 insertions, 2 deletions
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 08e3a96..f329a73 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -87,6 +87,15 @@ class GrobidWorker(SandcrawlerWorker):
self.sink = sink
self.consolidate_mode = 2
+ def timeout_response(self, task):
+ default_key = task['sha1hex']
+ return dict(
+ status="error-timeout",
+ error_msg="internal GROBID worker timeout",
+ source=task,
+ key=default_key,
+ )
+
def process(self, record):
default_key = record['sha1hex']
if record.get('warc_path') and record.get('warc_offset'):
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 5cb3ef8..0be7653 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -229,6 +229,15 @@ class IngestFileWorker(SandcrawlerWorker):
result.pop('key', None)
return result
+ def timeout_response(self, task):
+ print("[TIMEOUT]", file=sys.stderr)
+ return dict(
+ request=task,
+ hit=False,
+ status="timeout",
+ error_message="ingest worker internal timeout",
+ )
+
def process(self, request):
# backwards compatibility
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d5db7a5..96aef3f 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import signal
import zipfile
import multiprocessing.pool
from collections import Counter
@@ -40,6 +41,43 @@ class SandcrawlerWorker(object):
print(json.dumps(result))
return result
+ def timeout_response(self, task):
+ """
+ This should be overridden by workers that want to return something
+ meaningful when there is a processing timeout. Eg, JSON vs some other
+ error message.
+ """
+ return None
+
+ def push_record_timeout(self, task, timeout=300):
+ """
+ A wrapper around self.push_record which sets a timeout.
+
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
+ """
+
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
+
def push_batch(self, tasks):
results = []
for task in tasks:
@@ -338,7 +376,6 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-
class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
@@ -398,7 +435,8 @@ class KafkaJsonPusher(RecordPusher):
done = False
while not done:
try:
- self.worker.push_record(record)
+ # use timeouts; don't want kafka itself to timeout
+ self.worker.push_record_timeout(record, timeout=300)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))