"""
Helpers for ingesting a single HTML document out of wayback.

Covers body extraction to TEI-XML (via trafilatura), guessing the publishing
platform and the "scope" of a page, and looking up or fetching the page's
sub-resources. Can be run directly, see `main()`.
"""

import argparse
import datetime
import json
import sys
import xml.etree.ElementTree as ET
from typing import Any, List, Optional, Tuple

import pydantic
import trafilatura
from selectolax.parser import HTMLParser

from sandcrawler.html_metadata import (
    BiblioMetadata,
    html_extract_biblio,
    html_extract_resources,
    load_adblock_rules,
)
from sandcrawler.ia import (
    CdxApiClient,
    NoCaptureError,
    WaybackClient,
    WaybackContentError,
    cdx_to_dict,
    fix_transfer_encoding,
)
from sandcrawler.misc import (
    datetime_to_cdx,
    gen_file_metadata,
    parse_cdx_datetime,
    url_fuzzy_equal,
)

TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}"

# Document prefix which triggers the firstmonday.org retry in
# html_extract_body_teixml(), and the number of leading bytes dropped for the
# retry.
# NOTE(review): this literal is empty in the text under review; it looks like
# the original markup was lost. An empty prefix matches every document, so
# restore the intended value from version control.
_FIRSTMONDAY_PREFIX = b''
_FIRSTMONDAY_SKIP_BYTES = 106

# (lower-case substring of the "generator" string, platform), checked in order.
_GENERATOR_PLATFORMS = (
    ("open journal systems 3", "ojs3"),
    ("open journal systems", "ojs"),
    ("plone", "plone"),
    ("wordpress", "wordpress"),
    ("blogger", "blogger"),
)

# (substring of the raw HTML, platform), checked in order.
# NOTE(review): these markers look like they lost HTML markup (the last one is
# empty in the text under review); confirm against version control. Empty
# markers are skipped by html_guess_platform() because "" is a substring of
# every document.
_HTML_MARKER_PLATFORMS = (
    ('powered by PKP OJS', "ojs"),
    ('Published with', "pubpub"),
    ('Powered by ', "arpha"),
    ("", "galenos"),
)

# Substring used to detect a "403 Forbidden" block page (cloudflare pattern).
# NOTE(review): probably lost surrounding HTML markup as well; kept as found.
_FORBIDDEN_MARKER = "\n\n403 Forbidden\n\n"


def _extract_teixml_once(doc: bytes) -> dict:
    """Run trafilatura over `doc` a single time and wrap the outcome in a status dict."""
    try:
        tei_xml = trafilatura.extract(
            doc,
            output_format="xmltei",
            include_comments=False,
            include_formatting=True,
        )
    except Exception as e:
        # best-effort extraction: any failure inside trafilatura is reported
        # as a status instead of propagating
        return dict(
            status="trafilatura-parse-error",
            error_msg=str(e)[:1000],
        )
    if not tei_xml:
        return dict(status="empty-xml", agent=TRAFILATURA_AGENT)
    body_txt = teixml_body_text(tei_xml)
    word_count = len(body_txt.split())
    return dict(
        status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count
    )


def html_extract_body_teixml(doc: bytes) -> dict:
    """
    Extracts the main body of an HTML document as TEI-XML, using trafilatura.

    Returns a dict with a "status" key, which is one of:

    - "success": also has "agent", "tei_xml", and "word_count" keys
    - "empty-xml": extraction returned nothing; also has an "agent" key
    - "trafilatura-parse-error": extraction raised; also has "error_msg"
      (truncated to 1000 characters)
    """
    result = _extract_teixml_once(doc)
    if result["status"] == "empty-xml" and doc.startswith(_FIRSTMONDAY_PREFIX):
        # hack for firstmonday.org: drop the leading bytes and try again. The
        # retry happens at most once, so that a prefix which still matches
        # after stripping (eg, an empty one) can not recurse without bound.
        result = _extract_teixml_once(doc[_FIRSTMONDAY_SKIP_BYTES:])
    return result


def teixml_body_text(doc_xml: str) -> str:
    """
    Returns the text content of the TEI <body> element in `doc_xml`, with the
    text fragments joined by single spaces. Returns an empty string if there
    is no <body> element.
    """
    ns = {"tei": "http://www.tei-c.org/ns/1.0"}
    tree = ET.fromstring(doc_xml)
    body = tree.find(".//tei:body", ns)
    # Compare against None explicitly: the truth value of an Element reflects
    # whether it has child elements, so a body holding only text is falsy.
    if body is None:
        return ""
    return " ".join(body.itertext())


