import argparse
import datetime
import json
import sys
import xml.etree.ElementTree as ET
from typing import Any, List, Optional, Tuple
import pydantic
import trafilatura
from selectolax.parser import HTMLParser
from sandcrawler.html_metadata import (
BiblioMetadata,
html_extract_biblio,
html_extract_resources,
load_adblock_rules,
)
from sandcrawler.ia import (
CdxApiClient,
NoCaptureError,
WaybackClient,
WaybackContentError,
cdx_to_dict,
fix_transfer_encoding,
)
from sandcrawler.misc import (
datetime_to_cdx,
gen_file_metadata,
parse_cdx_datetime,
url_fuzzy_equal,
)
TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}"
def html_extract_body_teixml(doc: bytes) -> dict:
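    """
    Attempts to extract the body of an HTML document as TEI-XML, using
    trafilatura.

    Returns a dict with a "status" field; on success it also carries
    "agent", "tei_xml", and "word_count", and on a parse error an
    "error_msg" field.
    """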
try:
tei_xml = trafilatura.extract(
doc,
output_format="xmltei",
include_comments=False,
include_formatting=True,
)
    except Exception as e:  # trafilatura can raise a wide variety of errors
return dict(
status="trafilatura-parse-error",
error_msg=str(e)[:1000],
)
if tei_xml:
body_txt = teixml_body_text(tei_xml)
word_count = len(body_txt.split())
return dict(
status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count
)
    elif doc.startswith(b'<?xml version="1.0" encoding="UTF-8"?>'):
        # hack for firstmonday.org: skip the XML prolog bytes and re-try
        return html_extract_body_teixml(doc[106:])
else:
return dict(status="empty-xml", agent=TRAFILATURA_AGENT)
def teixml_body_text(doc_xml: str) -> str:
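    """
    Concatenates all the text inside the <body> element of a TEI-XML
    document. A minimal illustration (hypothetical input):

        >>> teixml_body_text('<TEI xmlns="http://www.tei-c.org/ns/1.0"><text><body><p>hello world</p></body></text></TEI>')
        'hello world'
    """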
ns = {"tei": "http://www.tei-c.org/ns/1.0"}
tree = ET.fromstring(doc_xml)
body = tree.find(".//tei:body", ns)
    # explicit None check: an Element with no children is falsy
    if body is not None:
return " ".join(body.itertext())
else:
return ""
class WebResource(pydantic.BaseModel):
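    """
    A single sub-resource (image, stylesheet, script, etc.) of an archived
    HTML document: one CDX capture, optionally enriched with size and
    SHA-256 when the body was actually fetched.
    """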
surt: str
timestamp: datetime.datetime
url: str
sha1hex: str
mimetype: str
status_code: int
size: Optional[int]
sha256hex: Optional[str]
resource_type: Optional[str]
class Config:
json_encoders = {datetime.datetime: lambda dt: dt.isoformat()}
class IngestWebResult(pydantic.BaseModel):
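    """
    Overall result of an HTML ingest attempt: status, the terminal capture,
    extracted body and biblio metadata, guessed scope, and any fetched
    sub-resources.
    """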
status: str
hit: bool
error_message: Optional[str]
cdx: Optional[dict]
terminal: Optional[Any] # TODO
request: Optional[Any] # TODO
file_meta: Optional[dict]
html_biblio: Optional[BiblioMetadata]
scope: Optional[str]
html_body: Optional[dict]
html_resources: Optional[List[WebResource]]
class Config:
arbitrary_types_allowed = True
json_encoders = {
datetime.datetime: lambda dt: dt.isoformat(),
}
class HtmlMetaRow(pydantic.BaseModel):
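    """
    Flattened summary of an HTML ingest, one row per document; see
    to_sql_tuple() below for the corresponding html_meta table layout.
    """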
sha1hex: str
status: str
scope: Optional[str]
has_teixml: bool
has_thumbnail: bool
word_count: Optional[int]
biblio: Optional[dict]
resources: Optional[List[dict]]
class Config:
arbitrary_types_allowed = True
json_encoders = {
datetime.datetime: lambda dt: dt.isoformat(),
}
def to_sql_tuple(self) -> Tuple:
"""
This is for the html_meta SQL table.
"""
return (
self.sha1hex,
datetime.datetime.now(), # updated
self.status,
self.scope,
self.has_teixml,
self.has_thumbnail,
self.word_count,
(self.biblio or None) and json.dumps(self.biblio, sort_keys=True),
(self.resources or None) and json.dumps(self.resources, sort_keys=True),
)
def quick_fetch_html_resources(
resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]
) -> List[WebResource]:
"""
This is the lazy version that just does a CDX lookup for each resource.
Takes a list instead of single record because we may want to circuit break
on failure, and may introduce concurrency internal to this function.
"""
full = []
closest = when and datetime_to_cdx(when)
for resource in resources:
cdx_row = cdx_client.lookup_best(resource["url"], closest=closest)
if not cdx_row:
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
if cdx_row.url != resource["url"] and not url_fuzzy_equal(cdx_row.url, resource["url"]):
print(
f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr
)
if not cdx_row.status_code:
# TODO: fall back to a full fetch?
print(" WARN: skipping revisit record", file=sys.stderr)
continue
full.append(
WebResource(
surt=cdx_row.surt,
                # parse the 14-digit CDX string; pydantic would otherwise
                # mis-interpret the raw digits as a unix timestamp
                timestamp=parse_cdx_datetime(cdx_row.datetime),
url=cdx_row.url,
sha1hex=cdx_row.sha1hex,
mimetype=cdx_row.mimetype,
status_code=cdx_row.status_code,
size=None,
sha256hex=None,
resource_type=resource["type"],
)
)
return full
def fetch_html_resources(
resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]
) -> List[WebResource]:
"""
This is the full version which fetches each resource from wayback/petabox
and calculates additional hashes.
Could make this concurrent in the future, eg: https://realpython.com/python-concurrency/#threading-version
"""
full = []
closest = when and datetime_to_cdx(when)
for resource in resources:
wayback_resp = wayback_client.lookup_resource(resource["url"], closest=closest)
if not wayback_resp or wayback_resp.status != "success":
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True)
if file_meta["sha1hex"] != wayback_resp.cdx.sha1hex:
raise WaybackContentError(
f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}"
)
full.append(
WebResource(
surt=wayback_resp.cdx.surt,
timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
url=wayback_resp.cdx.url,
sha1hex=file_meta["sha1hex"],
mimetype=file_meta["mimetype"],
status_code=wayback_resp.cdx.status_code
or wayback_resp.revisit_cdx.status_code,
size=file_meta["size_bytes"],
sha256hex=file_meta["sha256hex"],
resource_type=resource["type"],
)
)
return full
def html_guess_platform(
url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
) -> Optional[str]:
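    """
    Tries to identify the publishing platform (OJS, WordPress, Blogger,
    etc.) behind an HTML document, checking the generator meta tag, known
    HTML fingerprints, the favicon URL, and finally the URL itself.
    Returns None when no platform can be guessed.
    """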
generator: Optional[str] = None
generator_elem = doc.css_first("meta[name='generator']")
    # guard the lookup; a generator meta tag may lack a content attribute
    if generator_elem and "content" in generator_elem.attrs:
        generator = generator_elem.attrs["content"]
else:
generator_elem = doc.css_first("a[id='developedBy']")
if generator_elem:
generator = generator_elem.text()
if generator and "open journal systems 3" in generator.lower():
return "ojs3"
elif generator and "open journal systems" in generator.lower():
return "ojs"
elif generator and "plone" in generator.lower():
return "plone"
elif generator and "wordpress" in generator.lower():
return "wordpress"
elif generator and "blogger" in generator.lower():
return "blogger"
elif doc.css_first("body[id='pkp-common-openJournalSystems']"):
return "ojs"
else:
        try:
            html = doc.html or ""
            if (
                'powered by <a target="blank" href="http://pkp.sfu.ca/ojs/">PKP OJS</a>'
                in html
            ):
                return "ojs"
            if 'Powered by <a target="blank" href="http://arphahub.com">' in html:
                return "arpha"
            if "<meta property='og:image' content='http://cms.galenos.com.tr' />" in html:
                return "galenos"
        except UnicodeDecodeError:
            pass
icon_elem = doc.css_first("link[type='image/x-icon']")
if icon_elem and "href" in icon_elem.attrs:
if "journalssystem.com" in icon_elem.attrs["href"]:
return "journalssystem.com"
elif "indexcopernicus.com" in icon_elem.attrs["href"]:
return "indexcopernicus"
if "scielo" in url:
return "scielo"
return None
def html_guess_scope(
url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]
) -> str:
"""
This function tries to guess if an HTML document represents one of:
- article-fulltext
- article-abstract
- article-sample
- supplement
- component
- issue-fulltext
- landingpage
- homepage-domain
- blocked-paywall
- blocked-login
- blocked-captcha
    - blocked-cookie
    - blocked-forbidden
- errorpage
- stub
- other
- unknown
Unknown implies the page could be anything. "other" implies it is not
fulltext or a landing page, but could be one of the other categories.
"""
# assert that this is a real URL
assert url.count("/") >= 2
# basic paywall and loginwall detection based on URL
if url.endswith("/cookieAbsent"):
return "blocked-cookie"
if "://page-one.live.cf.public.springer.com" in url:
return "article-sample"
if "scielo" in url:
if "sci_abstract" in url:
return "landingpage"
if "sci_arttext" in url:
return "article-fulltext"
if "showcaptcha.asp" in url:
return "blocked-captcha"
# is this the top-level URL of the domain? aka, no path?
    if url.count("/") <= 2 or (url.count("/") == 3 and url.endswith("/")):
return "homepage-domain"
platform = html_guess_platform(url, doc, biblio)
if biblio:
if biblio.html_fulltext_url:
if url_fuzzy_equal(biblio.html_fulltext_url, url):
return "article-fulltext"
else:
return "landingpage"
# platform-specific detection
if platform in ("ojs", "ojs3"):
if biblio and biblio.title:
if word_count and word_count > 1200:
return "fulltext"
else:
return "landingpage"
else:
if "/article/view/" in url and word_count and word_count > 600:
return "fulltext"
return "other"
elif platform == "journalssystem.com":
if biblio and biblio.pdf_fulltext_url and word_count and word_count < 1000:
return "landingpage"
# more platform/publisher specific checks
if "karger.com/Article/Abstract" in url:
return "landingpage"
if "dergipark.gov.tr" in url and not ("download/article-file" in url):
return "other"
    try:
        if isinstance(doc.html, str) and "<title>403 Forbidden</title>" in doc.html:
            # cloudflare block pattern
            return "blocked-forbidden"
    except UnicodeDecodeError:
        pass
print(f" scope guessing: platform {platform} word count: {word_count}", file=sys.stderr)
# fallback: guess based on word count (arbitrary guesses here)
if word_count is not None:
if word_count < 20:
return "stub"
elif word_count > 500 and platform in ["wordpress", "blogger"]:
return "article-fulltext"
elif word_count > 1200:
return "article-fulltext"
return "unknown"
def run_single(
url: str, timestamp: Optional[str] = None, quick_mode: bool = False
) -> IngestWebResult:
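    """
    Ingests a single URL: looks up an HTML capture in wayback (closest to
    the given timestamp, if any), extracts TEI-XML body text and biblio
    metadata, guesses the scope, and fetches sub-resources. With
    quick_mode=True, sub-resources are only resolved via CDX lookups, not
    actually fetched.
    """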
adblock = load_adblock_rules()
wayback_client = WaybackClient()
html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp)
if html_resource.status != "success":
return IngestWebResult(
status=html_resource.status,
hit=False,
cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
)
assert html_resource.terminal_status_code == 200
file_meta = gen_file_metadata(html_resource.body)
file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource)
if file_meta["mimetype"] not in ("text/html", "text/xml"):
return IngestWebResult(
status="wrong-mimetype",
hit=False,
cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
file_meta=file_meta,
)
html_doc = HTMLParser(html_resource.body)
html_biblio = html_extract_biblio(url, html_doc)
html_body = html_extract_body_teixml(html_resource.body)
html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get("word_count"))
if html_scope not in ("article-fulltext", "unknown"):
return IngestWebResult(
status="wrong-scope",
hit=False,
cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
file_meta=file_meta,
html_biblio=html_biblio,
scope=html_scope,
)
raw_resources = html_extract_resources(html_resource.terminal_url, html_doc, adblock)
assert len(raw_resources) <= 200
when = parse_cdx_datetime(html_resource.cdx.datetime)
full_resources: List[WebResource] = []
if quick_mode:
full_resources = quick_fetch_html_resources(
raw_resources, wayback_client.cdx_client, when
)
else:
full_resources = fetch_html_resources(raw_resources, wayback_client, when)
output = IngestWebResult(
status="success",
hit=True,
cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
file_meta=file_meta,
html_body=html_body,
html_biblio=html_biblio,
scope=html_scope,
html_resources=full_resources,
)
return output
def main() -> None:
"""
Run this command like:
python -m sandcrawler.ingest_html
"""
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers()
sub = subparsers.add_parser(
"single", help="tries to ingest a single URL, dumps result to stdout"
)
sub.set_defaults(func="run_single")
sub.add_argument(
"url",
help="URL to fetch",
type=str,
)
sub.add_argument(
"--timestamp",
help="timestamp for which to fetch document from wayback",
type=str,
)
sub.add_argument(
"--quick-mode",
help="don't fetch resources, only do CDX lookup",
action="store_true",
)
args = parser.parse_args()
if not args.__dict__.get("func"):
parser.print_help(file=sys.stderr)
sys.exit(-1)
if args.func == "run_single":
result = run_single(args.url, args.timestamp, args.quick_mode)
print(result.json(indent=2, exclude_none=True))
    else:
        raise NotImplementedError()
if __name__ == "__main__":
main()