Diffstat (limited to 'python')
-rw-r--r--  python/Makefile                      |  15
-rw-r--r--  python/sandcrawler/grobid.py         |   9
-rw-r--r--  python/sandcrawler/html.py           |   8
-rw-r--r--  python/sandcrawler/ingest.py         |  14
-rw-r--r--  python/sandcrawler/workers.py        |  51
-rwxr-xr-x  python/scripts/oai2ingestrequest.py  | 137
6 files changed, 231 insertions(+), 3 deletions(-)
diff --git a/python/Makefile b/python/Makefile
new file mode 100644
index 0000000..1525900
--- /dev/null
+++ b/python/Makefile
@@ -0,0 +1,15 @@
+
+SHELL = /bin/bash
+.SHELLFLAGS = -o pipefail -c
+
+.PHONY: help
+help: ## Print info about all commands
+	@echo "Commands:"
+	@echo
+	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf " \033[01;32m%-20s\033[0m %s\n", $$1, $$2}'
+
+.PHONY: test
+test: ## Run all tests and lints
+	pipenv run pytest
+	#pipenv run mypy *.py sandcrawler/*.py tests/ --ignore-missing-imports
+
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 08e3a96..f329a73 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -87,6 +87,15 @@ class GrobidWorker(SandcrawlerWorker):
         self.sink = sink
         self.consolidate_mode = 2

+    def timeout_response(self, task):
+        default_key = task['sha1hex']
+        return dict(
+            status="error-timeout",
+            error_msg="internal GROBID worker timeout",
+            source=task,
+            key=default_key,
+        )
+
     def process(self, record):
         default_key = record['sha1hex']
         if record.get('warc_path') and record.get('warc_offset'):
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
index 8fbb0ba..88ea41b 100644
--- a/python/sandcrawler/html.py
+++ b/python/sandcrawler/html.py
@@ -42,7 +42,10 @@ def extract_fulltext_url(html_url, html_body):
     try:
         soup = BeautifulSoup(html_body, 'html.parser')
     except TypeError as te:
-        print("{} (url={})".format(te, html_url, file=sys.stderr))
+        print(f"{te} (url={html_url})", file=sys.stderr)
+        return dict()
+    except UnboundLocalError as ule:
+        print(f"{ule} (url={html_url})", file=sys.stderr)
         return dict()

     ### General Tricks ###
@@ -54,6 +57,9 @@ def extract_fulltext_url(html_url, html_body):
     if not meta:
         # researchgate does this; maybe others also?
         meta = soup.find('meta', attrs={"property":"citation_pdf_url"})
+    # if tag is only partially populated
+    if meta and not meta.get('content'):
+        meta = None
     # wiley has a weird almost-blank page we don't want to loop on
     if meta and not "://onlinelibrary.wiley.com/doi/pdf/" in html_url:
         url = meta['content'].strip()
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 5cb3ef8..82b43fe 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -229,6 +229,20 @@ class IngestFileWorker(SandcrawlerWorker):
             result.pop('key', None)
         return result

+    def timeout_response(self, task):
+        print("[TIMEOUT]", file=sys.stderr)
+        return dict(
+            request=task,
+            hit=False,
+            status="timeout",
+            error_message="ingest worker internal timeout",
+        )
+
+    def want(self, request):
+        if not request.get('ingest_type') in ('file', 'pdf'):
+            return False
+        return True
+
     def process(self, request):

         # backwards compatibility
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d5db7a5..6425e99 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -2,6 +2,7 @@
 import sys
 import json
 import time
+import signal
 import zipfile
 import multiprocessing.pool
 from collections import Counter
@@ -26,6 +27,9 @@ class SandcrawlerWorker(object):

     def push_record(self, task):
         self.counts['total'] += 1
+        if not self.want(task):
+            self.counts['skip'] += 1
+            return
         result = self.process(task)
         if not result:
             self.counts['failed'] += 1
@@ -40,6 +44,43 @@ class SandcrawlerWorker(object):
             print(json.dumps(result))
         return result

+    def timeout_response(self, task):
+        """
+        This should be overridden by workers that want to return something
+        meaningful when there is a processing timeout. Eg, JSON vs some other
+        error message.
+        """
+        return None
+
+    def push_record_timeout(self, task, timeout=300):
+        """
+        A wrapper around self.push_record which sets a timeout.
+
+        Note that this uses signals and *will behave wrong/weirdly* with
+        multithreading or if signal-based timeouts are used elsewhere in the
+        same process.
+        """
+
+        def timeout_handler(signum, frame):
+            raise TimeoutError("timeout processing record")
+        signal.signal(signal.SIGALRM, timeout_handler)
+        resp = None
+        signal.alarm(int(timeout))
+        try:
+            resp = self.push_record(task)
+        except TimeoutError:
+            self.counts['timeout'] += 1
+            resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+            # TODO: what if it is this push_record() itself that is timing out?
+            if resp and self.sink:
+                self.sink.push_record(resp)
+                self.counts['pushed'] += 1
+            elif resp:
+                print(json.dumps(resp))
+        finally:
+            signal.alarm(0)
+        return resp
+
     def push_batch(self, tasks):
         results = []
         for task in tasks:
@@ -52,6 +93,12 @@ class SandcrawlerWorker(object):
         print("Worker: {}".format(self.counts), file=sys.stderr)
         return self.counts

+    def want(self, task):
+        """
+        Optionally override this as a filter in implementations.
+        """
+        return True
+
     def process(self, task):
         """
         Derived workers need to implement business logic here.
@@ -338,7 +385,6 @@ class ZipfilePusher(RecordPusher):
         print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts

-
 class KafkaJsonPusher(RecordPusher):

     def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
@@ -398,7 +444,8 @@ class KafkaJsonPusher(RecordPusher):
             done = False
             while not done:
                 try:
-                    self.worker.push_record(record)
+                    # use timeouts; don't want kafka itself to timeout
+                    self.worker.push_record_timeout(record, timeout=300)
                     break
                 except SandcrawlerBackoffError as be:
                     print("Backing off for 200 seconds: {}".format(be))
diff --git a/python/scripts/oai2ingestrequest.py b/python/scripts/oai2ingestrequest.py
new file mode 100755
index 0000000..916f41c
--- /dev/null
+++ b/python/scripts/oai2ingestrequest.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python3
+
+"""
+Transform an OAI-PMH bulk dump (JSON) into ingest requests.
+
+Eg: https://archive.org/details/oai_harvest_20200215
+"""
+
+import sys
+import json
+import argparse
+import urlcanon
+
+DOMAIN_BLOCKLIST = [
+    # large OA publishers (we get via DOI)
+
+    # large repos and aggregators (we crawl directly)
+    "://arxiv.org/",
+    "://europepmc.org/",
+    "ncbi.nlm.nih.gov/",
+    "semanticscholar.org/",
+    "://doi.org/",
+    "://dx.doi.org/",
+    "zenodo.org/",
+    "figshare.com/",
+    "://archive.org/",
+    ".archive.org/",
+    "://127.0.0.1/",
+
+    # OAI specific additions
+    "://hdl.handle.net/",
+]
+
+RELEASE_STAGE_MAP = {
+    'info:eu-repo/semantics/draftVersion': 'draft',
+    'info:eu-repo/semantics/submittedVersion': 'submitted',
+    'info:eu-repo/semantics/acceptedVersion': 'accepted',
+    'info:eu-repo/semantics/publishedVersion': 'published',
+    'info:eu-repo/semantics/updatedVersion': 'updated',
+}
+
+def canon(s):
+    parsed = urlcanon.parse_url(s)
+    return str(urlcanon.whatwg(parsed))
+
+def transform(obj):
+    """
+    Transforms from a single OAI-PMH object to zero or more ingest requests.
+    Returns a list of dicts.
+    """
+
+    requests = []
+    if not obj.get('oai') or not obj['oai'].startswith('oai:'):
+        return []
+    if not obj.get('urls'):
+        return []
+
+    # look in obj['formats'] for PDF?
+    if obj.get('formats'):
+        # if there is a list of formats, and it does not contain PDF, then
+        # skip. Note that we will continue if there is no formats list.
+        has_pdf = False
+        for f in obj['formats']:
+            if 'pdf' in f.lower():
+                has_pdf = True
+        if not has_pdf:
+            return []
+
+    doi = None
+    if obj.get('doi'):
+        doi = obj['doi'][0].lower().strip()
+        if not doi.startswith('10.'):
+            doi = None
+
+    # infer release stage and/or type from obj['types']
+    release_stage = None
+    for t in obj.get('types', []):
+        if t in RELEASE_STAGE_MAP:
+            release_stage = RELEASE_STAGE_MAP[t]
+
+    # TODO: infer rel somehow? Eg, repository vs. OJS publisher
+    rel = None
+
+    for url in obj['urls']:
+        skip = False
+        for domain in DOMAIN_BLOCKLIST:
+            if domain in url:
+                skip = True
+        if skip:
+            continue
+        try:
+            base_url = canon(url)
+        except UnicodeEncodeError:
+            continue
+
+        request = {
+            'base_url': base_url,
+            'ingest_type': 'pdf',
+            'link_source': 'oai',
+            'link_source_id': obj['oai'].lower(),
+            'ingest_request_source': 'metha-bulk',
+            'release_stage': release_stage,
+            'rel': rel,
+            'ext_ids': {
+                'doi': doi,
+                'oai': obj['oai'].lower(),
+            },
+            'edit_extra': {},
+        }
+        requests.append(request)
+
+    return requests
+
+def run(args):
+    for l in args.json_file:
+        if not l.strip():
+            continue
+        row = json.loads(l)
+
+        requests = transform(row) or []
+        for r in requests:
+            print("{}".format(json.dumps(r, sort_keys=True)))
+
+def main():
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('json_file',
+        help="OAI-PMH dump file to use (usually stdin)",
+        type=argparse.FileType('r'))
+    subparsers = parser.add_subparsers()
+
+    args = parser.parse_args()
+
+    run(args)
+
+if __name__ == '__main__':
+    main()
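
Note on the new push_record_timeout() in workers.py: it relies on SIGALRM, so it only works in the main thread of a single-process worker and conflicts with any other signal-based timeouts in the same process. A minimal standalone sketch of the same pattern (the helper name run_with_timeout is illustrative, not part of this commit):

import signal

def run_with_timeout(func, arg, timeout=300):
    # Sketch of the SIGALRM-based timeout used by SandcrawlerWorker.push_record_timeout().
    # Only safe in the main thread; the alarm is always cleared in the finally block.
    def handler(signum, frame):
        raise TimeoutError("timeout processing record")
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(int(timeout))
    try:
        return func(arg)
    except TimeoutError:
        # caller decides what a timeout result looks like (cf. timeout_response())
        return None
    finally:
        signal.alarm(0)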
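
For the new scripts/oai2ingestrequest.py, input is one OAI-PMH JSON object per line and transform() emits zero or more ingest request dicts. A sketch of the mapping; all field values below are made up for illustration and the output is approximate:

# Hypothetical input record (one line of the JSON dump):
record = {
    "oai": "oai:example.org:12345",
    "urls": ["http://example.org/article/view/1"],
    "doi": ["10.1234/example"],
    "types": ["info:eu-repo/semantics/publishedVersion"],
}
# transform(record) would return roughly:
# [{
#     "base_url": "http://example.org/article/view/1",
#     "ingest_type": "pdf",
#     "link_source": "oai",
#     "link_source_id": "oai:example.org:12345",
#     "ingest_request_source": "metha-bulk",
#     "release_stage": "published",
#     "rel": None,
#     "ext_ids": {"doi": "10.1234/example", "oai": "oai:example.org:12345"},
#     "edit_extra": {},
# }]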