class WebResource(pydantic.BaseModel):
    """A single captured web resource, as returned by the resource fetch helpers."""

    surt: str
    timestamp: datetime.datetime
    url: str
    sha1hex: str
    mimetype: str
    status_code: int
    size: Optional[int] = None
    sha256hex: Optional[str] = None
    resource_type: Optional[str] = None

    class Config:
        json_encoders = {datetime.datetime: lambda dt: dt.isoformat()}


class IngestWebResult(pydantic.BaseModel):
    """Result of run_single(); only `status` and `hit` are always set."""

    status: str
    hit: bool
    error_message: Optional[str] = None
    cdx: Optional[dict] = None
    terminal: Optional[Any] = None  # TODO
    request: Optional[Any] = None  # TODO
    file_meta: Optional[dict] = None
    html_biblio: Optional[BiblioMetadata] = None
    scope: Optional[str] = None
    html_body: Optional[dict] = None
    html_resources: Optional[List[WebResource]] = None

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {
            datetime.datetime: lambda dt: dt.isoformat(),
        }


class HtmlMetaRow(pydantic.BaseModel):
    """One row of the html_meta SQL table (see to_sql_tuple())."""

    sha1hex: str
    status: str
    scope: Optional[str] = None
    has_teixml: bool
    has_thumbnail: bool
    word_count: Optional[int] = None
    biblio: Optional[dict] = None
    resources: Optional[List[dict]] = None

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {
            datetime.datetime: lambda dt: dt.isoformat(),
        }

    def to_sql_tuple(self) -> Tuple:
        """
        This is for the html_meta SQL table.

        `biblio` and `resources` are serialized as JSON with sorted keys;
        empty or missing values become None. The second element ("updated")
        is the current local time.
        """
        biblio_json = json.dumps(self.biblio, sort_keys=True) if self.biblio else None
        resources_json = (
            json.dumps(self.resources, sort_keys=True) if self.resources else None
        )
        return (
            self.sha1hex,
            datetime.datetime.now(),  # updated
            self.status,
            self.scope,
            self.has_teixml,
            self.has_thumbnail,
            self.word_count,
            biblio_json,
            resources_json,
        )


def quick_fetch_html_resources(
    resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]
) -> List[WebResource]:
    """
    This is the lazy version that just does a CDX lookup for each resource.

    Takes a list instead of single record because we may want to circuit break
    on failure, and may introduce concurrency internal to this function.

    Each resource dict is read for its "url" and "type" keys. Rows without a
    status code are skipped with a warning. `size` and `sha256hex` are never
    populated by this version.

    Raises NoCaptureError if the CDX lookup for any resource finds nothing.
    """
    full = []
    closest = datetime_to_cdx(when) if when else None
    for resource in resources:
        resource_url = resource["url"]
        cdx_row = cdx_client.lookup_best(resource_url, closest=closest)
        if not cdx_row:
            raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
        if cdx_row.url != resource_url and not url_fuzzy_equal(cdx_row.url, resource_url):
            print(
                f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr
            )
        if not cdx_row.status_code:
            # TODO: fall back to a full fetch?
            print(" WARN: skipping revisit record", file=sys.stderr)
            continue
        full.append(
            WebResource(
                surt=cdx_row.surt,
                # parse the CDX timestamp the same way the other call sites
                # in this file do, instead of leaving it to pydantic coercion
                timestamp=parse_cdx_datetime(cdx_row.datetime),
                url=cdx_row.url,
                sha1hex=cdx_row.sha1hex,
                mimetype=cdx_row.mimetype,
                status_code=cdx_row.status_code,
                size=None,
                sha256hex=None,
                resource_type=resource["type"],
            )
        )
    return full


def fetch_html_resources(
    resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]
) -> List[WebResource]:
    """
    This is the full version which fetches each resource from wayback/petabox
    and calculates additional hashes.

    Could make this concurrent in the future, eg:
    https://realpython.com/python-concurrency/#threading-version

    Raises NoCaptureError if a resource lookup does not succeed, and
    WaybackContentError if the body can not be decoded or its SHA-1 does not
    match the CDX record.
    """
    full = []
    closest = datetime_to_cdx(when) if when else None
    for resource in resources:
        wayback_resp = wayback_client.lookup_resource(resource["url"], closest=closest)
        if not wayback_resp or wayback_resp.status != "success":
            raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
        # For HTML sub-resources specifically, we allow the CDX SHA1 to match
        # either the transfer-encoded or the inner (un-encoded) payload body.
        # This is because of an ambiguity in the WARC specification.
        outer_file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True)
        try:
            file_meta, wayback_resp = fix_transfer_encoding(outer_file_meta, wayback_resp)
        except Exception as e:
            raise WaybackContentError(f"bad gzip encoding: {e}") from e
        expected_sha1hex = wayback_resp.cdx.sha1hex
        if expected_sha1hex not in (file_meta["sha1hex"], outer_file_meta["sha1hex"]):
            raise WaybackContentError(
                f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} "
                f"{wayback_resp.cdx.url} found:{file_meta['sha1hex']} "
                f"expected:{wayback_resp.cdx.sha1hex}"
            )
        full.append(
            WebResource(
                surt=wayback_resp.cdx.surt,
                timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
                url=wayback_resp.cdx.url,
                sha1hex=file_meta["sha1hex"],
                mimetype=file_meta["mimetype"],
                # fall back to the revisit record when the CDX row itself has
                # no status code
                status_code=wayback_resp.cdx.status_code
                or wayback_resp.revisit_cdx.status_code,
                size=file_meta["size_bytes"],
                sha256hex=file_meta["sha256hex"],
                resource_type=resource["type"],
            )
        )
    return full


