aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/Makefile15
-rw-r--r--python/sandcrawler/grobid.py9
-rw-r--r--python/sandcrawler/html.py8
-rw-r--r--python/sandcrawler/ingest.py14
-rw-r--r--python/sandcrawler/workers.py51
-rwxr-xr-xpython/scripts/oai2ingestrequest.py137
6 files changed, 231 insertions, 3 deletions
diff --git a/python/Makefile b/python/Makefile
new file mode 100644
index 0000000..1525900
--- /dev/null
+++ b/python/Makefile
@@ -0,0 +1,15 @@
+
+SHELL = /bin/bash
+.SHELLFLAGS = -o pipefail -c
+
+.PHONY: help
+help: ## Print info about all commands
+ @echo "Commands:"
+ @echo
+ @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf " \033[01;32m%-20s\033[0m %s\n", $$1, $$2}'
+
+.PHONY: test
+test: ## Run all tests and lints
+ pipenv run pytest
+ #pipenv run mypy *.py sandcrawler/*.py tests/ --ignore-missing-imports
+
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 08e3a96..f329a73 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -87,6 +87,15 @@ class GrobidWorker(SandcrawlerWorker):
self.sink = sink
self.consolidate_mode = 2
+ def timeout_response(self, task):
+ default_key = task['sha1hex']
+ return dict(
+ status="error-timeout",
+ error_msg="internal GROBID worker timeout",
+ source=task,
+ key=default_key,
+ )
+
def process(self, record):
default_key = record['sha1hex']
if record.get('warc_path') and record.get('warc_offset'):
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
index 8fbb0ba..88ea41b 100644
--- a/python/sandcrawler/html.py
+++ b/python/sandcrawler/html.py
@@ -42,7 +42,10 @@ def extract_fulltext_url(html_url, html_body):
try:
soup = BeautifulSoup(html_body, 'html.parser')
except TypeError as te:
- print("{} (url={})".format(te, html_url, file=sys.stderr))
+ print(f"{te} (url={html_url})", file=sys.stderr)
+ return dict()
+ except UnboundLocalError as ule:
+ print(f"{ule} (url={html_url})", file=sys.stderr)
return dict()
### General Tricks ###
@@ -54,6 +57,9 @@ def extract_fulltext_url(html_url, html_body):
if not meta:
# researchgate does this; maybe others also?
meta = soup.find('meta', attrs={"property":"citation_pdf_url"})
+ # if tag is only partially populated
+ if meta and not meta.get('content'):
+ meta = None
# wiley has a weird almost-blank page we don't want to loop on
if meta and not "://onlinelibrary.wiley.com/doi/pdf/" in html_url:
url = meta['content'].strip()
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 5cb3ef8..82b43fe 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -229,6 +229,20 @@ class IngestFileWorker(SandcrawlerWorker):
result.pop('key', None)
return result
+ def timeout_response(self, task):
+ print("[TIMEOUT]", file=sys.stderr)
+ return dict(
+ request=task,
+ hit=False,
+ status="timeout",
+ error_message="ingest worker internal timeout",
+ )
+
+ def want(self, request):
+ if not request.get('ingest_type') in ('file', 'pdf'):
+ return False
+ return True
+
def process(self, request):
# backwards compatibility
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d5db7a5..6425e99 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import signal
import zipfile
import multiprocessing.pool
from collections import Counter
@@ -26,6 +27,9 @@ class SandcrawlerWorker(object):
def push_record(self, task):
self.counts['total'] += 1
+ if not self.want(task):
+ self.counts['skip'] += 1
+ return
result = self.process(task)
if not result:
self.counts['failed'] += 1
@@ -40,6 +44,43 @@ class SandcrawlerWorker(object):
print(json.dumps(result))
return result
+ def timeout_response(self, task):
+ """
+ This should be overridden by workers that want to return something
+ meaningful when there is a processing timeout. Eg, JSON vs some other
+ error message.
+ """
+ return None
+
+ def push_record_timeout(self, task, timeout=300):
+ """
+ A wrapper around self.push_record which sets a timeout.
+
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
+ """
+
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
+
def push_batch(self, tasks):
results = []
for task in tasks:
@@ -52,6 +93,12 @@ class SandcrawlerWorker(object):
print("Worker: {}".format(self.counts), file=sys.stderr)
return self.counts
+ def want(self, task):
+ """
+ Optionally override this as a filter in implementations.
+ """
+ return True
+
def process(self, task):
"""
Derived workers need to implement business logic here.
@@ -338,7 +385,6 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-
class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
@@ -398,7 +444,8 @@ class KafkaJsonPusher(RecordPusher):
done = False
while not done:
try:
- self.worker.push_record(record)
+ # use timeouts; don't want kafka itself to timeout
+ self.worker.push_record_timeout(record, timeout=300)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
diff --git a/python/scripts/oai2ingestrequest.py b/python/scripts/oai2ingestrequest.py
new file mode 100755
index 0000000..916f41c
--- /dev/null
+++ b/python/scripts/oai2ingestrequest.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python3
+
+"""
+Transform an OAI-PMH bulk dump (JSON) into ingest requests.
+
+Eg: https://archive.org/details/oai_harvest_20200215
+"""
+
+import sys
+import json
+import argparse
+import urlcanon
+
+DOMAIN_BLOCKLIST = [
+ # large OA publishers (we get via DOI)
+
+ # large repos and aggregators (we crawl directly)
+ "://arxiv.org/",
+ "://europepmc.org/",
+ "ncbi.nlm.nih.gov/",
+ "semanticscholar.org/",
+ "://doi.org/",
+ "://dx.doi.org/",
+ "zenodo.org/",
+ "figshare.com/",
+ "://archive.org/",
+ ".archive.org/",
+ "://127.0.0.1/",
+
+ # OAI specific additions
+ "://hdl.handle.net/",
+]
+
+RELEASE_STAGE_MAP = {
+ 'info:eu-repo/semantics/draftVersion': 'draft',
+ 'info:eu-repo/semantics/submittedVersion': 'submitted',
+ 'info:eu-repo/semantics/acceptedVersion': 'accepted',
+ 'info:eu-repo/semantics/publishedVersion': 'published',
+ 'info:eu-repo/semantics/updatedVersion': 'updated',
+}
+
+def canon(s):
+ parsed = urlcanon.parse_url(s)
+ return str(urlcanon.whatwg(parsed))
+
+def transform(obj):
+ """
+ Transforms from a single OAI-PMH object to zero or more ingest requests.
+ Returns a list of dicts.
+ """
+
+ requests = []
+ if not obj.get('oai') or not obj['oai'].startswith('oai:'):
+ return []
+ if not obj.get('urls'):
+ return []
+
+ # look in obj['formats'] for PDF?
+ if obj.get('formats'):
+ # if there is a list of formats, and it does not contain PDF, then
+ # skip. Note that we will continue if there is no formats list.
+ has_pdf = False
+ for f in obj['formats']:
+ if 'pdf' in f.lower():
+ has_pdf = True
+ if not has_pdf:
+ return []
+
+ doi = None
+ if obj.get('doi'):
+ doi = obj['doi'][0].lower().strip()
+ if not doi.startswith('10.'):
+ doi = None
+
+ # infer release stage and/or type from obj['types']
+ release_stage = None
+ for t in obj.get('types', []):
+ if t in RELEASE_STAGE_MAP:
+ release_stage = RELEASE_STAGE_MAP[t]
+
+ # TODO: infer rel somehow? Eg, repository vs. OJS publisher
+ rel = None
+
+ for url in obj['urls']:
+ skip = False
+ for domain in DOMAIN_BLOCKLIST:
+ if domain in url:
+ skip = True
+ if skip:
+ continue
+ try:
+ base_url = canon(url)
+ except UnicodeEncodeError:
+ continue
+
+ request = {
+ 'base_url': base_url,
+ 'ingest_type': 'pdf',
+ 'link_source': 'oai',
+ 'link_source_id': obj['oai'].lower(),
+ 'ingest_request_source': 'metha-bulk',
+ 'release_stage': release_stage,
+ 'rel': rel,
+ 'ext_ids': {
+ 'doi': doi,
+ 'oai': obj['oai'].lower(),
+ },
+ 'edit_extra': {},
+ }
+ requests.append(request)
+
+ return requests
+
+def run(args):
+ for l in args.json_file:
+ if not l.strip():
+ continue
+ row = json.loads(l)
+
+ requests = transform(row) or []
+ for r in requests:
+ print("{}".format(json.dumps(r, sort_keys=True)))
+
+def main():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument('json_file',
+ help="OAI-PMH dump file to use (usually stdin)",
+ type=argparse.FileType('r'))
+ subparsers = parser.add_subparsers()
+
+ args = parser.parse_args()
+
+ run(args)
+
+if __name__ == '__main__':
+ main()