about summary refs log tree commit diff stats
path: root/python/sandcrawler
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler')
-rw-r--r-- python/sandcrawler/grobid.py  | 9
-rw-r--r-- python/sandcrawler/html.py    | 8
-rw-r--r-- python/sandcrawler/ingest.py  | 14
-rw-r--r-- python/sandcrawler/workers.py | 51
4 files changed, 79 insertions, 3 deletions
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 08e3a96..f329a73 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -87,6 +87,15 @@ class GrobidWorker(SandcrawlerWorker):
self.sink = sink
self.consolidate_mode = 2
+ def timeout_response(self, task):
+ default_key = task['sha1hex']
+ return dict(
+ status="error-timeout",
+ error_msg="internal GROBID worker timeout",
+ source=task,
+ key=default_key,
+ )
+
def process(self, record):
default_key = record['sha1hex']
if record.get('warc_path') and record.get('warc_offset'):
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
index 8fbb0ba..88ea41b 100644
--- a/python/sandcrawler/html.py
+++ b/python/sandcrawler/html.py
@@ -42,7 +42,10 @@ def extract_fulltext_url(html_url, html_body):
try:
soup = BeautifulSoup(html_body, 'html.parser')
except TypeError as te:
- print("{} (url={})".format(te, html_url, file=sys.stderr))
+ print(f"{te} (url={html_url})", file=sys.stderr)
+ return dict()
+ except UnboundLocalError as ule:
+ print(f"{ule} (url={html_url})", file=sys.stderr)
return dict()
### General Tricks ###
@@ -54,6 +57,9 @@ def extract_fulltext_url(html_url, html_body):
if not meta:
# researchgate does this; maybe others also?
meta = soup.find('meta', attrs={"property":"citation_pdf_url"})
+ # if tag is only partially populated
+ if meta and not meta.get('content'):
+ meta = None
# wiley has a weird almost-blank page we don't want to loop on
if meta and not "://onlinelibrary.wiley.com/doi/pdf/" in html_url:
url = meta['content'].strip()
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 5cb3ef8..82b43fe 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -229,6 +229,20 @@ class IngestFileWorker(SandcrawlerWorker):
result.pop('key', None)
return result
+ def timeout_response(self, task):
+ print("[TIMEOUT]", file=sys.stderr)
+ return dict(
+ request=task,
+ hit=False,
+ status="timeout",
+ error_message="ingest worker internal timeout",
+ )
+
+ def want(self, request):
+ if not request.get('ingest_type') in ('file', 'pdf'):
+ return False
+ return True
+
def process(self, request):
# backwards compatibility
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d5db7a5..6425e99 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import signal
import zipfile
import multiprocessing.pool
from collections import Counter
@@ -26,6 +27,9 @@ class SandcrawlerWorker(object):
def push_record(self, task):
self.counts['total'] += 1
+ if not self.want(task):
+ self.counts['skip'] += 1
+ return
result = self.process(task)
if not result:
self.counts['failed'] += 1
@@ -40,6 +44,43 @@ class SandcrawlerWorker(object):
print(json.dumps(result))
return result
+ def timeout_response(self, task):
+ """
+ This should be overridden by workers that want to return something
+ meaningful when there is a processing timeout. Eg, JSON vs some other
+ error message.
+ """
+ return None
+
+ def push_record_timeout(self, task, timeout=300):
+ """
+ A wrapper around self.push_record which sets a timeout.
+
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
+ """
+
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
+
def push_batch(self, tasks):
results = []
for task in tasks:
@@ -52,6 +93,12 @@ class SandcrawlerWorker(object):
print("Worker: {}".format(self.counts), file=sys.stderr)
return self.counts
+ def want(self, task):
+ """
+ Optionally override this as a filter in implementations.
+ """
+ return True
+
def process(self, task):
"""
Derived workers need to implement business logic here.
@@ -338,7 +385,6 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-
class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
@@ -398,7 +444,8 @@ class KafkaJsonPusher(RecordPusher):
done = False
while not done:
try:
- self.worker.push_record(record)
+ # use timeouts; don't want kafka itself to timeout
+ self.worker.push_record_timeout(record, timeout=300)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))