def html_guess_platform(
    url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
) -> Optional[str]:
    """
    Tries to guess which publishing platform or CMS produced an HTML page.

    Checks, in order: the "generator" meta tag (or, if there is no such tag,
    the text of the "developedBy" link), an OJS-specific <body> id, marker
    substrings in the raw HTML, the favicon URL, and finally the page URL.

    Returns a short platform name (eg "ojs", "wordpress"), or None if nothing
    matched. `biblio` is accepted but not used.
    """
    generator: Optional[str] = None
    generator_elem = doc.css_first("meta[name='generator']")
    if generator_elem:
        # tolerate a generator tag without a "content" attribute
        if "content" in generator_elem.attrs:
            generator = generator_elem.attrs["content"]
    else:
        generator_elem = doc.css_first("a[id='developedBy']")
        if generator_elem:
            generator = generator_elem.text()

    if generator:
        generator_lower = generator.lower()
        for needle, platform in _GENERATOR_PLATFORMS:
            if needle in generator_lower:
                return platform

    if doc.css_first("body[id='pkp-common-openJournalSystems']"):
        return "ojs"

    try:
        raw_html = doc.html
    except UnicodeDecodeError:
        raw_html = None
    if isinstance(raw_html, str):
        for marker, platform in _HTML_MARKER_PLATFORMS:
            # an empty marker would match every page; skip it
            if marker and marker in raw_html:
                return platform

    icon_elem = doc.css_first("link[type='image/x-icon']")
    if icon_elem and "href" in icon_elem.attrs:
        if "journalssystem.com" in icon_elem.attrs["href"]:
            return "journalssystem.com"
        elif "indexcopernicus.com" in icon_elem.attrs["href"]:
            return "indexcopernicus"

    if "scielo" in url:
        return "scielo"

    return None


def html_guess_scope(
    url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]
) -> str:
    """
    This function tries to guess if an HTML document represents one of:

    - article-fulltext
    - article-abstract
    - article-sample
    - supplement
    - component
    - issue-fulltext
    - landingpage
    - homepage-domain
    - blocked-paywall
    - blocked-login
    - blocked-captcha
    - blocked-cookie
    - errorpage
    - stub
    - other
    - unknown

    Unknown implies the page could be anything. "other" implies it is not
    fulltext or a landing page, but could be one of the other categories.

    NOTE(review): the code below can also return "fulltext" (OJS pages) and
    "blocked-forbidden", neither of which is in the list above. run_single()
    only accepts "article-fulltext" and "unknown", so "fulltext" is treated as
    wrong-scope there; confirm whether "article-fulltext" was intended.
    """

    # assert that this is a real URL
    assert url.count("/") >= 2

    # basic paywall and loginwall detection based on URL
    if url.endswith("/cookieAbsent"):
        return "blocked-cookie"
    if "://page-one.live.cf.public.springer.com" in url:
        return "article-sample"

    if "scielo" in url:
        if "sci_abstract" in url:
            return "landingpage"
        if "sci_arttext" in url:
            return "article-fulltext"

    if "showcaptcha.asp" in url:
        return "blocked-captcha"

    # is this the top-level URL of the domain? aka, no path?
    slash_count = url.count("/")
    if slash_count <= 2 or (slash_count == 3 and url.endswith("/")):
        return "homepage-domain"

    platform = html_guess_platform(url, doc, biblio)

    if biblio and biblio.html_fulltext_url:
        if url_fuzzy_equal(biblio.html_fulltext_url, url):
            return "article-fulltext"
        return "landingpage"

    # platform-specific detection
    if platform in ("ojs", "ojs3"):
        if biblio and biblio.title:
            if word_count and word_count > 1200:
                return "fulltext"
            return "landingpage"
        if "/article/view/" in url and word_count and word_count > 600:
            return "fulltext"
        return "other"
    if platform == "journalssystem.com":
        if biblio and biblio.pdf_fulltext_url and word_count and word_count < 1000:
            return "landingpage"

    # more platform/publisher specific checks
    if "karger.com/Article/Abstract" in url:
        return "landingpage"
    if "dergipark.gov.tr" in url and "download/article-file" not in url:
        return "other"

    try:
        if isinstance(doc.html, str) and _FORBIDDEN_MARKER in doc.html:
            # cloudflare block pattern
            return "blocked-forbidden"
    except UnicodeDecodeError:
        pass

    print(f" scope guessing: platform {platform} word count: {word_count}", file=sys.stderr)

    # fallback: guess based on word count (arbitrary guesses here)
    if word_count is not None:
        if word_count < 20:
            return "stub"
        elif word_count > 500 and platform in ("wordpress", "blogger"):
            return "article-fulltext"
        elif word_count > 1200:
            return "article-fulltext"

    return "unknown"


