From 7430ddbcdec76091220de474060b968f0ef1bb70 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 30 Sep 2021 15:08:47 -0700 Subject: rename some python files for clarity --- python/ingest_file.py | 100 ----- python/ingest_tool.py | 100 +++++ python/sandcrawler/html_ingest.py | 441 -------------------- python/sandcrawler/ingest.py | 833 -------------------------------------- python/sandcrawler/ingest_file.py | 833 ++++++++++++++++++++++++++++++++++++++ python/sandcrawler/ingest_html.py | 441 ++++++++++++++++++++ 6 files changed, 1374 insertions(+), 1374 deletions(-) delete mode 100755 python/ingest_file.py create mode 100755 python/ingest_tool.py delete mode 100644 python/sandcrawler/html_ingest.py delete mode 100644 python/sandcrawler/ingest.py create mode 100644 python/sandcrawler/ingest_file.py create mode 100644 python/sandcrawler/ingest_html.py (limited to 'python') diff --git a/python/ingest_file.py b/python/ingest_file.py deleted file mode 100755 index 20b6d67..0000000 --- a/python/ingest_file.py +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import json -import argparse - -from http.server import HTTPServer -from sandcrawler.ingest import IngestFileRequestHandler, IngestFileWorker - - -def run_single_ingest(args): - request = dict( - ingest_type=args.ingest_type, - base_url=args.url, - ext_ids=dict(doi=args.doi), - fatcat=dict(release_ident=args.release_id), - ) - if args.force_recrawl: - request['force_recrawl'] = True - ingester = IngestFileWorker( - try_spn2=not args.no_spn2, - html_quick_mode=args.html_quick_mode, - ) - result = ingester.process(request) - print(json.dumps(result, sort_keys=True)) - return result - -def run_requests(args): - # TODO: switch to using JsonLinePusher - ingester = IngestFileWorker( - try_spn2=not args.no_spn2, - html_quick_mode=args.html_quick_mode, - ) - for l in args.json_file: - request = json.loads(l.strip()) - result = ingester.process(request) - print(json.dumps(result, sort_keys=True)) - -def run_api(args): - port = 8083 - print("Listening on localhost:{}".format(port)) - server = HTTPServer(('', port), IngestFileRequestHandler) - server.serve_forever() - -def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - subparsers = parser.add_subparsers() - - sub_single= subparsers.add_parser('single', - help="ingests a single file URL") - sub_single.set_defaults(func=run_single_ingest) - sub_single.add_argument('--release-id', - help="(optional) existing release ident to match to") - sub_single.add_argument('--doi', - help="(optional) existing release DOI to match to") - sub_single.add_argument('--force-recrawl', - action='store_true', - help="ignore GWB history and use SPNv2 to re-crawl") - sub_single.add_argument('--no-spn2', - action='store_true', - help="don't use live web (SPNv2)") - sub_single.add_argument('--ingest-type', - default="pdf", - help="type of ingest (pdf, html, etc)") - sub_single.add_argument('--html-quick-mode', - action='store_true', - help="don't fetch individual sub-resources, just use CDX") - sub_single.add_argument('url', - help="URL of paper to fetch") - - sub_requests = subparsers.add_parser('requests', - help="takes a series of ingest requests (JSON, per line) and runs each") - sub_requests.add_argument('--no-spn2', - action='store_true', - help="don't use live web (SPNv2)") - sub_requests.add_argument('--html-quick-mode', - action='store_true', - help="don't fetch individual sub-resources, just use CDX") - 
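For orientation: the renamed ingest_tool.py is a thin CLI wrapper around IngestFileWorker. Below is a minimal sketch of what the `single` subcommand boils down to, mirroring run_single_ingest() above; the URL is a placeholder, and the import assumes the post-rename module path (python/sandcrawler/ingest_file.py) listed in the diffstat.

    import json

    from sandcrawler.ingest_file import IngestFileWorker  # assumed post-rename path

    # same request shape that run_single_ingest() builds from its CLI flags
    request = dict(
        ingest_type="pdf",
        base_url="https://example.com/paper/123",  # placeholder URL
        ext_ids=dict(doi=None),
        fatcat=dict(release_ident=None),
    )
    worker = IngestFileWorker(try_spn2=False, html_quick_mode=False)
    result = worker.process(request)
    print(json.dumps(result, sort_keys=True))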
sub_requests.set_defaults(func=run_requests) - sub_requests.add_argument('json_file', - help="JSON file (request per line) to import from (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) - - sub_api = subparsers.add_parser('api', - help="starts a simple HTTP server that processes ingest requests") - sub_api.set_defaults(func=run_api) - sub_api.add_argument('--port', - help="HTTP port to listen on", - default=8033, type=int) - - args = parser.parse_args() - if not args.__dict__.get("func"): - parser.print_help(file=sys.stderr) - sys.exit(-1) - - args.func(args) - -if __name__ == '__main__': - main() diff --git a/python/ingest_tool.py b/python/ingest_tool.py new file mode 100755 index 0000000..20b6d67 --- /dev/null +++ b/python/ingest_tool.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 + +import sys +import json +import argparse + +from http.server import HTTPServer +from sandcrawler.ingest import IngestFileRequestHandler, IngestFileWorker + + +def run_single_ingest(args): + request = dict( + ingest_type=args.ingest_type, + base_url=args.url, + ext_ids=dict(doi=args.doi), + fatcat=dict(release_ident=args.release_id), + ) + if args.force_recrawl: + request['force_recrawl'] = True + ingester = IngestFileWorker( + try_spn2=not args.no_spn2, + html_quick_mode=args.html_quick_mode, + ) + result = ingester.process(request) + print(json.dumps(result, sort_keys=True)) + return result + +def run_requests(args): + # TODO: switch to using JsonLinePusher + ingester = IngestFileWorker( + try_spn2=not args.no_spn2, + html_quick_mode=args.html_quick_mode, + ) + for l in args.json_file: + request = json.loads(l.strip()) + result = ingester.process(request) + print(json.dumps(result, sort_keys=True)) + +def run_api(args): + port = 8083 + print("Listening on localhost:{}".format(port)) + server = HTTPServer(('', port), IngestFileRequestHandler) + server.serve_forever() + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + subparsers = parser.add_subparsers() + + sub_single= subparsers.add_parser('single', + help="ingests a single file URL") + sub_single.set_defaults(func=run_single_ingest) + sub_single.add_argument('--release-id', + help="(optional) existing release ident to match to") + sub_single.add_argument('--doi', + help="(optional) existing release DOI to match to") + sub_single.add_argument('--force-recrawl', + action='store_true', + help="ignore GWB history and use SPNv2 to re-crawl") + sub_single.add_argument('--no-spn2', + action='store_true', + help="don't use live web (SPNv2)") + sub_single.add_argument('--ingest-type', + default="pdf", + help="type of ingest (pdf, html, etc)") + sub_single.add_argument('--html-quick-mode', + action='store_true', + help="don't fetch individual sub-resources, just use CDX") + sub_single.add_argument('url', + help="URL of paper to fetch") + + sub_requests = subparsers.add_parser('requests', + help="takes a series of ingest requests (JSON, per line) and runs each") + sub_requests.add_argument('--no-spn2', + action='store_true', + help="don't use live web (SPNv2)") + sub_requests.add_argument('--html-quick-mode', + action='store_true', + help="don't fetch individual sub-resources, just use CDX") + sub_requests.set_defaults(func=run_requests) + sub_requests.add_argument('json_file', + help="JSON file (request per line) to import from (or stdin)", + default=sys.stdin, type=argparse.FileType('r')) + + sub_api = subparsers.add_parser('api', + help="starts a simple HTTP server that processes ingest 
requests") + sub_api.set_defaults(func=run_api) + sub_api.add_argument('--port', + help="HTTP port to listen on", + default=8033, type=int) + + args = parser.parse_args() + if not args.__dict__.get("func"): + parser.print_help(file=sys.stderr) + sys.exit(-1) + + args.func(args) + +if __name__ == '__main__': + main() diff --git a/python/sandcrawler/html_ingest.py b/python/sandcrawler/html_ingest.py deleted file mode 100644 index f11cac4..0000000 --- a/python/sandcrawler/html_ingest.py +++ /dev/null @@ -1,441 +0,0 @@ - -import io -import sys -import json -import datetime -import argparse -import xml.etree.ElementTree as ET -from typing import List, Optional, Any, Tuple - -import trafilatura -import pydantic -from selectolax.parser import HTMLParser - -from sandcrawler.ia import WaybackClient, CdxApiClient, ResourceResult, cdx_to_dict, fix_transfer_encoding, NoCaptureError, WaybackContentError -from sandcrawler.misc import gen_file_metadata, parse_cdx_datetime, datetime_to_cdx, clean_url, url_fuzzy_equal -from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules - - -TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}" - -def html_extract_body_teixml(doc: bytes) -> dict: - try: - tei_xml = trafilatura.extract(doc, - tei_output=True, - include_comments=False, - include_formatting=True, - ) - except (ValueError, TypeError, Exception) as e: - return dict( - status="trafilatura-parse-error", - error_msg=str(e)[:1000], - ) - if tei_xml: - body_txt = teixml_body_text(tei_xml) - word_count = len(body_txt.split()) - return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count) - elif doc.startswith(b''): - # hack for firstmonday.org - return html_extract_body_teixml(doc[106:]) - else: - return dict(status="empty-xml", agent=TRAFILATURA_AGENT) - -def teixml_body_text(doc_xml: str) -> str: - ns = {"tei": "http://www.tei-c.org/ns/1.0"} - tree = ET.fromstring(doc_xml) - body = tree.find('.//tei:body', ns) - if body: - return " ".join(body.itertext()) - else: - return "" - -class WebResource(pydantic.BaseModel): - surt: str - timestamp: datetime.datetime - url: str - sha1hex: str - mimetype: str - status_code: int - size: Optional[int] - sha256hex: Optional[str] - resource_type: Optional[str] - - class Config: - json_encoders = { - datetime.datetime: lambda dt: dt.isoformat() - } - -class IngestWebResult(pydantic.BaseModel): - status: str - hit: bool - error_message: Optional[str] - cdx: Optional[dict] - terminal: Optional[Any] # TODO - request: Optional[Any] # TODO - file_meta: Optional[dict] - html_biblio: Optional[BiblioMetadata] - scope: Optional[str] - html_body: Optional[dict] - html_resources: Optional[List[WebResource]] - - class Config: - arbitrary_types_allowed = True - json_encoders = { - datetime.datetime: lambda dt: dt.isoformat(), - } - -class HtmlMetaRow(pydantic.BaseModel): - sha1hex: str - status: str - scope: Optional[str] - has_teixml: bool - has_thumbnail: bool - word_count: Optional[int] - biblio: Optional[dict] - resources: Optional[List[dict]] - - class Config: - arbitrary_types_allowed = True - json_encoders = { - datetime.datetime: lambda dt: dt.isoformat(), - } - - def to_sql_tuple(self) -> Tuple: - """ - This is for the html_meta SQL table. 
- """ - return ( - self.sha1hex, - datetime.datetime.now(), # updated - self.status, - self.scope, - self.has_teixml, - self.has_thumbnail, - self.word_count, - (self.biblio or None) and json.dumps(self.biblio, sort_keys=True), - (self.resources or None) and json.dumps(self.resources, sort_keys=True), - ) - - -def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]: - """ - This is the lazy version that just does a CDX lookup for each resource. - - Takes a list instead of single record because we may want to circuit break - on failure, and may introduce concurrency internal to this function. - """ - - full = [] - closest = when and datetime_to_cdx(when) - for resource in resources: - cdx_row = cdx_client.lookup_best(resource['url'], closest=closest) - if not cdx_row: - raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") - if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']): - print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr) - if not cdx_row.status_code: - # TODO: fall back to a full fetch? - print(f" WARN: skipping revisit record", file=sys.stderr) - continue - full.append(WebResource( - surt=cdx_row.surt, - timestamp=cdx_row.datetime, - url=cdx_row.url, - sha1hex=cdx_row.sha1hex, - mimetype=cdx_row.mimetype, - status_code=cdx_row.status_code, - size=None, - sha256hex=None, - resource_type=resource['type'], - )) - - return full - - -def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]: - """ - This is the full version which fetches each resource from wayback/petabox - and calculates additional hashes. 
- - Could make this concurrent in the future, eg: https://realpython.com/python-concurrency/#threading-version - """ - - full = [] - closest = when and datetime_to_cdx(when) - for resource in resources: - wayback_resp = wayback_client.lookup_resource(resource['url'], closest=closest) - if not wayback_resp or wayback_resp.status != 'success': - raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") - file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True) - if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex: - raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}") - full.append(WebResource( - surt=wayback_resp.cdx.surt, - timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), - url=wayback_resp.cdx.url, - sha1hex=file_meta['sha1hex'], - mimetype=file_meta['mimetype'], - status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code, - size=file_meta['size_bytes'], - sha256hex=file_meta['sha256hex'], - resource_type=resource['type'], - )) - - return full - - -def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]: - - generator: Optional[str] = None - generator_elem = doc.css_first("meta[name='generator']") - if generator_elem: - generator = generator_elem.attrs['content'] - else: - generator_elem = doc.css_first("a[id='developedBy']") - if generator_elem: - generator = generator_elem.text() - if generator and "open journal systems 3" in generator.lower(): - return "ojs3" - elif generator and "open journal systems" in generator.lower(): - return "ojs" - elif generator and "plone" in generator.lower(): - return "plone" - elif generator and "wordpress" in generator.lower(): - return "wordpress" - elif generator and "blogger" in generator.lower(): - return "blogger" - elif doc.css_first("body[id='pkp-common-openJournalSystems']"): - return "ojs" - else: - try: - if 'powered by PKP OJS' in doc.html: - return "ojs" - if 'Powered by ' in doc.html: - return "arpha" - if "" in doc.html: - return "galenos" - except UnicodeDecodeError: - pass - - icon_elem = doc.css_first("link[type='image/x-icon']") - if icon_elem and 'href' in icon_elem.attrs: - if 'journalssystem.com' in icon_elem.attrs['href']: - return "journalssystem.com" - elif 'indexcopernicus.com' in icon_elem.attrs['href']: - return "indexcopernicus" - - if 'scielo' in url: - return "scielo" - - return None - -def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str: - """ - This function tries to guess if an HTML document represents one of: - - - article-fulltext - - article-abstract - - article-sample - - supplement - - component - - issue-fulltext - - landingpage - - homepage-domain - - blocked-paywall - - blocked-login - - blocked-captcha - - blocked-cookie - - errorpage - - stub - - other - - unknown - - Unknown implies the page could be anything. "other" implies it is not - fulltext or a landing page, but could be one of the other categories. 
- """ - - # assert that this is a real URL - assert url.count('/') >= 2 - - # basic paywall and loginwall detection based on URL - if url.endswith("/cookieAbsent"): - return "blocked-cookie" - if "://page-one.live.cf.public.springer.com" in url: - return "article-sample" - - if "scielo" in url: - if "sci_abstract" in url: - return "landingpage" - if "sci_arttext" in url: - return "article-fulltext" - - if "showcaptcha.asp" in url: - return "blocked-captcha" - - # is this the top-level URL of the domain? aka, no path? - if url.count('/') <= 2 or (url.count('/') == 3) and url.endswith('/'): - return "homepage-domain" - - platform = html_guess_platform(url, doc, biblio) - - if biblio: - if biblio.html_fulltext_url: - if url_fuzzy_equal(biblio.html_fulltext_url, url): - return "article-fulltext" - else: - return "landingpage" - - # platform-specific detection - if platform in ("ojs", "ojs3"): - - if biblio and biblio.title: - if word_count and word_count > 1200: - return "fulltext" - else: - return "landingpage" - else: - if "/article/view/" in url and word_count and word_count > 600: - return "fulltext" - return "other" - elif platform == "journalssystem.com": - if biblio and biblio.pdf_fulltext_url and word_count and word_count < 1000: - return "landingpage" - - # more platform/publisher specific checks - if "karger.com/Article/Abstract" in url: - return "landingpage" - if "dergipark.gov.tr" in url and not ("download/article-file" in url): - return "other" - - try: - if isinstance(doc.html, str) and "
403 Forbidden
" in doc.html: - # cloudflare block pattern - return "blocked-forbidden" - except UnicodeDecodeError: - pass - - print(f" scope guessing: platform {platform} word count: {word_count}", file=sys.stderr) - - # fallback: guess based on word count (arbitrary guesses here) - if word_count is not None: - if word_count < 20: - return "stub" - elif word_count > 500 and platform in ['wordpress', 'blogger']: - return "article-fulltext" - elif word_count > 1200: - return "article-fulltext" - - return "unknown" - - -def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult: - - adblock = load_adblock_rules() - wayback_client = WaybackClient() - - html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp) - if html_resource.status != "success": - return IngestWebResult( - status=html_resource.status, - hit=False, - cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), - ) - - assert html_resource.terminal_status_code == 200 - - file_meta = gen_file_metadata(html_resource.body) - file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource) - - if file_meta['mimetype'] not in ("text/html", "text/xml"): - return IngestWebResult( - status="wrong-mimetype", - hit=False, - cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), - file_meta=file_meta, - ) - - html_doc = HTMLParser(html_resource.body) - html_biblio = html_extract_biblio(url, html_doc) - html_body = html_extract_body_teixml(html_resource.body) - html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get('word_count')) - if html_scope not in ('article-fulltext', 'unknown'): - return IngestWebResult( - status="wrong-scope", - hit=False, - cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), - file_meta=file_meta, - html_biblio=html_biblio, - scope=html_scope, - ) - - raw_resources = html_extract_resources(html_resource.terminal_url, html_doc, adblock) - assert len(raw_resources) <= 200 - - when = parse_cdx_datetime(html_resource.cdx.datetime) - - full_resources: List[WebResource] = [] - if quick_mode: - full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when) - else: - full_resources = fetch_html_resources(raw_resources, wayback_client, when) - - output = IngestWebResult( - status="success", - hit=True, - cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), - file_meta=file_meta, - html_body=html_body, - html_biblio=html_biblio, - scope=html_scope, - html_resources=full_resources, - ) - return output - - -def main() -> None: - """ - Run this command like: - - python -m sandcrawler.html_ingest - """ - - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - subparsers = parser.add_subparsers() - - sub = subparsers.add_parser( - "single", help="tries to ingest a single URL, dumps result to stdout" - ) - sub.set_defaults(func="run_single") - sub.add_argument( - "url", - help="URL to fetch", - type=str, - ) - sub.add_argument( - "--timestamp", - help="timestamp for which to fetch document from wayback", - type=str, - ) - sub.add_argument( - "--quick-mode", - help="don't fetch resources, only do CDX lookup", - action="store_true", - ) - - args = parser.parse_args() - if not args.__dict__.get("func"): - parser.print_help(file=sys.stderr) - sys.exit(-1) - - if args.func == "run_single": - result = run_single(args.url, args.timestamp, args.quick_mode) - print(result.json(indent=2, exclude_none=True)) - else: - #func = getattr(wp, args.func) - #func() - raise 
NotImplementedError() - -if __name__ == "__main__": - main() diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py deleted file mode 100644 index b852c69..0000000 --- a/python/sandcrawler/ingest.py +++ /dev/null @@ -1,833 +0,0 @@ - -import sys -import json -import gzip -import time -import base64 -import xml.etree.ElementTree -from collections import namedtuple -from typing import Optional, Tuple, Any, Dict, List -from http.server import BaseHTTPRequestHandler, HTTPServer - -import requests -from selectolax.parser import HTMLParser - -from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError -from sandcrawler.grobid import GrobidClient -from sandcrawler.pdfextract import process_pdf, PdfExtractResult -from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime -from sandcrawler.html import extract_fulltext_url -from sandcrawler.html_ingest import fetch_html_resources, \ - quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \ - WebResource, html_guess_platform -from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules -from sandcrawler.workers import SandcrawlerWorker -from sandcrawler.db import SandcrawlerPostgrestClient -from sandcrawler.xml import xml_reserialize - - -MAX_BODY_SIZE_BYTES = 128*1024*1024 - -class IngestFileWorker(SandcrawlerWorker): - """ - High level flow is to look in history first, then go to live web if - resource not found. Following redirects is treated as "fetching a - resource". Current version fetches a single resource; if it isn't a hit - but is an HTML 200, treats it as a landing page, tries to extract - fulltext link, then fetches that resource. - - process(request, key=None) -> response - Does all the things! - - Check existing processing (short circuit): - - check_existing_ingest(base_url) -> ingest_file_result or none - process_existing(result) -> response - try fetching all the rows we want. 
if any don't exist, fetch the resource itself and call process_hit() - - Fetch resource: - - find_resource(url) -> ResourceResult - - Process resource: - - process_hit(ResourceResult) -> response - process_grobid(ResourceResult) - """ - - def __init__(self, sink=None, **kwargs): - super().__init__() - - self.sink = sink - self.wayback_client = kwargs.get('wayback_client') - if not self.wayback_client: - self.wayback_client = WaybackClient() - self.spn_client = kwargs.get('spn_client') - if not self.spn_client: - self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) - self.grobid_client = kwargs.get('grobid_client') - if not self.grobid_client: - self.grobid_client = GrobidClient() - self.pgrest_client = kwargs.get('pgrest_client') - if not self.pgrest_client: - self.pgrest_client = SandcrawlerPostgrestClient() - self.grobid_sink = kwargs.get('grobid_sink') - self.thumbnail_sink = kwargs.get('thumbnail_sink') - self.pdftext_sink = kwargs.get('pdftext_sink') - self.xmldoc_sink = kwargs.get('xmldoc_sink') - self.htmlteixml_sink = kwargs.get('htmlteixml_sink') - self.max_hops = 6 - - self.try_existing_ingest = kwargs.get('try_existing_ingest', False) - self.try_existing_grobid = kwargs.get('try_existing_grobid', True) - self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True) - self.try_wayback = kwargs.get('try_wayback', True) - self.try_spn2 = kwargs.get('try_spn2', True) - self.html_quick_mode = kwargs.get('html_quick_mode', False) - self.adblock_rules = load_adblock_rules() - self.max_html_resources = 200 - - self.base_url_blocklist = [ - # robot blocking - "://hkvalidate.perfdrive.com/", - - # temporary, until we implement specific fetch and 'petabox' output - "://archive.org/", - "://www.archive.org/", - "://web.archive.org/web/", - - # out of scope - "://openlibrary.org/", - "://www.openlibrary.org/", - "://fatcat.wiki/", - "://orcid.org/", - "://doaj.org/", - - # Domain squats - "://bartandjones.com", - "://ijretm.com", - "://ijrcemas.com", - "://jist.net.in", - "://croisements-revue.org", - - # all stubs/previews, not full papers - "://page-one.live.cf.public.springer.com", - - # large datasets-only (no PDF expected) - "plutof.ut.ee/", - "www.gbif.org/", - "doi.pangaea.de/", - "www.plate-archive.org/", - "://doi.org/10.25642/ipk/gbis/", - "://apex.ipk-gatersleben.de/", - "fao.org/glis/", - - # Historical non-paper content: - "dhz.uni-passau.de/", # newspapers - "digital.ucd.ie/", # ireland national historical - - # DOI prefixes - "doi.org/10.2307/", # JSTOR; slow and many redirects - "doi.org/10.18730/", # fao.org: database entry - "doi.org/10.15468/", # gbif.org: database entry - - # deprecated domain (doesn't redirect correctly) - "://edoc.mpg.de/", - ] - - self.wall_blocklist = [ - # loginwall - "://profile.thieme.de/HTML/sso/ejournals/login.htm", - "://login.bepress.com/", - "?SAMLRequest=", - "://osapublishing.org/captcha/", - "/password-login", - "://gateway.isiknowledge.com/", - "/login?TARGET=", - ] - - self.cookie_blocklist = [ - "/cookieAbsent", - "cookieSet=1", - "error=cookies_not_supported", - ] - - # these are special-case web domains for which we want SPN2 to not run - # a headless browser (brozzler), but instead simply run wget. 
- # the motivation could be to work around browser issues, or in the - # future possibly to increase download efficiency (wget/fetch being - # faster than browser fetch) - self.spn2_simple_get_domains = [ - # direct PDF links - "://arxiv.org/pdf/", - "://europepmc.org/backend/ptpmcrender.fcgi", - "://pdfs.semanticscholar.org/", - "://res.mdpi.com/", - - # platform sites - "://zenodo.org/", - "://figshare.org/", - "://springernature.figshare.com/", - - # popular simple cloud storage or direct links - "://s3-eu-west-1.amazonaws.com/", - ] - - self.src_valid_mimetypes = [ - "text/x-tex", - "application/gzip", - "application/x-bzip", - "application/x-bzip2", - "application/zip", - "application/x-tar", - "application/msword", - "application/vnd.openxmlformats-officedocument.wordprocessingml.document", - ] - - self.component_valid_mimetypes = [ - "image/jpeg", - "image/tiff", - "image/png", - "image/gif", - "audio/mpeg", - "video/mp4", - "video/mpeg", - "text/plain", - "text/csv", - "application/json", - "application/xml", - "application/pdf", - "application/gzip", - "application/x-bzip", - "application/x-bzip2", - "application/zip ", - "application/x-rar ", - "application/x-7z-compressed", - "application/x-tar", - "application/vnd.ms-powerpoint", - "application/vnd.ms-excel", - "application/msword", - "application/vnd.openxmlformats-officedocument.wordprocessingml.document", - "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - ] - - - def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: - """ - Check in sandcrawler-db (postgres) to see if we have already ingested - this URL (ingest file result table). - - Returns existing row *if* found *and* we should use it, otherwise None. - - Looks at existing ingest results and makes a decision based on, eg, - status and timestamp. - """ - if not self.try_existing_ingest: - return None - existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url) - # TODO: filter on more flags? - if existing and existing['hit'] == True: - return existing - else: - return None - - def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]: - """ - Looks in wayback for a resource starting at the URL, following any - redirects. If a hit isn't found, try crawling with SPN. - """ - via = "none" - resource = None - - if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"): - raise NotImplementedError("handling direct wayback links not supported yet") - - if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"): - raise NotImplementedError("fetching from archive.org not implemented yet") - - if self.try_wayback and not force_recrawl: - via = "wayback" - resource = self.wayback_client.lookup_resource(url, best_mimetype) - - # check for "soft 404" conditions, where we should retry with live SPNv2 - soft404 = False - # NOTE: these are often not working with SPNv2 either, so disabling. 
If - # we really want to try again, should do force-recrawl - #if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'): - # soft404 = True - - old_failure = False - if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000': - old_failure = True - - if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure): - via = "spn2" - force_simple_get = 0 - for domain in self.spn2_simple_get_domains: - if domain in url: - force_simple_get = 1 - break - resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get) - print("[FETCH {:>6}] {} {}".format( - via, - (resource and resource.status), - (resource and resource.terminal_url) or url), - file=sys.stderr) - return resource - - def process_existing(self, request: dict, result_row: dict) -> dict: - """ - If we have an existing ingest file result, do any database fetches or - additional processing necessary to return a result. - """ - raise NotImplementedError("process_existing() not tested or safe yet") - assert result_row['hit'] - existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex']) - existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex']) - existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt']) - if not (existing_file_meta and existing_grobid and existing_cdx): - raise NotImplementedError("partially-exsiting records not implemented yet") - result = { - 'hit': result_row['hit'], - 'status': "existing", - 'request': request, - 'grobid': existing_grobid, - 'file_meta': existing_file_meta, - 'cdx': existing_cdx, - 'terminal': { - 'terminal_url': result_row['terminal_url'], - 'terminal_dt': result_row['terminal_dt'], - 'terminal_status_code': result_row['terminal_status_code'], - 'terminal_sha1hex': result_row['terminal_sha1hex'], - }, - } - return result - - def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: - """ - Run all the necessary processing for a new/fresh ingest hit. - """ - if ingest_type == "pdf": - return { - 'grobid': self.process_grobid(resource, file_meta), - 'pdf_meta': self.process_pdfextract(resource, file_meta), - } - elif ingest_type == "xml": - return { - 'xml_meta': self.process_xml(resource, file_meta), - } - elif ingest_type == "html": - html_info = self.process_html(resource, file_meta) - # if there is no html_biblio, don't clobber anything possibly extracted earlier - if 'html_biblio' in html_info and not html_info['html_biblio']: - html_info.pop('html_biblio') - return html_info - elif ingest_type == "src": - return {} - elif ingest_type == "component": - return {} - else: - raise NotImplementedError(f"process {ingest_type} hit") - - def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict: - """ - Submits to resource body to GROBID for processing. 
- - TODO: By default checks sandcrawler-db for an existing row first, then - decide if we should re-process - """ - if self.try_existing_grobid: - existing = self.pgrest_client.get_grobid(file_meta['sha1hex']) - if existing: - print("found existing GROBID result", file=sys.stderr) - return existing - - # Need to actually processes - result = self.grobid_client.process_fulltext(resource.body) - if self.grobid_sink: - # extra fields for GROBID kafka messages - result['file_meta'] = file_meta - result['key'] = result['file_meta']['sha1hex'] - self.grobid_sink.push_record(result.copy()) - if result['status'] == "success": - metadata = self.grobid_client.metadata(result) - if metadata: - result['metadata'] = self.grobid_client.metadata(result) - result['fatcat_release'] = result['metadata'].pop('fatcat_release', None) - result['grobid_version'] = result['metadata'].pop('grobid_version', None) - result.pop('tei_xml', None) - result.pop('file_meta', None) - result.pop('key', None) - return result - - def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict: - """ - Extracts thumbnail and pdf_meta info from PDF. - - By default checks sandcrawler-db for an existing row first, then decide - if we should re-process. - - TODO: difference between Kafka schema and SQL/postgrest schema - """ - if self.try_existing_pdfextract: - existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex']) - if existing: - print("found existing pdf_meta result", file=sys.stderr) - result = PdfExtractResult.from_pdf_meta_dict(existing) - return result.to_pdftext_dict() - - # Need to actually processes - result = process_pdf(resource.body) - assert result.file_meta['sha1hex'] == file_meta['sha1hex'] - if self.thumbnail_sink and result.page0_thumbnail is not None: - self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) - if self.pdftext_sink: - self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex) - result.page0_thumbnail = None - result.text = None - result.file_meta = None - return result.to_pdftext_dict() - - def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict: - """ - Simply publishes to Kafka topic. - - In the future, could extract other metadata here (like body word - count), or attempting to fetch sub-resources. 
- """ - if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": - try: - jats_xml = xml_reserialize(resource.body) - except xml.etree.ElementTree.ParseError: - return dict(status="xml-parse-error") - msg = dict( - sha1hex=file_meta["sha1hex"], - status="success", - jats_xml=jats_xml, - ) - self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) - return dict(status="success") - - def process_html(self, resource: ResourceResult, file_meta: dict) -> dict: - - assert resource.body - try: - html_doc = HTMLParser(resource.body) - except ValueError as ve: - return dict( - status="html-selectolax-error", - ) - html_biblio = html_extract_biblio(resource.terminal_url, html_doc) - assert html_biblio - html_body = html_extract_body_teixml(resource.body) - html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio) - html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count')) - html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) - - if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'): - return dict( - status=html_scope, - html_biblio=html_biblio_dict, - scope=html_scope, - platform=html_platform, - ) - elif html_scope not in ('article-fulltext','unknown',): - html_body.pop("tei_xml", None) - return dict( - status="wrong-scope", - html_biblio=html_biblio_dict, - scope=html_scope, - platform=html_platform, - html_body=html_body, - ) - - raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) - if len(raw_resources) > self.max_html_resources: - html_body.pop("tei_xml", None) - return dict( - status="too-many-resources", - html_biblio=html_biblio_dict, - scope=html_scope, - platform=html_platform, - html_body=html_body, - ) - - if self.htmlteixml_sink and html_body['status'] == "success": - self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex']) - - html_body.pop("tei_xml", None) - - partial_result = dict( - html_biblio=html_biblio_dict, - scope=html_scope, - platform=html_platform, - html_body=html_body, - ) - - when = parse_cdx_datetime(resource.cdx.datetime) - full_resources: List[WebResource] = [] - - try: - if self.html_quick_mode: - print(" WARN: running quick CDX-only fetches", file=sys.stderr) - full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when) - else: - full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) - except PetaboxError as e: - partial_result['status'] = 'petabox-error' - partial_result['error_message'] = str(e)[:1600] - return partial_result - except CdxApiError as e: - partial_result['status'] = 'cdx-error' - partial_result['error_message'] = str(e)[:1600] - return partial_result - except WaybackError as e: - partial_result['status'] = 'wayback-error' - partial_result['error_message'] = str(e)[:1600] - return partial_result - except WaybackContentError as e: - partial_result['status'] = 'wayback-content-error' - partial_result['error_message'] = str(e)[:1600] - return partial_result - except NoCaptureError as e: - partial_result['status'] = 'html-resource-no-capture' - partial_result['error_message'] = str(e)[:1600] - return partial_result - - info = dict( - html_body=html_body, - html_biblio=html_biblio_dict, - scope=html_scope, - platform=html_platform, - html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources], - ) - if html_scope == 'unknown': - info['status'] = 'unknown-scope' - return info - - def 
timeout_response(self, task: dict) -> dict: - print("[TIMEOUT]", file=sys.stderr) - return dict( - request=task, - hit=False, - status="timeout", - error_message="ingest worker internal timeout", - ) - - def want(self, request: dict) -> bool: - if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html', 'src', 'component'): - return False - return True - - def process(self, request: dict, key: Any = None) -> dict: - - # old backwards compatibility - if request.get('ingest_type') == 'file': - request['ingest_type'] = 'pdf' - - ingest_type = request.get('ingest_type') - if ingest_type not in ("pdf", "xml", "html", "src", "component"): - raise NotImplementedError(f"can't handle ingest_type={ingest_type}") - - # parse/clean URL - # note that we pass through the original/raw URL, and that is what gets - # persisted in database table - base_url = clean_url(request['base_url']) - - force_recrawl = bool(request.get('force_recrawl', False)) - - for block in self.base_url_blocklist: - if block in base_url: - print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) - return dict(request=request, hit=False, status="skip-url-blocklist") - - print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) - - best_mimetype = None - if ingest_type == "pdf": - best_mimetype = "application/pdf" - elif ingest_type == "xml": - best_mimetype = "text/xml" - elif ingest_type == "html": - best_mimetype = "text/html" - elif ingest_type == "src": - best_mimetype = "application/gzip" - - existing = self.check_existing_ingest(ingest_type, base_url) - if existing: - return self.process_existing(request, existing) - - result: Dict[str, Any] = dict(request=request, hit=False) - - next_url = base_url - hops = [base_url] - - while len(hops) <= self.max_hops: - - result['hops'] = hops - - # check against blocklist again on each hop - for block in self.base_url_blocklist: - if block in next_url: - result['status'] = "skip-url-blocklist" - return result - - # check against known loginwall URLs - for block in self.wall_blocklist: - if block in next_url: - # TODO: blocked-wall instead of skip-wall - result['status'] = "skip-wall" - return result - - # check for popular cookie blocking URL patterns. 
On successful SPN - # crawls, shouldn't see these redirect URLs - for pattern in self.cookie_blocklist: - if pattern in next_url: - result['status'] = 'blocked-cookie' - return result - - try: - resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl) - except SavePageNowError as e: - result['status'] = 'spn2-error' - result['error_message'] = str(e)[:1600] - return result - except PetaboxError as e: - result['status'] = 'petabox-error' - result['error_message'] = str(e)[:1600] - return result - except CdxApiError as e: - result['status'] = 'cdx-error' - result['error_message'] = str(e)[:1600] - # add a sleep in cdx-error path as a slow-down - time.sleep(2.0) - return result - except WaybackError as e: - result['status'] = 'wayback-error' - result['error_message'] = str(e)[:1600] - return result - except WaybackContentError as e: - result['status'] = 'wayback-content-error' - result['error_message'] = str(e)[:1600] - return result - except NotImplementedError as e: - result['status'] = 'not-implemented' - result['error_message'] = str(e)[:1600] - return result - - assert resource - - if resource.terminal_url: - result['terminal'] = { - "terminal_url": resource.terminal_url, - "terminal_dt": resource.terminal_dt, - "terminal_status_code": resource.terminal_status_code, - } - if resource.terminal_url not in result['hops']: - result['hops'].append(resource.terminal_url) - - if not resource.hit: - result['status'] = resource.status - return result - - if resource.terminal_url: - for pattern in self.base_url_blocklist: - if pattern in resource.terminal_url: - result['status'] = 'skip-url-blocklist' - return result - - if resource.terminal_url: - for pattern in self.cookie_blocklist: - if pattern in resource.terminal_url: - result['status'] = 'blocked-cookie' - return result - - if not resource.body: - result['status'] = 'null-body' - return result - - if len(resource.body) > MAX_BODY_SIZE_BYTES: - result['status'] = 'body-too-large' - return result - - file_meta = gen_file_metadata(resource.body) - try: - file_meta, resource = fix_transfer_encoding(file_meta, resource) - except Exception as e: - result['status'] = 'bad-gzip-encoding' - result['error_message'] = str(e) - return result - - if not resource.body or file_meta['size_bytes'] == 0: - result['status'] = 'null-body' - return result - - # here we split based on ingest type to try and extract a next hop - html_ish_resource = bool( - "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" - or "application/xml" in file_meta['mimetype'] - or "text/xml" in file_meta['mimetype'] - ) - html_biblio = None - html_doc = None - if html_ish_resource and resource.body: - try: - html_doc = HTMLParser(resource.body) - html_biblio = html_extract_biblio(resource.terminal_url, html_doc) - if html_biblio: - if not 'html_biblio' in result or html_biblio.title: - result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) - #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) - except ValueError: - pass - - if ingest_type == "pdf" and html_ish_resource: - - # the new style of URL extraction (already computed) - if html_biblio and html_biblio.pdf_fulltext_url: - fulltext_url = dict( - pdf_url=html_biblio.pdf_fulltext_url, - technique="html_biblio", - ) - else: - fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) - - result['extract_next_hop'] = fulltext_url - if not fulltext_url: - result['status'] = 'no-pdf-link' - return result 
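As a side note on the hop bookkeeping in this loop: the sketch below is a standalone, simplified illustration of just the loop-protection logic (hops list, link-loop detection, max_hops cutoff), not the real implementation; the status strings match the ones returned by process().

    # Illustrative only: loop protection similar to IngestFileWorker.process()
    def follow_hops(base_url, extract_next_url, max_hops=6):
        hops = [base_url]
        while len(hops) <= max_hops:
            next_url = extract_next_url(hops[-1])  # eg, a PDF link scraped from a landing page
            if not next_url:
                # no further hop; the real loop breaks here and processes the resource
                return dict(hops=hops, status="success")
            if next_url in hops:
                return dict(hops=hops, status="link-loop",
                            error_message="repeated: {}".format(next_url))
            hops.append(next_url)
        return dict(hops=hops, status="max-hops-exceeded")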
- next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url') or "" - assert next_url - next_url = clean_url(next_url) - print("[PARSE {:>6}] {} {}".format( - ingest_type, - fulltext_url.get('technique'), - next_url, - ), - file=sys.stderr) - if next_url in hops: - result['status'] = 'link-loop' - result['error_message'] = "repeated: {}".format(next_url) - return result - hops.append(next_url) - continue - elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio: - # NOTE: src_fulltext_url is not a thing - next_url_found = None - if ingest_type == "xml" and html_biblio.xml_fulltext_url: - next_url_found = html_biblio.xml_fulltext_url - elif ingest_type == "html" and html_biblio.html_fulltext_url: - next_url_found = html_biblio.html_fulltext_url - elif ingest_type == "component" and html_biblio.component_url: - next_url_found = html_biblio.component_url - - if next_url_found: - next_url = next_url_found - technique = "html_biblio" - print("[PARSE {:>6}] {} {}".format( - ingest_type, - technique, - next_url, - ), - file=sys.stderr) - if next_url in hops: - if ingest_type == "html": - # for HTML ingest, we don't count this as a link-loop - break - result['status'] = 'link-loop' - result['error_message'] = "repeated: {}".format(next_url) - return result - hops.append(next_url) - continue - - # default is to NOT keep hopping - break - - if len(hops) >= self.max_hops: - result['status'] = "max-hops-exceeded" - return result - - # fetch must be a hit if we got this far (though not necessarily an ingest hit!) - assert resource - assert resource.hit == True - assert resource.terminal_status_code in (200, 226) - - if resource.terminal_url: - result['terminal'] = { - "terminal_url": resource.terminal_url, - "terminal_dt": resource.terminal_dt, - "terminal_status_code": resource.terminal_status_code, - "terminal_sha1hex": file_meta['sha1hex'], - } - - result['file_meta'] = file_meta - result['cdx'] = cdx_to_dict(resource.cdx) - if resource.revisit_cdx: - result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) - - if ingest_type == "pdf": - if file_meta['mimetype'] != "application/pdf": - result['status'] = "wrong-mimetype" # formerly: "other-mimetype" - return result - elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): - result['status'] = "wrong-mimetype" - return result - elif ingest_type == "html": - if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): - result['status'] = "wrong-mimetype" - return result - elif ingest_type == "src": - if file_meta['mimetype'] not in self.src_valid_mimetypes: - result['status'] = "wrong-mimetype" - return result - elif ingest_type == "component": - if file_meta['mimetype'] not in self.component_valid_mimetypes: - result['status'] = "wrong-mimetype" - return result - else: - raise NotImplementedError() - - info = self.process_hit(ingest_type, resource, file_meta) - result.update(info) - - # check if processing turned up an error - if info.get('status') not in ('success', None): - result['status'] = info['status'] - return result - - result['status'] = "success" - result['hit'] = True - if ingest_type == "pdf": - print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( - ingest_type, - result.get('file_meta', {}).get('sha1hex'), - result.get('grobid', {}).get('status_code'), - result.get('pdf_meta', {}).get('status'), - ), - file=sys.stderr) - else: - print("[SUCCESS {:>5}] sha1:{}".format( - ingest_type, - result.get('file_meta', 
{}).get('sha1hex'), - ), - file=sys.stderr) - return result - - -class IngestFileRequestHandler(BaseHTTPRequestHandler): - def do_POST(self): - if self.path != "/ingest": - self.send_response(404) - self.end_headers() - self.wfile.write("404: Not Found") - return - length = int(self.headers.get('content-length')) - request = json.loads(self.rfile.read(length).decode('utf-8')) - print("Got request: {}".format(request)) - ingester = IngestFileWorker() - result = ingester.process(request) - self.send_response(200) - self.end_headers() - self.wfile.write(json.dumps(result)) diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py new file mode 100644 index 0000000..b852c69 --- /dev/null +++ b/python/sandcrawler/ingest_file.py @@ -0,0 +1,833 @@ + +import sys +import json +import gzip +import time +import base64 +import xml.etree.ElementTree +from collections import namedtuple +from typing import Optional, Tuple, Any, Dict, List +from http.server import BaseHTTPRequestHandler, HTTPServer + +import requests +from selectolax.parser import HTMLParser + +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError +from sandcrawler.grobid import GrobidClient +from sandcrawler.pdfextract import process_pdf, PdfExtractResult +from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime +from sandcrawler.html import extract_fulltext_url +from sandcrawler.html_ingest import fetch_html_resources, \ + quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \ + WebResource, html_guess_platform +from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules +from sandcrawler.workers import SandcrawlerWorker +from sandcrawler.db import SandcrawlerPostgrestClient +from sandcrawler.xml import xml_reserialize + + +MAX_BODY_SIZE_BYTES = 128*1024*1024 + +class IngestFileWorker(SandcrawlerWorker): + """ + High level flow is to look in history first, then go to live web if + resource not found. Following redirects is treated as "fetching a + resource". Current version fetches a single resource; if it isn't a hit + but is an HTML 200, treats it as a landing page, tries to extract + fulltext link, then fetches that resource. + + process(request, key=None) -> response + Does all the things! + + Check existing processing (short circuit): + + check_existing_ingest(base_url) -> ingest_file_result or none + process_existing(result) -> response + try fetching all the rows we want. 
if any don't exist, fetch the resource itself and call process_hit() + + Fetch resource: + + find_resource(url) -> ResourceResult + + Process resource: + + process_hit(ResourceResult) -> response + process_grobid(ResourceResult) + """ + + def __init__(self, sink=None, **kwargs): + super().__init__() + + self.sink = sink + self.wayback_client = kwargs.get('wayback_client') + if not self.wayback_client: + self.wayback_client = WaybackClient() + self.spn_client = kwargs.get('spn_client') + if not self.spn_client: + self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + self.grobid_client = kwargs.get('grobid_client') + if not self.grobid_client: + self.grobid_client = GrobidClient() + self.pgrest_client = kwargs.get('pgrest_client') + if not self.pgrest_client: + self.pgrest_client = SandcrawlerPostgrestClient() + self.grobid_sink = kwargs.get('grobid_sink') + self.thumbnail_sink = kwargs.get('thumbnail_sink') + self.pdftext_sink = kwargs.get('pdftext_sink') + self.xmldoc_sink = kwargs.get('xmldoc_sink') + self.htmlteixml_sink = kwargs.get('htmlteixml_sink') + self.max_hops = 6 + + self.try_existing_ingest = kwargs.get('try_existing_ingest', False) + self.try_existing_grobid = kwargs.get('try_existing_grobid', True) + self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True) + self.try_wayback = kwargs.get('try_wayback', True) + self.try_spn2 = kwargs.get('try_spn2', True) + self.html_quick_mode = kwargs.get('html_quick_mode', False) + self.adblock_rules = load_adblock_rules() + self.max_html_resources = 200 + + self.base_url_blocklist = [ + # robot blocking + "://hkvalidate.perfdrive.com/", + + # temporary, until we implement specific fetch and 'petabox' output + "://archive.org/", + "://www.archive.org/", + "://web.archive.org/web/", + + # out of scope + "://openlibrary.org/", + "://www.openlibrary.org/", + "://fatcat.wiki/", + "://orcid.org/", + "://doaj.org/", + + # Domain squats + "://bartandjones.com", + "://ijretm.com", + "://ijrcemas.com", + "://jist.net.in", + "://croisements-revue.org", + + # all stubs/previews, not full papers + "://page-one.live.cf.public.springer.com", + + # large datasets-only (no PDF expected) + "plutof.ut.ee/", + "www.gbif.org/", + "doi.pangaea.de/", + "www.plate-archive.org/", + "://doi.org/10.25642/ipk/gbis/", + "://apex.ipk-gatersleben.de/", + "fao.org/glis/", + + # Historical non-paper content: + "dhz.uni-passau.de/", # newspapers + "digital.ucd.ie/", # ireland national historical + + # DOI prefixes + "doi.org/10.2307/", # JSTOR; slow and many redirects + "doi.org/10.18730/", # fao.org: database entry + "doi.org/10.15468/", # gbif.org: database entry + + # deprecated domain (doesn't redirect correctly) + "://edoc.mpg.de/", + ] + + self.wall_blocklist = [ + # loginwall + "://profile.thieme.de/HTML/sso/ejournals/login.htm", + "://login.bepress.com/", + "?SAMLRequest=", + "://osapublishing.org/captcha/", + "/password-login", + "://gateway.isiknowledge.com/", + "/login?TARGET=", + ] + + self.cookie_blocklist = [ + "/cookieAbsent", + "cookieSet=1", + "error=cookies_not_supported", + ] + + # these are special-case web domains for which we want SPN2 to not run + # a headless browser (brozzler), but instead simply run wget. 
+ # the motivation could be to work around browser issues, or in the + # future possibly to increase download efficiency (wget/fetch being + # faster than browser fetch) + self.spn2_simple_get_domains = [ + # direct PDF links + "://arxiv.org/pdf/", + "://europepmc.org/backend/ptpmcrender.fcgi", + "://pdfs.semanticscholar.org/", + "://res.mdpi.com/", + + # platform sites + "://zenodo.org/", + "://figshare.org/", + "://springernature.figshare.com/", + + # popular simple cloud storage or direct links + "://s3-eu-west-1.amazonaws.com/", + ] + + self.src_valid_mimetypes = [ + "text/x-tex", + "application/gzip", + "application/x-bzip", + "application/x-bzip2", + "application/zip", + "application/x-tar", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ] + + self.component_valid_mimetypes = [ + "image/jpeg", + "image/tiff", + "image/png", + "image/gif", + "audio/mpeg", + "video/mp4", + "video/mpeg", + "text/plain", + "text/csv", + "application/json", + "application/xml", + "application/pdf", + "application/gzip", + "application/x-bzip", + "application/x-bzip2", + "application/zip ", + "application/x-rar ", + "application/x-7z-compressed", + "application/x-tar", + "application/vnd.ms-powerpoint", + "application/vnd.ms-excel", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ] + + + def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: + """ + Check in sandcrawler-db (postgres) to see if we have already ingested + this URL (ingest file result table). + + Returns existing row *if* found *and* we should use it, otherwise None. + + Looks at existing ingest results and makes a decision based on, eg, + status and timestamp. + """ + if not self.try_existing_ingest: + return None + existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url) + # TODO: filter on more flags? + if existing and existing['hit'] == True: + return existing + else: + return None + + def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]: + """ + Looks in wayback for a resource starting at the URL, following any + redirects. If a hit isn't found, try crawling with SPN. + """ + via = "none" + resource = None + + if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"): + raise NotImplementedError("handling direct wayback links not supported yet") + + if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"): + raise NotImplementedError("fetching from archive.org not implemented yet") + + if self.try_wayback and not force_recrawl: + via = "wayback" + resource = self.wayback_client.lookup_resource(url, best_mimetype) + + # check for "soft 404" conditions, where we should retry with live SPNv2 + soft404 = False + # NOTE: these are often not working with SPNv2 either, so disabling. 
If + # we really want to try again, should do force-recrawl + #if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'): + # soft404 = True + + old_failure = False + if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000': + old_failure = True + + if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure): + via = "spn2" + force_simple_get = 0 + for domain in self.spn2_simple_get_domains: + if domain in url: + force_simple_get = 1 + break + resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get) + print("[FETCH {:>6}] {} {}".format( + via, + (resource and resource.status), + (resource and resource.terminal_url) or url), + file=sys.stderr) + return resource + + def process_existing(self, request: dict, result_row: dict) -> dict: + """ + If we have an existing ingest file result, do any database fetches or + additional processing necessary to return a result. + """ + raise NotImplementedError("process_existing() not tested or safe yet") + assert result_row['hit'] + existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex']) + existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex']) + existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt']) + if not (existing_file_meta and existing_grobid and existing_cdx): + raise NotImplementedError("partially-exsiting records not implemented yet") + result = { + 'hit': result_row['hit'], + 'status': "existing", + 'request': request, + 'grobid': existing_grobid, + 'file_meta': existing_file_meta, + 'cdx': existing_cdx, + 'terminal': { + 'terminal_url': result_row['terminal_url'], + 'terminal_dt': result_row['terminal_dt'], + 'terminal_status_code': result_row['terminal_status_code'], + 'terminal_sha1hex': result_row['terminal_sha1hex'], + }, + } + return result + + def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: + """ + Run all the necessary processing for a new/fresh ingest hit. + """ + if ingest_type == "pdf": + return { + 'grobid': self.process_grobid(resource, file_meta), + 'pdf_meta': self.process_pdfextract(resource, file_meta), + } + elif ingest_type == "xml": + return { + 'xml_meta': self.process_xml(resource, file_meta), + } + elif ingest_type == "html": + html_info = self.process_html(resource, file_meta) + # if there is no html_biblio, don't clobber anything possibly extracted earlier + if 'html_biblio' in html_info and not html_info['html_biblio']: + html_info.pop('html_biblio') + return html_info + elif ingest_type == "src": + return {} + elif ingest_type == "component": + return {} + else: + raise NotImplementedError(f"process {ingest_type} hit") + + def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Submits to resource body to GROBID for processing. 
+ + TODO: By default checks sandcrawler-db for an existing row first, then + decide if we should re-process + """ + if self.try_existing_grobid: + existing = self.pgrest_client.get_grobid(file_meta['sha1hex']) + if existing: + print("found existing GROBID result", file=sys.stderr) + return existing + + # Need to actually processes + result = self.grobid_client.process_fulltext(resource.body) + if self.grobid_sink: + # extra fields for GROBID kafka messages + result['file_meta'] = file_meta + result['key'] = result['file_meta']['sha1hex'] + self.grobid_sink.push_record(result.copy()) + if result['status'] == "success": + metadata = self.grobid_client.metadata(result) + if metadata: + result['metadata'] = self.grobid_client.metadata(result) + result['fatcat_release'] = result['metadata'].pop('fatcat_release', None) + result['grobid_version'] = result['metadata'].pop('grobid_version', None) + result.pop('tei_xml', None) + result.pop('file_meta', None) + result.pop('key', None) + return result + + def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Extracts thumbnail and pdf_meta info from PDF. + + By default checks sandcrawler-db for an existing row first, then decide + if we should re-process. + + TODO: difference between Kafka schema and SQL/postgrest schema + """ + if self.try_existing_pdfextract: + existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex']) + if existing: + print("found existing pdf_meta result", file=sys.stderr) + result = PdfExtractResult.from_pdf_meta_dict(existing) + return result.to_pdftext_dict() + + # Need to actually processes + result = process_pdf(resource.body) + assert result.file_meta['sha1hex'] == file_meta['sha1hex'] + if self.thumbnail_sink and result.page0_thumbnail is not None: + self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) + if self.pdftext_sink: + self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex) + result.page0_thumbnail = None + result.text = None + result.file_meta = None + return result.to_pdftext_dict() + + def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Simply publishes to Kafka topic. + + In the future, could extract other metadata here (like body word + count), or attempting to fetch sub-resources. 
+ """ + if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": + try: + jats_xml = xml_reserialize(resource.body) + except xml.etree.ElementTree.ParseError: + return dict(status="xml-parse-error") + msg = dict( + sha1hex=file_meta["sha1hex"], + status="success", + jats_xml=jats_xml, + ) + self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) + return dict(status="success") + + def process_html(self, resource: ResourceResult, file_meta: dict) -> dict: + + assert resource.body + try: + html_doc = HTMLParser(resource.body) + except ValueError as ve: + return dict( + status="html-selectolax-error", + ) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + assert html_biblio + html_body = html_extract_body_teixml(resource.body) + html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio) + html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count')) + html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) + + if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'): + return dict( + status=html_scope, + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + ) + elif html_scope not in ('article-fulltext','unknown',): + html_body.pop("tei_xml", None) + return dict( + status="wrong-scope", + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) + if len(raw_resources) > self.max_html_resources: + html_body.pop("tei_xml", None) + return dict( + status="too-many-resources", + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + if self.htmlteixml_sink and html_body['status'] == "success": + self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex']) + + html_body.pop("tei_xml", None) + + partial_result = dict( + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + when = parse_cdx_datetime(resource.cdx.datetime) + full_resources: List[WebResource] = [] + + try: + if self.html_quick_mode: + print(" WARN: running quick CDX-only fetches", file=sys.stderr) + full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when) + else: + full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) + except PetaboxError as e: + partial_result['status'] = 'petabox-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except CdxApiError as e: + partial_result['status'] = 'cdx-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackError as e: + partial_result['status'] = 'wayback-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackContentError as e: + partial_result['status'] = 'wayback-content-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except NoCaptureError as e: + partial_result['status'] = 'html-resource-no-capture' + partial_result['error_message'] = str(e)[:1600] + return partial_result + + info = dict( + html_body=html_body, + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources], + ) + if html_scope == 'unknown': + info['status'] = 'unknown-scope' + return info + + def 
timeout_response(self, task: dict) -> dict: + print("[TIMEOUT]", file=sys.stderr) + return dict( + request=task, + hit=False, + status="timeout", + error_message="ingest worker internal timeout", + ) + + def want(self, request: dict) -> bool: + if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html', 'src', 'component'): + return False + return True + + def process(self, request: dict, key: Any = None) -> dict: + + # old backwards compatibility + if request.get('ingest_type') == 'file': + request['ingest_type'] = 'pdf' + + ingest_type = request.get('ingest_type') + if ingest_type not in ("pdf", "xml", "html", "src", "component"): + raise NotImplementedError(f"can't handle ingest_type={ingest_type}") + + # parse/clean URL + # note that we pass through the original/raw URL, and that is what gets + # persisted in database table + base_url = clean_url(request['base_url']) + + force_recrawl = bool(request.get('force_recrawl', False)) + + for block in self.base_url_blocklist: + if block in base_url: + print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + return dict(request=request, hit=False, status="skip-url-blocklist") + + print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + + best_mimetype = None + if ingest_type == "pdf": + best_mimetype = "application/pdf" + elif ingest_type == "xml": + best_mimetype = "text/xml" + elif ingest_type == "html": + best_mimetype = "text/html" + elif ingest_type == "src": + best_mimetype = "application/gzip" + + existing = self.check_existing_ingest(ingest_type, base_url) + if existing: + return self.process_existing(request, existing) + + result: Dict[str, Any] = dict(request=request, hit=False) + + next_url = base_url + hops = [base_url] + + while len(hops) <= self.max_hops: + + result['hops'] = hops + + # check against blocklist again on each hop + for block in self.base_url_blocklist: + if block in next_url: + result['status'] = "skip-url-blocklist" + return result + + # check against known loginwall URLs + for block in self.wall_blocklist: + if block in next_url: + # TODO: blocked-wall instead of skip-wall + result['status'] = "skip-wall" + return result + + # check for popular cookie blocking URL patterns. 
On successful SPN + # crawls, shouldn't see these redirect URLs + for pattern in self.cookie_blocklist: + if pattern in next_url: + result['status'] = 'blocked-cookie' + return result + + try: + resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl) + except SavePageNowError as e: + result['status'] = 'spn2-error' + result['error_message'] = str(e)[:1600] + return result + except PetaboxError as e: + result['status'] = 'petabox-error' + result['error_message'] = str(e)[:1600] + return result + except CdxApiError as e: + result['status'] = 'cdx-error' + result['error_message'] = str(e)[:1600] + # add a sleep in cdx-error path as a slow-down + time.sleep(2.0) + return result + except WaybackError as e: + result['status'] = 'wayback-error' + result['error_message'] = str(e)[:1600] + return result + except WaybackContentError as e: + result['status'] = 'wayback-content-error' + result['error_message'] = str(e)[:1600] + return result + except NotImplementedError as e: + result['status'] = 'not-implemented' + result['error_message'] = str(e)[:1600] + return result + + assert resource + + if resource.terminal_url: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + } + if resource.terminal_url not in result['hops']: + result['hops'].append(resource.terminal_url) + + if not resource.hit: + result['status'] = resource.status + return result + + if resource.terminal_url: + for pattern in self.base_url_blocklist: + if pattern in resource.terminal_url: + result['status'] = 'skip-url-blocklist' + return result + + if resource.terminal_url: + for pattern in self.cookie_blocklist: + if pattern in resource.terminal_url: + result['status'] = 'blocked-cookie' + return result + + if not resource.body: + result['status'] = 'null-body' + return result + + if len(resource.body) > MAX_BODY_SIZE_BYTES: + result['status'] = 'body-too-large' + return result + + file_meta = gen_file_metadata(resource.body) + try: + file_meta, resource = fix_transfer_encoding(file_meta, resource) + except Exception as e: + result['status'] = 'bad-gzip-encoding' + result['error_message'] = str(e) + return result + + if not resource.body or file_meta['size_bytes'] == 0: + result['status'] = 'null-body' + return result + + # here we split based on ingest type to try and extract a next hop + html_ish_resource = bool( + "html" in file_meta['mimetype'] + or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" + or "application/xml" in file_meta['mimetype'] + or "text/xml" in file_meta['mimetype'] + ) + html_biblio = None + html_doc = None + if html_ish_resource and resource.body: + try: + html_doc = HTMLParser(resource.body) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + if html_biblio: + if not 'html_biblio' in result or html_biblio.title: + result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) + #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) + except ValueError: + pass + + if ingest_type == "pdf" and html_ish_resource: + + # the new style of URL extraction (already computed) + if html_biblio and html_biblio.pdf_fulltext_url: + fulltext_url = dict( + pdf_url=html_biblio.pdf_fulltext_url, + technique="html_biblio", + ) + else: + fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) + + result['extract_next_hop'] = fulltext_url + if not fulltext_url: + result['status'] = 'no-pdf-link' + return result 
+ next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url') or "" + assert next_url + next_url = clean_url(next_url) + print("[PARSE {:>6}] {} {}".format( + ingest_type, + fulltext_url.get('technique'), + next_url, + ), + file=sys.stderr) + if next_url in hops: + result['status'] = 'link-loop' + result['error_message'] = "repeated: {}".format(next_url) + return result + hops.append(next_url) + continue + elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio: + # NOTE: src_fulltext_url is not a thing + next_url_found = None + if ingest_type == "xml" and html_biblio.xml_fulltext_url: + next_url_found = html_biblio.xml_fulltext_url + elif ingest_type == "html" and html_biblio.html_fulltext_url: + next_url_found = html_biblio.html_fulltext_url + elif ingest_type == "component" and html_biblio.component_url: + next_url_found = html_biblio.component_url + + if next_url_found: + next_url = next_url_found + technique = "html_biblio" + print("[PARSE {:>6}] {} {}".format( + ingest_type, + technique, + next_url, + ), + file=sys.stderr) + if next_url in hops: + if ingest_type == "html": + # for HTML ingest, we don't count this as a link-loop + break + result['status'] = 'link-loop' + result['error_message'] = "repeated: {}".format(next_url) + return result + hops.append(next_url) + continue + + # default is to NOT keep hopping + break + + if len(hops) >= self.max_hops: + result['status'] = "max-hops-exceeded" + return result + + # fetch must be a hit if we got this far (though not necessarily an ingest hit!) + assert resource + assert resource.hit == True + assert resource.terminal_status_code in (200, 226) + + if resource.terminal_url: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + "terminal_sha1hex": file_meta['sha1hex'], + } + + result['file_meta'] = file_meta + result['cdx'] = cdx_to_dict(resource.cdx) + if resource.revisit_cdx: + result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) + + if ingest_type == "pdf": + if file_meta['mimetype'] != "application/pdf": + result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + return result + elif ingest_type == "xml": + if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "html": + if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "src": + if file_meta['mimetype'] not in self.src_valid_mimetypes: + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "component": + if file_meta['mimetype'] not in self.component_valid_mimetypes: + result['status'] = "wrong-mimetype" + return result + else: + raise NotImplementedError() + + info = self.process_hit(ingest_type, resource, file_meta) + result.update(info) + + # check if processing turned up an error + if info.get('status') not in ('success', None): + result['status'] = info['status'] + return result + + result['status'] = "success" + result['hit'] = True + if ingest_type == "pdf": + print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + result.get('grobid', {}).get('status_code'), + result.get('pdf_meta', {}).get('status'), + ), + file=sys.stderr) + else: + print("[SUCCESS {:>5}] sha1:{}".format( + ingest_type, + result.get('file_meta', 
{}).get('sha1hex'), + ), + file=sys.stderr) + return result + + +class IngestFileRequestHandler(BaseHTTPRequestHandler): + def do_POST(self): + if self.path != "/ingest": + self.send_response(404) + self.end_headers() + self.wfile.write("404: Not Found") + return + length = int(self.headers.get('content-length')) + request = json.loads(self.rfile.read(length).decode('utf-8')) + print("Got request: {}".format(request)) + ingester = IngestFileWorker() + result = ingester.process(request) + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(result)) diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py new file mode 100644 index 0000000..f11cac4 --- /dev/null +++ b/python/sandcrawler/ingest_html.py @@ -0,0 +1,441 @@ + +import io +import sys +import json +import datetime +import argparse +import xml.etree.ElementTree as ET +from typing import List, Optional, Any, Tuple + +import trafilatura +import pydantic +from selectolax.parser import HTMLParser + +from sandcrawler.ia import WaybackClient, CdxApiClient, ResourceResult, cdx_to_dict, fix_transfer_encoding, NoCaptureError, WaybackContentError +from sandcrawler.misc import gen_file_metadata, parse_cdx_datetime, datetime_to_cdx, clean_url, url_fuzzy_equal +from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules + + +TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}" + +def html_extract_body_teixml(doc: bytes) -> dict: + try: + tei_xml = trafilatura.extract(doc, + tei_output=True, + include_comments=False, + include_formatting=True, + ) + except (ValueError, TypeError, Exception) as e: + return dict( + status="trafilatura-parse-error", + error_msg=str(e)[:1000], + ) + if tei_xml: + body_txt = teixml_body_text(tei_xml) + word_count = len(body_txt.split()) + return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count) + elif doc.startswith(b''): + # hack for firstmonday.org + return html_extract_body_teixml(doc[106:]) + else: + return dict(status="empty-xml", agent=TRAFILATURA_AGENT) + +def teixml_body_text(doc_xml: str) -> str: + ns = {"tei": "http://www.tei-c.org/ns/1.0"} + tree = ET.fromstring(doc_xml) + body = tree.find('.//tei:body', ns) + if body: + return " ".join(body.itertext()) + else: + return "" + +class WebResource(pydantic.BaseModel): + surt: str + timestamp: datetime.datetime + url: str + sha1hex: str + mimetype: str + status_code: int + size: Optional[int] + sha256hex: Optional[str] + resource_type: Optional[str] + + class Config: + json_encoders = { + datetime.datetime: lambda dt: dt.isoformat() + } + +class IngestWebResult(pydantic.BaseModel): + status: str + hit: bool + error_message: Optional[str] + cdx: Optional[dict] + terminal: Optional[Any] # TODO + request: Optional[Any] # TODO + file_meta: Optional[dict] + html_biblio: Optional[BiblioMetadata] + scope: Optional[str] + html_body: Optional[dict] + html_resources: Optional[List[WebResource]] + + class Config: + arbitrary_types_allowed = True + json_encoders = { + datetime.datetime: lambda dt: dt.isoformat(), + } + +class HtmlMetaRow(pydantic.BaseModel): + sha1hex: str + status: str + scope: Optional[str] + has_teixml: bool + has_thumbnail: bool + word_count: Optional[int] + biblio: Optional[dict] + resources: Optional[List[dict]] + + class Config: + arbitrary_types_allowed = True + json_encoders = { + datetime.datetime: lambda dt: dt.isoformat(), + } + + def to_sql_tuple(self) -> Tuple: + """ + This is for the html_meta 
SQL table. + """ + return ( + self.sha1hex, + datetime.datetime.now(), # updated + self.status, + self.scope, + self.has_teixml, + self.has_thumbnail, + self.word_count, + (self.biblio or None) and json.dumps(self.biblio, sort_keys=True), + (self.resources or None) and json.dumps(self.resources, sort_keys=True), + ) + + +def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]: + """ + This is the lazy version that just does a CDX lookup for each resource. + + Takes a list instead of single record because we may want to circuit break + on failure, and may introduce concurrency internal to this function. + """ + + full = [] + closest = when and datetime_to_cdx(when) + for resource in resources: + cdx_row = cdx_client.lookup_best(resource['url'], closest=closest) + if not cdx_row: + raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") + if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']): + print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr) + if not cdx_row.status_code: + # TODO: fall back to a full fetch? + print(f" WARN: skipping revisit record", file=sys.stderr) + continue + full.append(WebResource( + surt=cdx_row.surt, + timestamp=cdx_row.datetime, + url=cdx_row.url, + sha1hex=cdx_row.sha1hex, + mimetype=cdx_row.mimetype, + status_code=cdx_row.status_code, + size=None, + sha256hex=None, + resource_type=resource['type'], + )) + + return full + + +def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]: + """ + This is the full version which fetches each resource from wayback/petabox + and calculates additional hashes. 
+ + Could make this concurrent in the future, eg: https://realpython.com/python-concurrency/#threading-version + """ + + full = [] + closest = when and datetime_to_cdx(when) + for resource in resources: + wayback_resp = wayback_client.lookup_resource(resource['url'], closest=closest) + if not wayback_resp or wayback_resp.status != 'success': + raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") + file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True) + if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex: + raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}") + full.append(WebResource( + surt=wayback_resp.cdx.surt, + timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), + url=wayback_resp.cdx.url, + sha1hex=file_meta['sha1hex'], + mimetype=file_meta['mimetype'], + status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code, + size=file_meta['size_bytes'], + sha256hex=file_meta['sha256hex'], + resource_type=resource['type'], + )) + + return full + + +def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]: + + generator: Optional[str] = None + generator_elem = doc.css_first("meta[name='generator']") + if generator_elem: + generator = generator_elem.attrs['content'] + else: + generator_elem = doc.css_first("a[id='developedBy']") + if generator_elem: + generator = generator_elem.text() + if generator and "open journal systems 3" in generator.lower(): + return "ojs3" + elif generator and "open journal systems" in generator.lower(): + return "ojs" + elif generator and "plone" in generator.lower(): + return "plone" + elif generator and "wordpress" in generator.lower(): + return "wordpress" + elif generator and "blogger" in generator.lower(): + return "blogger" + elif doc.css_first("body[id='pkp-common-openJournalSystems']"): + return "ojs" + else: + try: + if 'powered by
PKP OJS' in doc.html: + return "ojs" + if 'Powered by ' in doc.html: + return "arpha" + if "" in doc.html: + return "galenos" + except UnicodeDecodeError: + pass + + icon_elem = doc.css_first("link[type='image/x-icon']") + if icon_elem and 'href' in icon_elem.attrs: + if 'journalssystem.com' in icon_elem.attrs['href']: + return "journalssystem.com" + elif 'indexcopernicus.com' in icon_elem.attrs['href']: + return "indexcopernicus" + + if 'scielo' in url: + return "scielo" + + return None + +def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str: + """ + This function tries to guess if an HTML document represents one of: + + - article-fulltext + - article-abstract + - article-sample + - supplement + - component + - issue-fulltext + - landingpage + - homepage-domain + - blocked-paywall + - blocked-login + - blocked-captcha + - blocked-cookie + - errorpage + - stub + - other + - unknown + + Unknown implies the page could be anything. "other" implies it is not + fulltext or a landing page, but could be one of the other categories. + """ + + # assert that this is a real URL + assert url.count('/') >= 2 + + # basic paywall and loginwall detection based on URL + if url.endswith("/cookieAbsent"): + return "blocked-cookie" + if "://page-one.live.cf.public.springer.com" in url: + return "article-sample" + + if "scielo" in url: + if "sci_abstract" in url: + return "landingpage" + if "sci_arttext" in url: + return "article-fulltext" + + if "showcaptcha.asp" in url: + return "blocked-captcha" + + # is this the top-level URL of the domain? aka, no path? + if url.count('/') <= 2 or (url.count('/') == 3) and url.endswith('/'): + return "homepage-domain" + + platform = html_guess_platform(url, doc, biblio) + + if biblio: + if biblio.html_fulltext_url: + if url_fuzzy_equal(biblio.html_fulltext_url, url): + return "article-fulltext" + else: + return "landingpage" + + # platform-specific detection + if platform in ("ojs", "ojs3"): + + if biblio and biblio.title: + if word_count and word_count > 1200: + return "fulltext" + else: + return "landingpage" + else: + if "/article/view/" in url and word_count and word_count > 600: + return "fulltext" + return "other" + elif platform == "journalssystem.com": + if biblio and biblio.pdf_fulltext_url and word_count and word_count < 1000: + return "landingpage" + + # more platform/publisher specific checks + if "karger.com/Article/Abstract" in url: + return "landingpage" + if "dergipark.gov.tr" in url and not ("download/article-file" in url): + return "other" + + try: + if isinstance(doc.html, str) and "
<center><h1>403 Forbidden</h1></center>
" in doc.html: + # cloudflare block pattern + return "blocked-forbidden" + except UnicodeDecodeError: + pass + + print(f" scope guessing: platform {platform} word count: {word_count}", file=sys.stderr) + + # fallback: guess based on word count (arbitrary guesses here) + if word_count is not None: + if word_count < 20: + return "stub" + elif word_count > 500 and platform in ['wordpress', 'blogger']: + return "article-fulltext" + elif word_count > 1200: + return "article-fulltext" + + return "unknown" + + +def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult: + + adblock = load_adblock_rules() + wayback_client = WaybackClient() + + html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp) + if html_resource.status != "success": + return IngestWebResult( + status=html_resource.status, + hit=False, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + ) + + assert html_resource.terminal_status_code == 200 + + file_meta = gen_file_metadata(html_resource.body) + file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource) + + if file_meta['mimetype'] not in ("text/html", "text/xml"): + return IngestWebResult( + status="wrong-mimetype", + hit=False, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + file_meta=file_meta, + ) + + html_doc = HTMLParser(html_resource.body) + html_biblio = html_extract_biblio(url, html_doc) + html_body = html_extract_body_teixml(html_resource.body) + html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get('word_count')) + if html_scope not in ('article-fulltext', 'unknown'): + return IngestWebResult( + status="wrong-scope", + hit=False, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + file_meta=file_meta, + html_biblio=html_biblio, + scope=html_scope, + ) + + raw_resources = html_extract_resources(html_resource.terminal_url, html_doc, adblock) + assert len(raw_resources) <= 200 + + when = parse_cdx_datetime(html_resource.cdx.datetime) + + full_resources: List[WebResource] = [] + if quick_mode: + full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when) + else: + full_resources = fetch_html_resources(raw_resources, wayback_client, when) + + output = IngestWebResult( + status="success", + hit=True, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + file_meta=file_meta, + html_body=html_body, + html_biblio=html_biblio, + scope=html_scope, + html_resources=full_resources, + ) + return output + + +def main() -> None: + """ + Run this command like: + + python -m sandcrawler.html_ingest + """ + + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + subparsers = parser.add_subparsers() + + sub = subparsers.add_parser( + "single", help="tries to ingest a single URL, dumps result to stdout" + ) + sub.set_defaults(func="run_single") + sub.add_argument( + "url", + help="URL to fetch", + type=str, + ) + sub.add_argument( + "--timestamp", + help="timestamp for which to fetch document from wayback", + type=str, + ) + sub.add_argument( + "--quick-mode", + help="don't fetch resources, only do CDX lookup", + action="store_true", + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + parser.print_help(file=sys.stderr) + sys.exit(-1) + + if args.func == "run_single": + result = run_single(args.url, args.timestamp, args.quick_mode) + print(result.json(indent=2, exclude_none=True)) + else: + #func = getattr(wp, args.func) + #func() + raise 
NotImplementedError() + +if __name__ == "__main__": + main() -- cgit v1.2.3
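
For reference, a minimal usage sketch of how the two renamed library modules might be driven after this change. This is an editor's illustration rather than code from the patch: it assumes the post-rename import paths (sandcrawler.ingest_file for IngestFileWorker, sandcrawler.ingest_html for run_single), a default worker configuration with the wayback/CDX/SPN services reachable, and placeholder URLs.

import json

from sandcrawler.ingest_file import IngestFileWorker
from sandcrawler.ingest_html import run_single

# File ingest: build a request dict of the shape process() expects
# ('ingest_type' plus 'base_url'), then inspect the returned status/hit fields.
request = {
    "ingest_type": "pdf",
    "base_url": "https://example.com/paper.pdf",  # placeholder URL
}
worker = IngestFileWorker()
result = worker.process(request)
print(json.dumps(result, sort_keys=True))

# HTML ingest: run_single() returns an IngestWebResult pydantic model;
# quick_mode=True does CDX-only lookups instead of fetching each sub-resource.
web_result = run_single("https://example.com/article.html", quick_mode=True)
print(web_result.json(indent=2, exclude_none=True))

After the rename, the equivalent command-line entrypoint would presumably be invoked as "python -m sandcrawler.ingest_html single <url> --quick-mode" rather than via the old html_ingest module path.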