Diffstat (limited to 'python/sandcrawler')
-rw-r--r--  python/sandcrawler/db.py            |  61
-rw-r--r--  python/sandcrawler/html_ingest.py   | 337
-rw-r--r--  python/sandcrawler/html_metadata.py | 452
-rw-r--r--  python/sandcrawler/ia.py            |  64
-rw-r--r--  python/sandcrawler/ingest.py        | 367
-rw-r--r--  python/sandcrawler/minio.py         |   4
-rw-r--r--  python/sandcrawler/misc.py          |  47
-rw-r--r--  python/sandcrawler/persist.py       | 105
-rw-r--r--  python/sandcrawler/xml.py           |   7
9 files changed, 1290 insertions, 154 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 793f1c4..066e53b 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -1,6 +1,7 @@
 import json
 import datetime
+from typing import Optional
 
 import psycopg2
 import psycopg2.extras
@@ -43,6 +44,18 @@ class SandcrawlerPostgrestClient:
         else:
             return None
 
+    def get_html_meta(self, sha1hex: str) -> Optional[dict]:
+        resp = requests.get(
+            self.api_url + "/html_meta",
+            params=dict(sha1hex=f"eq.{sha1hex}"),
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
     def get_file_meta(self, sha1):
         resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
         resp.raise_for_status()
@@ -52,12 +65,15 @@ class SandcrawlerPostgrestClient:
         else:
             return None
 
-    def get_ingest_file_result(self, url):
-        resp = requests.get(self.api_url + "/ingest_file_result", params=dict(base_url='eq.'+url))
+    def get_ingest_file_result(self, ingest_type: str, url: str) -> Optional[dict]:
+        resp = requests.get(
+            self.api_url + "/ingest_file_result",
+            params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
+        )
         resp.raise_for_status()
-        resp = resp.json()
-        if resp:
-            return resp[0]
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
         else:
             return None
 
@@ -232,6 +248,41 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
+    def insert_html_meta(self, cur, batch, on_conflict="nothing"):
+        """
+        batch elements are expected to have .to_sql_tuple() method
+        """
+        sql = """
+            INSERT INTO
+            html_meta (sha1hex, updated, status, scope, has_teixml, has_thumbnail, word_count, biblio, resources)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=EXCLUDED.updated,
+                status=EXCLUDED.status,
+                scope=EXCLUDED.scope,
+                has_teixml=EXCLUDED.has_teixml,
+                has_thumbnail=EXCLUDED.has_thumbnail,
+                word_count=EXCLUDED.word_count,
+                biblio=EXCLUDED.biblio,
+                resources=EXCLUDED.resources
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        batch = [d.to_sql_tuple() for d in batch]
+        # filter out duplicate rows by key (sha1hex)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[b[0]] = b
+        batch = list(batch_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
     def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
         sql = """
             INSERT INTO
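For context, these PostgREST lookup helpers are thin wrappers around column-filter query parameters; a minimal sketch of the equivalent raw query (the endpoint URL and sha1 value here are made-up placeholders):

    import requests

    api_url = "http://localhost:3030"  # hypothetical sandcrawler-db PostgREST endpoint
    sha1hex = "0123456789abcdef0123456789abcdef01234567"  # invented

    # equivalent of SandcrawlerPostgrestClient.get_html_meta(sha1hex);
    # PostgREST encodes WHERE clauses as query params like ?sha1hex=eq.<value>
    resp = requests.get(api_url + "/html_meta", params={"sha1hex": f"eq.{sha1hex}"})
    resp.raise_for_status()
    rows = resp.json()
    print(rows[0] if rows else None)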
diff --git a/python/sandcrawler/html_ingest.py b/python/sandcrawler/html_ingest.py
new file mode 100644
index 0000000..f2819c2
--- /dev/null
+++ b/python/sandcrawler/html_ingest.py
@@ -0,0 +1,337 @@
+
+import io
+import sys
+import json
+import datetime
+import argparse
+import xml.etree.ElementTree as ET
+from typing import List, Optional, Any, Tuple
+
+import trafilatura
+import pydantic
+from selectolax.parser import HTMLParser
+
+from sandcrawler.ia import WaybackClient, CdxApiClient, ResourceResult, cdx_to_dict, fix_transfer_encoding
+from sandcrawler.misc import gen_file_metadata, parse_cdx_datetime, datetime_to_cdx
+from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules
+
+
+TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}"
+
+def html_extract_body_teixml(doc: bytes) -> dict:
+    tei_xml = trafilatura.extract(doc,
+        tei_output=True,
+        include_comments=False,
+        include_formatting=True,
+    )
+    if tei_xml:
+        body_txt = teixml_body_text(tei_xml)
+        word_count = len(body_txt.split())
+        return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count)
+    elif doc.startswith(b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">'):
+        # hack for firstmonday.org
+        return html_extract_body_teixml(doc[106:])
+    else:
+        return dict(status="empty-xml", agent=TRAFILATURA_AGENT)
+
+def teixml_body_text(doc_xml: str) -> str:
+    ns = {"tei": "http://www.tei-c.org/ns/1.0"}
+    tree = ET.fromstring(doc_xml)
+    body = tree.find('.//tei:body', ns)
+    if body:
+        return " ".join(body.itertext())
+    else:
+        return ""
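As a rough illustration of the shape this helper returns, a hedged sketch (HTML snippet invented; requires the trafilatura package to be installed):

    doc = b"<html><body><p>Example article body text, long enough to extract.</p></body></html>"
    result = html_extract_body_teixml(doc)
    # on success: dict with status, agent, tei_xml, word_count
    # otherwise: dict(status="empty-xml", agent=...)
    print(result["status"], result.get("word_count"))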
+ """ + + full = [] + closest = when and datetime_to_cdx(when) + for resource in resources: + cdx_row = cdx_client.lookup_best(resource['url'], closest=closest) + if not cdx_row: + raise Exception("CDX lookup failed") + if cdx_row.url != resource['url']: + pass + #raise Exception( + # f"CDX lookup URL mismatch: {cdx_row.url} != {resource['url']}") + full.append(WebResource( + surt=cdx_row.surt, + timestamp=cdx_row.datetime, + url=cdx_row.url, + sha1hex=cdx_row.sha1hex, + mimetype=cdx_row.mimetype, + status_code=cdx_row.status_code, + size=None, + sha256hex=None, + resource_type=resource['type'], + )) + + return full + + +def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]: + """ + This is the full version which fetches each resource from wayback/petabox + and calculates additional hashes. + + Could make this concurrent in the future, eg: https://realpython.com/python-concurrency/#threading-version + """ + + full = [] + closest = when and datetime_to_cdx(when) + for resource in resources: + wayback_resp = wayback_client.lookup_resource(resource['url'], closest=closest) + if not wayback_resp or wayback_resp.status != 'success': + # TODO: raise a specific exception so we can catch it elsewhere? + raise Exception("wayback lookup failed") + file_meta = gen_file_metadata(wayback_resp.body) + if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex: + raise Exception("wayback payload sha1hex mismatch") + full.append(WebResource( + surt=wayback_resp.cdx.surt, + timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), + url=wayback_resp.cdx.url, + sha1hex=file_meta['sha1hex'], + mimetype=file_meta['mimetype'], + status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code, + size=file_meta['size_bytes'], + sha256hex=file_meta['sha256hex'], + resource_type=resource['type'], + )) + + return full + + +def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str: + """ + This function tries to guess if an HTML document represents one of: + + - article-fulltext + - article-abstract + - article-sample + - supplement + - component + - issue-fulltext + - landingpage + - paywall + - loginwall + - blockpage + - errorpage + - stub + - unknown + """ + + # basic paywall and loginwall detection based on URL + if url.endswith("/cookieAbsent"): + return "blockpage" + if "://page-one.live.cf.public.springer.com" in url: + return "article-sample" + + if "scielo" in url: + if "sci_abstract" in url: + return "landingpage" + if "sci_arttext" in url: + return "article-fulltext" + + if biblio and biblio.html_fulltext_url == url: + return "article-fulltext" + + # fallback: guess based word count (arbitrary guesses here) + if word_count == None: + return "unknown" + #print(f" body text word count: {word_count}", file=sys.stderr) + assert word_count is not None + if word_count < 20: + return "stub" + elif word_count > 800: + return "article-fulltext" + + return "unknown" + + +def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult: + + adblock = load_adblock_rules() + wayback_client = WaybackClient() + + html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp) + if html_resource.status != "success": + return IngestWebResult( + status=html_resource.status, + hit=False, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + ) + + assert html_resource.terminal_status_code == 200 + + file_meta = 
+
+def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult:
+
+    adblock = load_adblock_rules()
+    wayback_client = WaybackClient()
+
+    html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp)
+    if html_resource.status != "success":
+        return IngestWebResult(
+            status=html_resource.status,
+            hit=False,
+            cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+        )
+
+    assert html_resource.terminal_status_code == 200
+
+    file_meta = gen_file_metadata(html_resource.body)
+    file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource)
+
+    if file_meta['mimetype'] not in ("text/html", "text/xml"):
+        return IngestWebResult(
+            status="wrong-mimetype",
+            hit=False,
+            cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+            file_meta=file_meta,
+        )
+
+    html_doc = HTMLParser(html_resource.body)
+    html_biblio = html_extract_biblio(url, html_doc)
+    html_body = html_extract_body_teixml(html_resource.body)
+    html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get('word_count'))
+    if html_scope not in ('article-fulltext', 'unknown'):
+        return IngestWebResult(
+            status="wrong-scope",
+            hit=False,
+            cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+            file_meta=file_meta,
+            html_biblio=html_biblio,
+            scope=html_scope,
+        )
+
+    raw_resources = html_extract_resources(html_resource.terminal_url, html_doc, adblock)
+    assert len(raw_resources) <= 200
+
+    when = parse_cdx_datetime(html_resource.cdx.datetime)
+
+    full_resources: List[WebResource] = []
+    if quick_mode:
+        full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when)
+    else:
+        full_resources = fetch_html_resources(raw_resources, wayback_client, when)
+
+    output = IngestWebResult(
+        status="success",
+        hit=True,
+        cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+        file_meta=file_meta,
+        html_body=html_body,
+        html_biblio=html_biblio,
+        scope=html_scope,
+        html_resources=full_resources,
+    )
+    return output
+
+
+def main() -> None:
+    """
+    Run this command like:
+
+        python -m sandcrawler.html_ingest
+    """
+
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+    subparsers = parser.add_subparsers()
+
+    sub = subparsers.add_parser(
+        "single", help="tries to ingest a single URL, dumps result to stdout"
+    )
+    sub.set_defaults(func="run_single")
+    sub.add_argument(
+        "url",
+        help="URL to fetch",
+        type=str,
+    )
+    sub.add_argument(
+        "--timestamp",
+        help="timestamp for which to fetch document from wayback",
+        type=str,
+    )
+    sub.add_argument(
+        "--quick-mode",
+        help="don't fetch resources, only do CDX lookup",
+        action="store_true",
+    )
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        parser.print_help(file=sys.stderr)
+        sys.exit(-1)
+
+    if args.func == "run_single":
+        result = run_single(args.url, args.timestamp, args.quick_mode)
+        print(result.json(indent=2, exclude_none=True))
+    else:
+        #func = getattr(wp, args.func)
+        #func()
+        raise NotImplementedError()
+
+if __name__ == "__main__":
+    main()
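The same entrypoint can be driven programmatically rather than via the CLI; a minimal sketch (URL and timestamp are placeholders, and this requires working wayback access):

    from sandcrawler.html_ingest import run_single

    # quick_mode=True skips fetching sub-resource bodies and only does CDX lookups
    result = run_single("https://example.com/article.html", timestamp="20201028235103", quick_mode=True)
    print(result.json(indent=2, exclude_none=True))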
diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py
new file mode 100644
index 0000000..0d14166
--- /dev/null
+++ b/python/sandcrawler/html_metadata.py
@@ -0,0 +1,452 @@
+
+import datetime
+from typing import List, Optional, Any, Tuple, Dict
+import urllib.parse
+
+import dateparser
+from selectolax.parser import HTMLParser
+import pydantic
+import braveblock
+
+
+# this is a map of metadata keys to CSS selectors
+# sources for this list include:
+# - google scholar crawling notes (https://scholar.google.com/intl/ja/scholar/inclusion.html#indexing)
+# - inspection of actual publisher HTML
+# - http://div.div1.com.au/div-thoughts/div-commentaries/66-div-commentary-metadata
+# - "HTML meta tags used by journal articles"
+#   https://gist.github.com/hubgit/5985963
+# order of these is mostly by preference/quality (best option first), though
+# also/sometimes re-ordered for lookup efficiency (lookup stops after first
+# match)
+HEAD_META_PATTERNS: Any = {
+    "title": [
+        "meta[name='citation_title']",
+        "meta[name='eprints.title']",
+        "meta[name='prism.title']",
+        "meta[name='bepress_citation_title']",
+        "meta[name='og:title']",
+        "meta[name='dcterms.title']",
+        "meta[name='dc.title']",
+    ],
+    "subtitle": [
+        "meta[name='prism.subtitle']",
+    ],
+    "doi": [
+        "meta[name='citation_doi']",
+        "meta[name='DOI']",
+        "meta[id='DOI']",
+        "meta[name='prism.doi']",
+        "meta[name='bepress_citation_doi']",
+        "meta[name='dc.identifier.doi']",
+        "meta[name='dc.identifier'][scheme='doi']",
+    ],
+    "pmid": [
+        "meta[name='citation_pmid']",
+    ],
+    "abstract": [
+        "meta[name='citation_abstract']",
+        "meta[name='bepress_citation_abstract']",
+        "meta[name='eprints.abstract']",
+        "meta[name='dcterms.abstract']",
+        "meta[name='prism.teaser']",
+        "meta[name='dc.description']",
+        "meta[name='og:description']",
+    ],
+    "container_name": [
+        "meta[name='citation_journal_title']",
+        "meta[name='bepress_citation_journal_title']",
+        "meta[name='citation_conference_title']",
+        "meta[name='bepress_citation_conference_title']",
+        "meta[name='prism.publicationName']",
+        "meta[name='eprints.publication']",
+        "meta[name='dc.relation.ispartof']",
+        "meta[name='dc.source']",
+        "meta[property='og:site_name']",
+    ],
+    "container_abbrev": [
+        "meta[name='citation_journal_abbrev']",
+    ],
+    "raw_date": [
+        "meta[name='citation_publication_date']",
+        "meta[name='bepress_citation_publication_date']",
+        "meta[name='prism.publicationDate']",
+        "meta[name='citation_date']",
+        "meta[name='bepress_citation_date']",
+        "meta[name='citation_online_date']",
+        "meta[name='bepress_citation_online_date']",
+        "meta[itemprop='datePublished']",
+        "meta[name='article:published']",
+        "meta[name='eprints.datestamp']",
+        "meta[name='eprints.date']",
+        "meta[name='dc.date.created']",
+        "meta[name='dc.issued']",
+        "meta[name='dcterms.date']",
+        "meta[name='dc.date']",
+    ],
+    "release_year": [
+        "meta[itemprop='citation_year']",
+        "meta[itemprop='prism:copyrightYear']",
+    ],
+    "first_page": [
+        "meta[name='citation_firstpage']",
+        "meta[name='bepress_citation_firstpage']",
+        "meta[name='prism.startingPage']",
+        "meta[name='dc.citation.spage']",
+    ],
+    "last_page": [
+        "meta[name='citation_lastpage']",
+        "meta[name='bepress_citation_lastpage']",
+        "meta[name='prism.endingPage']",
+        "meta[name='dc.citation.epage']",
+    ],
+    "issue": [
+        "meta[name='citation_issue']",
+        "meta[name='bepress_citation_issue']",
+        "meta[name='prism.issueIdentifier']",
+        "meta[name='dc.citation.issue']",
+    ],
+    "volume": [
+        "meta[name='citation_volume']",
+        "meta[name='bepress_citation_volume']",
+        "meta[name='prism.volume']",
+        "meta[name='dc.citation.volume']",
+    ],
+    "number": [
+        "meta[name='citation_technical_report_number']",
+        "meta[name='bepress_citation_technical_report_number']",
+        "meta[name='citation_number']",
+        "meta[name='bepress_citation_number']",
+        "meta[name='prism.number']",
+    ],
+    "container_issn": [
+        "meta[name='citation_issn']",
+        "meta[name='bepress_citation_issn']",
+        "meta[name='prism.issn']",
+        "meta[name='prism.eIssn']",
+        "meta[name='eprints.issn']",
+        "meta[name='dc.source.issn']",
+    ],
+    "isbn": [
+        "meta[name='citation_isbn']",
+        "meta[name='bepress_citation_isbn']",
+        "meta[name='prism.isbn']",
+    ],
+    "publisher": [
+        "meta[name='citation_publisher']",
+        "meta[name='bepress_citation_publisher']",
+        "meta[name='eprints.publisher']",
+        "meta[name='citation_technical_report_institution']",
+        "meta[name='dcterms.publisher']",
+        "meta[name='dc.publisher']",
+    ],
+    "raw_release_type": [
+        "meta[name='citation_article_type']",
+        "meta[name='bepress_citation_article_type']",
+        "meta[name='prism.contentType']",
+        "meta[name='eprints.type']",
+        "meta[name='dc.type']",
+    ],
+    "lang": [
+        "meta[name='citation_language']",
+        "meta[name='bepress_citation_language']",
+        "meta[name='dcterms.language']",
+        "meta[name='dc.language']",
+        "meta[name='og:locale']",
+    ],
+}
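To make the lookup mechanics concrete: each selector is tried with css_first() against the document head, and the first match wins. A small sketch (HTML invented):

    from selectolax.parser import HTMLParser

    html = """<html><head>
    <meta name="citation_title" content="An Example Article Title">
    </head><body></body></html>"""

    head = HTMLParser(html).css_first("head")
    elem = head.css_first("meta[name='citation_title']")
    print(elem.attrs['content'])  # -> An Example Article Title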
"raw_release_type": [ + "meta[name='citation_article_type']", + "meta[name='bepress_citation_article_type']", + "meta[name='prism.contentType']", + "meta[name='eprints.type']", + "meta[name='dc.type']", + ], + "lang": [ + "meta[name='citation_language']", + "meta[name='bepress_citation_language']", + "meta[name='dcterms.language']", + "meta[name='dc.language']", + "meta[name='og:locale']", + ], +} + +HEAD_META_LIST_PATTERNS: Any = { + "contrib_names": [ + "meta[name='citation_author']", + "meta[name='bepress_citation_author']", + "meta[name='eprints.creators_name']", + "meta[name='dcterms.creator']", + "meta[name='article:author']", + "meta[name='dc.creator']", + "meta[name='dc.contributor']", + ], + # TODO: citation_author_institution + "raw_references": [ + "meta[name='citation_reference']", + ], + "raw_identifiers": [ + "meta[name='eprints.id_number']", + "meta[name='dcterms.identifier']", + "meta[name='dc.identifier']", + ], +} + +XML_FULLTEXT_PATTERNS: List[dict] = [ + { + "selector": "meta[name='citation_xml_url']", + "attr": "content", + "technique": "citation_xml_url", + }, + { + "selector": "link[rel='alternate'][type='application/xml']", + "attr": "href", + "technique": "alternate link", + }, + { + "in_doc_url": "scielo", + "in_fulltext_url": "articleXML", + "selector": "a[target='xml']", + "attr": "href", + "technique": "SciElo XML link", + }, +] + +HTML_FULLTEXT_PATTERNS: List[dict] = [ + { + "selector": "meta[name='citation_fulltext_html_url']", + "attr": "content", + "technique": "citation_fulltext_html_url", + }, +] + +PDF_FULLTEXT_PATTERNS: List[dict] = [ + { + "selector": "meta[name='citation_pdf_url']", + "attr": "content", + "technique": "citation_pdf_url", + }, + { + "selector": "meta[name='bepress_citation_pdf_url']", + "attr": "content", + "technique": "citation_pdf_url", + }, +] + +RELEASE_TYPE_MAP = { + "research article": "article-journal", + "text.serial.journal": "article-journal", +} + + +class BiblioMetadata(pydantic.BaseModel): + title: Optional[str] + subtitle: Optional[str] + contrib_names: Optional[List[str]] + release_date: Optional[datetime.date] + release_year: Optional[int] + release_type: Optional[str] + release_stage: Optional[str] + withdrawn_status: Optional[str] + lang: Optional[str] + country_code: Optional[str] + volume: Optional[str] + issue: Optional[str] + number: Optional[str] + pages: Optional[str] + first_page: Optional[str] + last_page: Optional[str] + license: Optional[str] + publisher: Optional[str] + container_name: Optional[str] + container_abbrev: Optional[str] + container_issn: Optional[str] + container_type: Optional[str] + raw_references: Optional[List[str]] + + doi: Optional[str] + pmid: Optional[str] + pmcid: Optional[str] + isbn13: Optional[str] + publisher_ident: Optional[str] + oai_id: Optional[str] + + abstract: Optional[str] + pdf_fulltext_url: Optional[str] + html_fulltext_url: Optional[str] + xml_fulltext_url: Optional[str] + + class Config: + json_encoders = { + datetime.date: lambda dt: dt.isoformat() + } + + +def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]: + """ + Tries to quickly extract fulltext URLs using a set of patterns. This + function is intendend to be generic across various extraction techniques. 
+
+def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadata]:
+
+    meta: Any = dict()
+    head = doc.css_first("head")
+    if not head:
+        return None
+
+    for field, patterns in HEAD_META_PATTERNS.items():
+        for pattern in patterns:
+            val = head.css_first(pattern)
+            #print((field, pattern, val))
+            if val and val.attrs['content']:
+                meta[field] = val.attrs['content']
+                break
+
+    for field, patterns in HEAD_META_LIST_PATTERNS.items():
+        for pattern in patterns:
+            val_list = head.css(pattern)
+            if val_list:
+                for val in val_list:
+                    if val.attrs['content']:
+                        if not field in meta:
+                            meta[field] = []
+                        meta[field].append(val.attrs['content'])
+                break
+
+    # (some) fulltext extractions
+    pdf_fulltext_url = html_extract_fulltext_url(doc_url, doc, PDF_FULLTEXT_PATTERNS)
+    if pdf_fulltext_url:
+        meta['pdf_fulltext_url'] = pdf_fulltext_url[0]
+    xml_fulltext_url = html_extract_fulltext_url(doc_url, doc, XML_FULLTEXT_PATTERNS)
+    if xml_fulltext_url:
+        meta['xml_fulltext_url'] = xml_fulltext_url[0]
+    html_fulltext_url = html_extract_fulltext_url(doc_url, doc, HTML_FULLTEXT_PATTERNS)
+    if html_fulltext_url:
+        meta['html_fulltext_url'] = html_fulltext_url[0]
+
+    # TODO: replace with clean_doi() et al
+    if meta.get('doi') and meta.get('doi').startswith('doi:'):
+        meta['doi'] = meta['doi'][4:]
+
+    raw_identifiers = meta.pop('raw_identifiers', [])
+    for ident in raw_identifiers:
+        if ident.startswith('doi:10.'):
+            if not 'doi' in meta:
+                meta['doi'] = ident.replace('doi:', '')
+        elif ident.startswith('10.') and '/' in ident:
+            if not 'doi' in meta:
+                meta['doi'] = ident
+        elif ident.startswith('isbn:'):
+            if not 'isbn' in meta:
+                meta['isbn'] = ident.replace('isbn:', '')
+
+    raw_date = meta.pop('raw_date', None)
+    if raw_date:
+        parsed = dateparser.parse(raw_date)
+        if parsed:
+            meta['release_date'] = parsed.date()
+
+    raw_release_type = meta.pop('raw_release_type', None)
+    if raw_release_type:
+        release_type = RELEASE_TYPE_MAP.get(raw_release_type.lower().strip())
+        if release_type:
+            meta['release_type'] = release_type
+
+    return BiblioMetadata(**meta)
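Tying the head-meta pass and the post-processing together, a hedged end-to-end sketch (all metadata values invented; requires dateparser):

    html = """<html><head>
    <meta name="citation_title" content="Example Title">
    <meta name="citation_author" content="Doe, Jane">
    <meta name="citation_author" content="Roe, Richard">
    <meta name="citation_doi" content="doi:10.1234/example">
    <meta name="citation_publication_date" content="2020-10-28">
    </head></html>"""

    biblio = html_extract_biblio("https://example.com/article", HTMLParser(html))
    assert biblio is not None
    print(biblio.title)          # Example Title
    print(biblio.contrib_names)  # ['Doe, Jane', 'Roe, Richard']
    print(biblio.doi)            # 10.1234/example ('doi:' prefix stripped)
    print(biblio.release_date)   # 2020-10-28 (parsed via dateparser)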
+ #"||code.jquery.com^", + #"||ajax.googleapis.com^", + #"||cdnjs.cloudflare.com^", + + # badges, "share" buttons, etc + "apis.google.com/js/plusone", + + # PLOS images + "/resource/img/icon.*.16.png^", + ], + ) + + +def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], type_name: str) -> list: + resources = [] + + for node in doc.css(selector): + for attr in attrs: + url = node.attrs.get(attr) + if url: + resources.append(dict(url=url, type=type_name)) + + return resources + + +def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker) -> list: + """ + This function tries to find all the important resources in a page. The + presumption is that the HTML document is article fulltext, and we want the + list of all resoures (by URL) necessary to replay the page. + + The returned resource URLs each have a type (script, img, css, etc), and + should be fully-qualified URLs (not relative). + + Adblock filtering is run to remove unwanted resources. + """ + resources = [] + + # select various resource references + resources += _extract_generic(doc, "script", ["src"], "script") + resources += _extract_generic(doc, "link[rel='stylesheet']", ["href"], "stylesheet") + # TODO: srcset and parse + # eg: https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-2x.jpg 1200w, https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-1x.jpg 600w, https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-small.jpg 355w + resources += _extract_generic(doc, "img", ["src"], "image") + resources += _extract_generic(doc, "audio", ["src"], "audio") + resources += _extract_generic(doc, "video", ["src"], "media") + resources += _extract_generic(doc, "source", ["src"], "media") + resources += _extract_generic(doc, "track", ["src"], "media") + resources += _extract_generic(doc, "iframe", ["src"], "subdocument") + resources += _extract_generic(doc, "embed", ["src"], "media") + + # ensure URLs are absolute + for r in resources: + r['url'] = urllib.parse.urljoin(doc_url, r['url']) + + # filter using adblocker + resources = [r for r in resources if adblock.check_network_urls(r['url'], source_url=doc_url, request_type=r['type']) == False] + + # remove duplicates + resources = [dict(t) for t in {tuple(d.items()) for d in resources}] + + return resources + diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index e6c6295..0b58f3b 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -3,10 +3,14 @@ # in `wayback` library. Means we can't run pylint. # pylint: skip-file -import os, sys, time +import os +import sys +import time +import gzip import json import requests import datetime +from typing import Tuple from collections import namedtuple import http.client @@ -17,7 +21,7 @@ http.client._MAXHEADERS = 1000 # type: ignore import wayback.exception from http.client import IncompleteRead from wayback.resourcestore import ResourceStore -from gwb.loader import CDXLoaderFactory +from gwb.loader import CDXLoaderFactory3 from .misc import b32_hex, requests_retry_session, gen_file_metadata, clean_url @@ -232,7 +236,7 @@ class CdxApiClient: assert row.status_code == filter_status_code return row - def lookup_best(self, url, max_age_days=None, best_mimetype=None): + def lookup_best(self, url, max_age_days=None, best_mimetype=None, closest=None): """ Fetches multiple CDX rows for the given URL, tries to find the most recent. 
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index e6c6295..0b58f3b 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -3,10 +3,14 @@
 # in `wayback` library. Means we can't run pylint.
 # pylint: skip-file
 
-import os, sys, time
+import os
+import sys
+import time
+import gzip
 import json
 import requests
 import datetime
+from typing import Tuple
 from collections import namedtuple
 
 import http.client
@@ -17,7 +21,7 @@ http.client._MAXHEADERS = 1000  # type: ignore
 import wayback.exception
 from http.client import IncompleteRead
 from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
+from gwb.loader import CDXLoaderFactory3
 
 from .misc import b32_hex, requests_retry_session, gen_file_metadata, clean_url
 
@@ -232,7 +236,7 @@ class CdxApiClient:
             assert row.status_code == filter_status_code
         return row
 
-    def lookup_best(self, url, max_age_days=None, best_mimetype=None):
+    def lookup_best(self, url, max_age_days=None, best_mimetype=None, closest=None):
         """
         Fetches multiple CDX rows for the given URL, tries to find the most recent.
@@ -270,6 +274,10 @@ class CdxApiClient:
         if max_age_days:
             since = datetime.date.today() - datetime.timedelta(days=max_age_days)
             params['from'] = '%04d%02d%02d' % (since.year, since.month, since.day),
+        if closest:
+            params['closest'] = closest
+            params['sort'] = "closest"
+        #print(params, file=sys.stderr)
         rows = self._query_api(params)
         if not rows:
             return None
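For reference, the closest/sort pair maps onto standard CDX API query parameters; a hedged sketch of the kind of request the client ends up making (endpoint and URL are placeholders):

    import requests

    params = {
        "url": "https://example.com/article.html",
        "closest": "20201028235103",  # CDX-format timestamp to sort around
        "sort": "closest",
        "output": "json",
    }
    resp = requests.get("https://web.archive.org/cdx/search/cdx", params=params)
    print(resp.status_code)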
@@ -352,14 +360,14 @@ class WaybackClient:
             raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
         warc_uri = self.warc_uri_prefix + warc_path
         if not self.rstore:
-            self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
+            self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3(
                 webdata_secret=self.petabox_webdata_secret,
-                download_base_url=self.petabox_base_url))
+            ))
         try:
             #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
             gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
         except wayback.exception.ResourceUnavailable:
-            print("Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
+            print("  Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
             raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
         except ValueError as ve:
             raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
@@ -398,8 +406,11 @@ class WaybackClient:
             # convert revisit_dt
             # len("2018-07-24T11:56:49"), or with "Z"
             assert len(revisit_dt) in (19, 20)
-            revisit_uri = revisit_uri.decode('utf-8')
-            revisit_dt = revisit_dt.decode('utf-8').replace('-', '').replace(':', '').replace('T', '').replace('Z', '')
+            if type(revisit_uri) is bytes:
+                revisit_uri = revisit_uri.decode('utf-8')
+            if type(revisit_dt) is bytes:
+                revisit_dt = revisit_dt.decode('utf-8')
+            revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '')
             assert len(revisit_dt) == 14
             try:
                 revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt)
@@ -507,7 +518,7 @@ class WaybackClient:
             # TODO: don't need *all* these hashes, just sha1
             file_meta = gen_file_metadata(resp.content)
             if cdx_sha1hex != file_meta['sha1hex']:
-                print("REPLAY MISMATCH: cdx:{} replay:{}".format(
+                print("  REPLAY MISMATCH: cdx:{} replay:{}".format(
                     cdx_sha1hex,
                     file_meta['sha1hex']),
                     file=sys.stderr)
@@ -568,7 +579,7 @@ class WaybackClient:
         else:
             return None
 
-    def lookup_resource(self, start_url, best_mimetype=None):
+    def lookup_resource(self, start_url, best_mimetype=None, closest=None):
         """
         Looks in wayback for a resource starting at the URL, following any
         redirects. Returns a ResourceResult object, which may indicate a
@@ -596,7 +607,7 @@ class WaybackClient:
         urls_seen = [start_url]
         for i in range(self.max_redirects):
             print("  URL: {}".format(next_url), file=sys.stderr)
-            cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype)
+            cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest)
             #print(cdx_row, file=sys.stderr)
             if not cdx_row:
                 return ResourceResult(
@@ -668,7 +679,7 @@ class WaybackClient:
                 )
                 assert 300 <= resource.status_code < 400
                 if not resource.location:
-                    print("bad redirect record: {}".format(cdx_row), file=sys.stderr)
+                    print("  bad redirect record: {}".format(cdx_row), file=sys.stderr)
                     return ResourceResult(
                         start_url=start_url,
                         hit=False,
@@ -697,7 +708,7 @@ class WaybackClient:
                 next_url = clean_url(next_url)
                 cdx_row = cdx_partial_from_row(cdx_row)
                 if not next_url:
-                    print("bad redirect record: {}".format(cdx_row), file=sys.stderr)
+                    print("  bad redirect record: {}".format(cdx_row), file=sys.stderr)
                     return ResourceResult(
                         start_url=start_url,
                         hit=False,
@@ -980,10 +991,10 @@ class SavePageNowClient:
                 best_mimetype="application/pdf",
             )
             if elsevier_pdf_cdx and elsevier_pdf_cdx.mimetype == "application/pdf":
-                print("Trying pdf.sciencedirectassets.com hack!", file=sys.stderr)
+                print("  Trying pdf.sciencedirectassets.com hack!", file=sys.stderr)
                 cdx_row = elsevier_pdf_cdx
             else:
-                print("Failed pdf.sciencedirectassets.com hack!", file=sys.stderr)
+                print("  Failed pdf.sciencedirectassets.com hack!", file=sys.stderr)
                 #print(elsevier_pdf_cdx, file=sys.stderr)
 
         if not cdx_row:
@@ -999,7 +1010,7 @@ class SavePageNowClient:
                 retry_sleep=9.0,
             )
         except KeyError as ke:
-            print("CDX KeyError: {}".format(ke), file=sys.stderr)
+            print("  CDX KeyError: {}".format(ke), file=sys.stderr)
             return ResourceResult(
                 start_url=start_url,
                 hit=False,
@@ -1060,3 +1071,24 @@ class SavePageNowClient:
             revisit_cdx=revisit_cdx,
         )
 
+
+def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]:
+    if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
+        print("  transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
+        inner_body = gzip.decompress(resource.body)
+        inner_resource = ResourceResult(
+            body=inner_body,
+            # copy all other fields
+            start_url=resource.start_url,
+            hit=resource.hit,
+            status=resource.status,
+            terminal_url=resource.terminal_url,
+            terminal_dt=resource.terminal_dt,
+            terminal_status_code=resource.terminal_status_code,
+            cdx=resource.cdx,
+            revisit_cdx=resource.revisit_cdx,
+        )
+        inner_file_meta = gen_file_metadata(inner_resource.body)
+        return (inner_file_meta, inner_resource)
+    else:
+        return (file_meta, resource)
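The double-encoding situation fix_transfer_encoding() handles can be simulated directly; a minimal sketch (assumes libmagic identifies the small snippets as expected):

    import gzip
    from sandcrawler.misc import gen_file_metadata

    page = b"<html><body>hello</body></html>"
    compressed = gzip.compress(page)

    # a gzip'd body sniffs as application/gzip even when the CDX row says text/html
    assert gen_file_metadata(compressed)['mimetype'] == 'application/gzip'
    assert gen_file_metadata(gzip.decompress(compressed))['mimetype'] == 'text/html'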
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 6d8b162..0c8eee6 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -5,16 +5,25 @@ import gzip
 import time
 import base64
 import requests
+from typing import Optional, Tuple, Any, Dict, List
 from http.server import BaseHTTPRequestHandler, HTTPServer
 from collections import namedtuple
+from selectolax.parser import HTMLParser
 
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding
 from sandcrawler.grobid import GrobidClient
 from sandcrawler.pdfextract import process_pdf, PdfExtractResult
-from sandcrawler.misc import gen_file_metadata, clean_url
+from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime
 from sandcrawler.html import extract_fulltext_url
+from sandcrawler.html_ingest import fetch_html_resources, \
+    quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \
+    WebResource
+from sandcrawler.html_metadata import html_extract_fulltext_url, \
+    XML_FULLTEXT_PATTERNS, HTML_FULLTEXT_PATTERNS, BiblioMetadata, \
+    html_extract_resources, html_extract_biblio, load_adblock_rules
 from sandcrawler.workers import SandcrawlerWorker
 from sandcrawler.db import SandcrawlerPostgrestClient
+from sandcrawler.xml import xml_reserialize
 
 
 class IngestFileWorker(SandcrawlerWorker):
@@ -46,7 +55,7 @@ class IngestFileWorker(SandcrawlerWorker):
 
     def __init__(self, sink=None, **kwargs):
         super().__init__()
-        
+
         self.sink = sink
         self.wayback_client = kwargs.get('wayback_client')
         if not self.wayback_client:
@@ -63,12 +72,18 @@ class IngestFileWorker(SandcrawlerWorker):
         self.grobid_sink = kwargs.get('grobid_sink')
         self.thumbnail_sink = kwargs.get('thumbnail_sink')
         self.pdftext_sink = kwargs.get('pdftext_sink')
+        self.xmldoc_sink = kwargs.get('xmldoc_sink')
+        self.htmlteixml_sink = kwargs.get('htmlteixml_sink')
+        self.max_hops = 6
 
         self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
         self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
         self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True)
         self.try_wayback = kwargs.get('try_wayback', True)
         self.try_spn2 = kwargs.get('try_spn2', True)
+        self.html_quick_mode = False
+        self.adblock_rules = load_adblock_rules()
+        self.max_html_resources = 200
 
         self.base_url_blocklist = [
             # robot blocking
@@ -76,8 +91,10 @@ class IngestFileWorker(SandcrawlerWorker):
 
             # temporary, until we implement specific fetch and 'petabox' output
             "://archive.org/",
+            "://www.archive.org/",
             "://web.archive.org/web/",
             "://openlibrary.org/",
+            "://www.openlibrary.org/",
             "://fatcat.wiki/",
 
             # Domain squats
@@ -135,7 +152,7 @@ class IngestFileWorker(SandcrawlerWorker):
         ]
 
 
-    def check_existing_ingest(self, base_url):
+    def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
         """
         Check in sandcrawler-db (postgres) to see if we have already ingested
         this URL (ingest file result table).
@@ -147,14 +164,14 @@ class IngestFileWorker(SandcrawlerWorker):
         """
         if not self.try_existing_ingest:
             return None
-        existing = self.pgrest_client.get_ingest_file_result(base_url)
+        existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url)
         # TODO: filter on more flags?
         if existing and existing['hit'] == True:
             return existing
         else:
             return None
 
-    def find_resource(self, url, best_mimetype=None, force_recrawl=False):
+    def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]:
         """
         Looks in wayback for a resource starting at the URL, following any
         redirects. If a hit isn't found, try crawling with SPN.
@@ -183,7 +200,7 @@ class IngestFileWorker(SandcrawlerWorker):
         if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
             old_failure = True
 
-        if self.try_spn2 and (resource == None or (resource.status == 'no-capture') or soft404 or old_failure):
+        if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure):
             via = "spn2"
             force_simple_get = 0
             for domain in self.spn2_simple_get_domains:
@@ -191,14 +208,14 @@ class IngestFileWorker(SandcrawlerWorker):
                     force_simple_get = 1
                     break
             resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get)
-        print("[FETCH {}\t] {}\t{}".format(
+        print("[FETCH {:>6}] {} {}".format(
             via,
-            resource.status,
-            resource.terminal_url or url),
+            (resource and resource.status),
+            (resource and resource.terminal_url) or url),
             file=sys.stderr)
         return resource
 
-    def process_existing(self, request, result_row):
+    def process_existing(self, request: dict, result_row: dict) -> dict:
         """
         If we have an existing ingest file result, do any database fetches or
         additional processing necessary to return a result.
@@ -226,16 +243,25 @@ class IngestFileWorker(SandcrawlerWorker):
         }
         return result
 
-    def process_hit(self, resource, file_meta):
+    def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
         """
         Run all the necessary processing for a new/fresh ingest hit.
         """
-        return {
-            'grobid': self.process_grobid(resource, file_meta),
-            'pdf_meta': self.process_pdfextract(resource, file_meta),
-        }
+        if ingest_type == "pdf":
+            return {
+                'grobid': self.process_grobid(resource, file_meta),
+                'pdf_meta': self.process_pdfextract(resource, file_meta),
+            }
+        elif ingest_type == "xml":
+            return {
+                'xml_meta': self.process_xml(resource, file_meta),
+            }
+        elif ingest_type == "html":
+            return self.process_html(resource, file_meta)
+        else:
+            raise NotImplementedError(f"process {ingest_type} hit")
 
-    def process_grobid(self, resource, file_meta):
+    def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict:
         """
         Submits to resource body to GROBID for processing.
@@ -266,7 +292,7 @@ class IngestFileWorker(SandcrawlerWorker):
         result.pop('key', None)
         return result
 
-    def process_pdfextract(self, resource, file_meta):
+    def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict:
         """
         Extracts thumbnail and pdf_meta info from PDF.
@@ -288,13 +314,99 @@
         if self.thumbnail_sink and result.page0_thumbnail is not None:
             self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
         if self.pdftext_sink:
-            self.pdftext_sink.push_record(result.to_pdftext_dict())
+            self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex)
         result.page0_thumbnail = None
         result.text = None
         result.file_meta = None
         return result.to_pdftext_dict()
 
-    def timeout_response(self, task):
+ """ + if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": + jats_xml = xml_reserialize(resource.body) + msg = dict( + sha1hex=file_meta["sha1hex"], + status="success", + jats_xml=jats_xml, + ) + self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) + return dict(status="success") + + def process_html(self, resource: ResourceResult, file_meta: dict) -> dict: + + html_doc = HTMLParser(resource.body) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + assert html_biblio + html_body = html_extract_body_teixml(resource.body) + html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count')) + html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) + + if html_scope not in ('article-fulltext', 'unknown'): + html_body.pop("tei_xml", None) + return dict( + status="html-body-wrong-scope", + html_biblio=html_biblio_dict, + html_scope=html_scope, + ) + + raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) + if len(raw_resources) > self.max_html_resources: + html_body.pop("tei_xml", None) + return dict( + status="too-many-resources", + html_biblio=html_biblio_dict, + html_scope=html_scope, + ) + + when = parse_cdx_datetime(resource.cdx.datetime) + + full_resources: List[WebResource] = [] + + partial_result = dict( + html_biblio=html_biblio_dict, + html_scope=html_scope, + ) + + try: + if self.html_quick_mode: + full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when) + else: + full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) + except PetaboxError as e: + partial_result['status'] = 'petabox-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except CdxApiError as e: + partial_result['status'] = 'cdx-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackError as e: + partial_result['status'] = 'wayback-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackContentError as e: + partial_result['status'] = 'wayback-content-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + + if self.htmlteixml_sink and html_body['status'] == "success": + self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex']) + + html_body.pop("tei_xml", None) + + return dict( + html_body=html_body, + html_biblio=html_biblio_dict, + scope=html_scope, + html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources], + ) + + def timeout_response(self, task: dict) -> dict: print("[TIMEOUT]", file=sys.stderr) return dict( request=task, @@ -303,22 +415,20 @@ class IngestFileWorker(SandcrawlerWorker): error_message="ingest worker internal timeout", ) - def want(self, request): - if not request.get('ingest_type') in ('file', 'pdf'): + def want(self, request: dict) -> bool: + if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html'): return False return True - def process(self, request, key=None): + def process(self, request: dict, key: Any = None) -> dict: - # backwards compatibility - if request.get('ingest_type') in ('file', None): + # old backwards compatibility + if request.get('ingest_type') == 'file': request['ingest_type'] = 'pdf' - # for now, only pdf ingest is implemented - if not 'ingest_type' in request: - request['ingest_type'] = "pdf" - assert request.get('ingest_type') == "pdf" ingest_type = request.get('ingest_type') + if ingest_type not in 
("pdf", "xml", "html"): + raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL # note that we pass through the original/raw URL, and that is what gets @@ -329,25 +439,27 @@ class IngestFileWorker(SandcrawlerWorker): for block in self.base_url_blocklist: if block in base_url: - print("[SKIP {}\t] {}".format(ingest_type, base_url), file=sys.stderr) + print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) return dict(request=request, hit=False, status="skip-url-blocklist") - print("[INGEST {}\t] {}".format(ingest_type, base_url), file=sys.stderr) + print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) best_mimetype = None if ingest_type == "pdf": best_mimetype = "application/pdf" + elif ingest_type == "xml": + best_mimetype = "text/xml" + elif ingest_type == "html": + best_mimetype = "text/html" - existing = self.check_existing_ingest(base_url) + existing = self.check_existing_ingest(ingest_type, base_url) if existing: return self.process_existing(request, existing) - result = dict(request=request, hit=False) + result: Dict[str, Any] = dict(request=request, hit=False) next_url = base_url hops = [base_url] - self.max_hops = 6 - while len(hops) <= self.max_hops: @@ -400,25 +512,9 @@ class IngestFileWorker(SandcrawlerWorker): result['error_message'] = str(e)[:1600] return result - if not resource.hit: - result['status'] = resource.status - if resource.terminal_url: - result['terminal'] = { - "terminal_url": resource.terminal_url, - "terminal_dt": resource.terminal_dt, - "terminal_status_code": resource.terminal_status_code, - } - if resource.terminal_url not in result['hops']: - result['hops'].append(resource.terminal_url) - return result - - if not resource.body: - result['status'] = 'null-body' - return result - file_meta = gen_file_metadata(resource.body) + assert resource - if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url): - result['status'] = 'blocked-cookie' + if resource.terminal_url: result['terminal'] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, @@ -426,53 +522,47 @@ class IngestFileWorker(SandcrawlerWorker): } if resource.terminal_url not in result['hops']: result['hops'].append(resource.terminal_url) + + if not resource.hit: + result['status'] = resource.status return result - # crude handling of content-encoding; wayback fetch library usually - # (and should always?) 
-            # crude handling of content-encoding; wayback fetch library usually
-            # (and should always?) handle this
-            if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
-                print("transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
-                try:
-                    inner_body = gzip.decompress(resource.body)
-                except Exception as e:
-                    result['status'] = 'bad-gzip-encoding'
-                    result['error_message'] = str(e)
-                    return result
-                if not inner_body:
-                    result['status'] = 'null-body'
-                    return result
-                resource = ResourceResult(
-                    body=inner_body,
-                    # copy all other fields
-                    start_url=resource.start_url,
-                    hit=resource.hit,
-                    status=resource.status,
-                    terminal_url=resource.terminal_url,
-                    terminal_dt=resource.terminal_dt,
-                    terminal_status_code=resource.terminal_status_code,
-                    cdx=resource.cdx,
-                    revisit_cdx=resource.revisit_cdx,
-                )
-                file_meta = gen_file_metadata(resource.body)
-
-            if "html" in file_meta['mimetype'] or "xhtml" in file_meta['mimetype'] or "application/xml" in file_meta['mimetype'] or "text/xml" in file_meta['mimetype']:
+            if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url):
+                result['status'] = 'blocked-cookie'
+                return result
+
+            file_meta = gen_file_metadata(resource.body)
+            try:
+                file_meta, resource = fix_transfer_encoding(file_meta, resource)
+            except Exception as e:
+                result['status'] = 'bad-gzip-encoding'
+                result['error_message'] = str(e)
+                return result
+
+            if not resource.body or file_meta['size_bytes'] == 0:
+                result['status'] = 'null-body'
+                return result
+
+            # here we split based on ingest type to try and extract a next hop
+            html_ish_resource = bool(
+                "html" in file_meta['mimetype']
+                or "xhtml" in file_meta['mimetype']
+                or "application/xml" in file_meta['mimetype']
+                or "text/xml" in file_meta['mimetype']
+            )
+            if ingest_type == "pdf" and html_ish_resource:
                # Got landing page or similar. Some XHTML detected as "application/xml"
-                if resource.terminal_dt:
-                    result['terminal'] = {
-                        "terminal_url": resource.terminal_url,
-                        "terminal_dt": resource.terminal_dt,
-                        "terminal_status_code": resource.terminal_status_code,
-                    }
                 fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
-
-                result['html'] = fulltext_url
+                result['extract_next_hop'] = fulltext_url
+
                 if not fulltext_url:
                     result['status'] = 'no-pdf-link'
                     return result
                 next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url')
                 assert next_url
                 next_url = clean_url(next_url)
-                print("[PARSE\t] {}\t{}".format(
+                print("[PARSE {:>6}] {} {}".format(
+                    ingest_type,
                     fulltext_url.get('technique'),
                     next_url,
                 ),
@@ -483,7 +573,44 @@ class IngestFileWorker(SandcrawlerWorker):
                     return result
                 hops.append(next_url)
                 continue
-
+            elif ingest_type == "xml" and html_ish_resource:
+                # parse with selectolax, extract XML fulltext URL
+                html_doc = HTMLParser(resource.body)
+                extract_next_hop = html_extract_fulltext_url(resource.terminal_url, html_doc, XML_FULLTEXT_PATTERNS)
+                if extract_next_hop:
+                    next_url = extract_next_hop[0]
+                    technique = extract_next_hop[1]
+                    print("[PARSE {:>6}] {} {}".format(
+                        ingest_type,
+                        technique,
+                        next_url,
+                    ),
+                    file=sys.stderr)
+                    if next_url in hops:
+                        result['status'] = 'link-loop'
+                        result['error_message'] = "repeated: {}".format(next_url)
+                        return result
+                    hops.append(next_url)
+                    continue
+            elif ingest_type == "html" and html_ish_resource:
+                # parse with selectolax, extract HTML fulltext URL
+                html_doc = HTMLParser(resource.body)
+                extract_next_hop = html_extract_fulltext_url(resource.terminal_url, html_doc, HTML_FULLTEXT_PATTERNS)
+                if extract_next_hop:
+                    next_url = extract_next_hop[0]
+                    technique = extract_next_hop[1]
+                    if next_url in hops:
+                        # for HTML ingest, we don't count this as a link-loop
+                        break
+                    print("[PARSE {:>6}] {} {}".format(
+                        ingest_type,
+                        technique,
+                        next_url,
+                    ),
+                    file=sys.stderr)
+                    hops.append(next_url)
+                    continue
+
             # default is to NOT keep hopping
             break
 
@@ -491,6 +618,11 @@ class IngestFileWorker(SandcrawlerWorker):
             result['status'] = "max-hops-exceeded"
             return result
 
+        # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+        assert resource
+        assert resource.hit == True
+        assert resource.terminal_status_code in (200, 226)
+
         if resource.terminal_url:
             result['terminal'] = {
                 "terminal_url": resource.terminal_url,
@@ -499,35 +631,50 @@ class IngestFileWorker(SandcrawlerWorker):
                 "terminal_sha1hex": file_meta['sha1hex'],
             }
-        # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
-        assert resource.hit == True
-        assert resource.terminal_status_code in (200, 226)
-
         result['file_meta'] = file_meta
         result['cdx'] = cdx_to_dict(resource.cdx)
         if resource.revisit_cdx:
             result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
 
-        # other failure cases
-        if not resource.body or file_meta['size_bytes'] == 0:
-            result['status'] = 'null-body'
-            return result
-
-        if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
-            result['status'] = "wrong-mimetype"  # formerly: "other-mimetype"
-            return result
+        if ingest_type == "pdf":
+            if file_meta['mimetype'] != "application/pdf":
+                result['status'] = "wrong-mimetype"  # formerly: "other-mimetype"
+                return result
+        elif ingest_type == "xml":
+            if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+                result['status'] = "wrong-mimetype"
+                return result
+        elif ingest_type == "html":
+            if file_meta['mimetype'] not in ("text/html",):
+                result['status'] = "wrong-mimetype"
+                return result
+        else:
+            raise NotImplementedError()
 
-        info = self.process_hit(resource, file_meta)
+        info = self.process_hit(ingest_type, resource, file_meta)
         result.update(info)
 
+        # check if processing turned up an error
+        if info.get('status') not in ('success', None):
+            result['status'] = info['status']
+            return result
+
         result['status'] = "success"
         result['hit'] = True
-        print("[SUCCESS\t] sha1:{} grobid:{} pdfextract:{}".format(
-            result.get('file_meta', {}).get('sha1hex'),
-            result.get('grobid', {}).get('status_code'),
-            result.get('pdf_meta', {}).get('status'),
-            ),
-            file=sys.stderr)
+        if ingest_type == "pdf":
+            print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
+                ingest_type,
+                result.get('file_meta', {}).get('sha1hex'),
+                result.get('grobid', {}).get('status_code'),
+                result.get('pdf_meta', {}).get('status'),
+            ),
+            file=sys.stderr)
+        else:
+            print("[SUCCESS {:>5}] sha1:{}".format(
+                ingest_type,
+                result.get('file_meta', {}).get('sha1hex'),
+            ),
+            file=sys.stderr)
         return result
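For orientation, the worker's process() entrypoint takes a plain request dict; a hedged sketch of driving it directly for the new types (URL invented; this needs working wayback/SPN configuration and network access):

    from sandcrawler.ingest import IngestFileWorker

    worker = IngestFileWorker()
    request = {
        "ingest_type": "html",
        "base_url": "https://example.com/article.html",
    }
    result = worker.process(request)
    print(result["status"], result.get("hit"))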
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index 8b02211..c7deea1 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -17,8 +17,8 @@ class SandcrawlerMinioClient(object):
         Example config:
 
             host="localhost:9000",
-            access_key=os.environ['MINIO_ACCESS_KEY'],
-            secret_key=os.environ['MINIO_SECRET_KEY'],
+            access_key=os.environ['SANDCRAWLER_BLOB_ACCESS_KEY'],
+            secret_key=os.environ['SANDCRAWLER_BLOB_SECRET_KEY'],
         """
         self.mc = minio.Minio(
             host_url,
diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py
index 1b8aa92..67e5c0b 100644
--- a/python/sandcrawler/misc.py
+++ b/python/sandcrawler/misc.py
@@ -3,20 +3,22 @@ import base64
 import magic
 import hashlib
 import datetime
+from typing import Optional
+
 import requests
 from requests.adapters import HTTPAdapter
 from requests.packages.urllib3.util.retry import Retry  # pylint: disable=import-error
 import urlcanon
 
-def clean_url(s):
+def clean_url(s: str) -> str:
     s = s.strip()
     parsed = urlcanon.parse_url(s)
     if not parsed.port and parsed.colon_before_port:
         parsed.colon_before_port = b''
     return str(urlcanon.whatwg(parsed))
 
-def gen_file_metadata(blob):
+def gen_file_metadata(blob: bytes) -> dict:
     """
     Takes a file blob (bytestream) and returns hashes and other metadata.
@@ -24,6 +26,10 @@ def gen_file_metadata(blob):
     """
     assert blob
     mimetype = magic.Magic(mime=True).from_buffer(blob)
+    if mimetype in ("application/xml", "text/xml"):
+        # crude check for JATS XML, using only first 1 kB of file
+        if b"<article " in blob[:1024] and not b"<html" in blob[:1024]:
+            mimetype = "application/jats+xml"
     hashes = [
         hashlib.sha1(),
         hashlib.sha256(),
@@ -39,7 +45,7 @@ def gen_file_metadata(blob):
         mimetype=mimetype,
     )
 
-def b32_hex(s):
+def b32_hex(s: str) -> str:
     """
     Converts a base32-encoded SHA-1 checksum into hex-encoded
@@ -62,7 +68,7 @@ NORMAL_MIME = (
     'application/octet-stream',
 )
 
-def normalize_mime(raw):
+def normalize_mime(raw: str) -> Optional[str]:
     raw = raw.lower().strip()
     for norm in NORMAL_MIME:
         if raw.startswith(norm):
@@ -103,7 +109,7 @@ def test_normalize_mime():
     assert normalize_mime("binary/octet-stream") == "application/octet-stream"
 
 
-def parse_cdx_line(raw_cdx, normalize=True):
+def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
     """
     This method always filters a few things out:
@@ -138,32 +144,45 @@ def parse_cdx_line(raw_cdx, normalize=True):
         mime = normalize_mime(mime)
     sha1hex = b32_hex(sha1b32)
 
-    http_status = int(http_status)
-    c_size = int(c_size)
-    offset = int(offset)
-
     return dict(
         surt=surt,
         url=url,
         datetime=dt,
         mimetype=mime,
-        http_status=http_status,
+        http_status=int(http_status),
         sha1b32=sha1b32,
         sha1hex=sha1hex,
-        warc_csize=c_size,
-        warc_offset=offset,
+        warc_csize=int(c_size),
+        warc_offset=int(offset),
         warc_path=warc,
     )
 
-def parse_cdx_datetime(dt_str):
+def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
+    if not dt_str:
+        return None
     try:
-        return datetime.strptime(dt_str, "%Y%m%d%H%M%S")
+        return datetime.datetime.strptime(dt_str, "%Y%m%d%H%M%S")
     except Exception:
         return None
 
+def test_parse_cdx_datetime() -> None:
+    assert parse_cdx_datetime("") == None
+    assert parse_cdx_datetime("asdf") == None
+    assert parse_cdx_datetime("19930203123045") != None
+    assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)
+
+def datetime_to_cdx(dt: datetime.datetime) -> str:
+    return '%04d%02d%02d%02d%02d%02d' % (
+        dt.year, dt.month, dt.day,
+        dt.hour, dt.minute, dt.second,
+    )
+
+def test_datetime_to_cdx() -> None:
+    assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+
 def requests_retry_session(retries=10, backoff_factor=3,
-        status_forcelist=(500, 502, 504), session=None):
+        status_forcelist=(500, 502, 504), session=None) -> requests.Session:
     """
     From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
     """
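The JATS sniff above is a cheap byte check layered on top of libmagic; a small sketch of its effect (XML invented; assumes libmagic reports the blob as XML):

    jats = b'<?xml version="1.0"?>\n<article xmlns="https://jats.nlm.nih.gov">example</article>'
    meta = gen_file_metadata(jats)
    # libmagic says text/xml or application/xml; the b"<article " check upgrades it
    print(meta['mimetype'])  # expected: application/jats+xml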
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index fbc5273..f13b1f3 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -20,7 +20,7 @@ grobid
 """
 
 import os
-from typing import Optional
+from typing import Optional, AnyStr
 import xml.etree.ElementTree
 
 from sandcrawler.workers import SandcrawlerWorker
@@ -28,6 +28,7 @@ from sandcrawler.db import SandcrawlerPostgresClient
 from sandcrawler.minio import SandcrawlerMinioClient
 from sandcrawler.grobid import GrobidClient
 from sandcrawler.pdfextract import PdfExtractResult
+from sandcrawler.html_ingest import HtmlMetaRow
 
 
 class PersistCdxWorker(SandcrawlerWorker):
@@ -95,8 +96,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             if not k in raw:
                 self.counts['skip-request-fields'] += 1
                 return None
-        if raw['ingest_type'] not in ('pdf', 'xml'):
-            print(raw['ingest_type'])
+        if raw['ingest_type'] not in ('pdf', 'xml', 'html'):
             self.counts['skip-ingest-type'] += 1
             return None
         request = {
@@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
 
         return request
 
-    def file_result_to_row(self, raw):
+    def file_result_to_row(self, raw: dict) -> Optional[dict]:
         """
         Converts ingest-result JSON schema (eg, from Kafka) to SQL
         ingest_file_result schema
@@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         ingest_type = raw['request'].get('ingest_type')
         if ingest_type == 'file':
             ingest_type = 'pdf'
-        if ingest_type not in ('pdf', 'xml'):
+        if ingest_type not in ('pdf', 'xml', 'html'):
             self.counts['skip-ingest-type'] += 1
             return None
         if raw['status'] in ("existing", ):
@@ -159,6 +159,22 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
         return result
 
+    def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]:
+        html_body = record.get('html_body')
+        file_meta = record.get('file_meta')
+        if not (file_meta and html_body):
+            return None
+        return HtmlMetaRow(
+            sha1hex=file_meta["sha1hex"],
+            status=record.get('status'),
+            scope=record.get('scope'),
+            has_teixml=bool(html_body and html_body['status'] == 'success' and html_body.get('tei_xml')),
+            has_thumbnail=False,  # TODO
+            word_count=(html_body and html_body.get('word_count')) or None,
+            biblio=record.get('html_biblio'),
+            resources=record.get('html_resources'),
+        )
+
     def push_batch(self, batch):
         self.counts['total'] += len(batch)
 
@@ -197,6 +213,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             self.counts['insert-file_meta'] += resp[0]
             self.counts['update-file_meta'] += resp[1]
 
+        html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')]
+        if html_meta_batch:
+            resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing")
+            self.counts['insert-html_meta'] += resp[0]
+            self.counts['update-html_meta'] += resp[1]
+
         self.db.commit()
         return []
 
@@ -452,9 +474,11 @@ class PersistPdfTextWorker(SandcrawlerWorker):
 
 class PersistThumbnailWorker(SandcrawlerWorker):
     """
-    Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+    Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL
+    table.
 
-    This worker *must* be used with raw kakfa mode.
+    This worker *must* be used with raw kafka mode; thumbnails are *not*
+    wrapped in JSON like most sandcrawler kafka messages.
     """
 
     def __init__(self, **kwargs):
@@ -487,3 +511,70 @@ class PersistThumbnailWorker(SandcrawlerWorker):
         )
         self.counts['s3-put'] += 1
+ """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".unknown") + self.s3_folder = kwargs.get('s3_folder', "unknown") + self.doc_key = "unknown" + + def process(self, record: dict, key: Optional[AnyStr] = None) -> None: + + if record.get('status') != 'success' or not record.get(self.doc_key): + return + + assert key is not None + if isinstance(key, bytes): + key_str = key.decode('utf-8') + elif isinstance(key, str): + key_str = key + assert len(key_str) == 40 + if 'sha1hex' in record: + assert key_str == record['sha1hex'] + + resp = self.s3.put_blob( + folder=self.s3_folder, + blob=record[self.doc_key].encode('utf-8'), + sha1hex=key_str, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + + +class PersistXmlDocWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".jats.xml") + self.s3_folder = kwargs.get('s3_folder', "xml_doc") + self.doc_key = "jats_xml" + + +class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".tei.xml") + self.s3_folder = kwargs.get('s3_folder', "html_body") + self.doc_key = "tei_xml" diff --git a/python/sandcrawler/xml.py b/python/sandcrawler/xml.py new file mode 100644 index 0000000..7a0086d --- /dev/null +++ b/python/sandcrawler/xml.py @@ -0,0 +1,7 @@ + +import xml.etree.ElementTree as ET + + +def xml_reserialize(raw: bytes) -> str: + root = ET.fromstring(raw) + return '<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(root, encoding="unicode") |