def _cdx_dict_or_none(resource: Any) -> Optional[dict]:
    """Returns the CDX row of a wayback resource as a dict, or None if it has none."""
    return cdx_to_dict(resource.cdx) if resource.cdx else None


def run_single(
    url: str, timestamp: Optional[str] = None, quick_mode: bool = False
) -> IngestWebResult:
    """
    Tries to ingest a single HTML page (and its sub-resources) from wayback.

    `timestamp` is passed to the wayback lookup as the "closest" hint. With
    `quick_mode`, sub-resources are only looked up in CDX and not fetched.

    Returns an IngestWebResult. `status` is "success" on a hit; otherwise it
    is the wayback lookup status, "wrong-mimetype", or "wrong-scope".
    Exceptions raised by the resource fetch helpers are not caught here.
    """
    adblock = load_adblock_rules()
    wayback_client = WaybackClient()

    html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp)
    if html_resource.status != "success":
        return IngestWebResult(
            status=html_resource.status,
            hit=False,
            cdx=_cdx_dict_or_none(html_resource),
        )

    assert html_resource.terminal_status_code == 200

    file_meta = gen_file_metadata(html_resource.body)
    file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource)

    if file_meta["mimetype"] not in ("text/html", "text/xml"):
        return IngestWebResult(
            status="wrong-mimetype",
            hit=False,
            cdx=_cdx_dict_or_none(html_resource),
            file_meta=file_meta,
        )

    html_doc = HTMLParser(html_resource.body)
    html_biblio = html_extract_biblio(url, html_doc)
    html_body = html_extract_body_teixml(html_resource.body)
    html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get("word_count"))
    if html_scope not in ("article-fulltext", "unknown"):
        return IngestWebResult(
            status="wrong-scope",
            hit=False,
            cdx=_cdx_dict_or_none(html_resource),
            file_meta=file_meta,
            html_biblio=html_biblio,
            scope=html_scope,
        )

    raw_resources = html_extract_resources(html_resource.terminal_url, html_doc, adblock)
    assert len(raw_resources) <= 200

    # look up sub-resources as close as possible to the capture of the page
    when = parse_cdx_datetime(html_resource.cdx.datetime)

    full_resources: List[WebResource] = []
    if quick_mode:
        full_resources = quick_fetch_html_resources(
            raw_resources, wayback_client.cdx_client, when
        )
    else:
        full_resources = fetch_html_resources(raw_resources, wayback_client, when)

    return IngestWebResult(
        status="success",
        hit=True,
        cdx=_cdx_dict_or_none(html_resource),
        file_meta=file_meta,
        html_body=html_body,
        html_biblio=html_biblio,
        scope=html_scope,
        html_resources=full_resources,
    )


def main() -> None:
    """
    Run this command like:

        python -m sandcrawler.ingest_html
    """

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    subparsers = parser.add_subparsers()

    sub = subparsers.add_parser(
        "single", help="tries to ingest a single URL, dumps result to stdout"
    )
    sub.set_defaults(func="run_single")
    sub.add_argument(
        "url",
        help="URL to fetch",
        type=str,
    )
    sub.add_argument(
        "--timestamp",
        help="timestamp for which to fetch document from wayback",
        type=str,
    )
    sub.add_argument(
        "--quick-mode",
        help="don't fetch resources, only do CDX lookup",
        action="store_true",
    )

    args = parser.parse_args()
    # "func" is only set when a sub-command was given
    func = getattr(args, "func", None)
    if not func:
        parser.print_help(file=sys.stderr)
        sys.exit(-1)

    if func == "run_single":
        result = run_single(args.url, args.timestamp, args.quick_mode)
        print(result.json(indent=2, exclude_none=True))
    else:
        raise NotImplementedError()


if __name__ == "__main__":
    main()