diff options
Diffstat (limited to 'python/sandcrawler/ingest.py')
-rw-r--r-- | python/sandcrawler/ingest.py | 367 |
1 file changed, 257 insertions, 110 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 6d8b162..0c8eee6 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -5,16 +5,25 @@ import gzip import time import base64 import requests +from typing import Optional, Tuple, Any, Dict, List from http.server import BaseHTTPRequestHandler, HTTPServer from collections import namedtuple +from selectolax.parser import HTMLParser -from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding from sandcrawler.grobid import GrobidClient from sandcrawler.pdfextract import process_pdf, PdfExtractResult -from sandcrawler.misc import gen_file_metadata, clean_url +from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime from sandcrawler.html import extract_fulltext_url +from sandcrawler.html_ingest import fetch_html_resources, \ + quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \ + WebResource +from sandcrawler.html_metadata import html_extract_fulltext_url, \ + XML_FULLTEXT_PATTERNS, HTML_FULLTEXT_PATTERNS, BiblioMetadata, \ + html_extract_resources, html_extract_biblio, load_adblock_rules from sandcrawler.workers import SandcrawlerWorker from sandcrawler.db import SandcrawlerPostgrestClient +from sandcrawler.xml import xml_reserialize class IngestFileWorker(SandcrawlerWorker): @@ -46,7 +55,7 @@ class IngestFileWorker(SandcrawlerWorker): def __init__(self, sink=None, **kwargs): super().__init__() - + self.sink = sink self.wayback_client = kwargs.get('wayback_client') if not self.wayback_client: @@ -63,12 +72,18 @@ class IngestFileWorker(SandcrawlerWorker): self.grobid_sink = kwargs.get('grobid_sink') 
self.thumbnail_sink = kwargs.get('thumbnail_sink') self.pdftext_sink = kwargs.get('pdftext_sink') + self.xmldoc_sink = kwargs.get('xmldoc_sink') + self.htmlteixml_sink = kwargs.get('htmlteixml_sink') + self.max_hops = 6 self.try_existing_ingest = kwargs.get('try_existing_ingest', False) self.try_existing_grobid = kwargs.get('try_existing_grobid', True) self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True) self.try_wayback = kwargs.get('try_wayback', True) self.try_spn2 = kwargs.get('try_spn2', True) + self.html_quick_mode = False + self.adblock_rules = load_adblock_rules() + self.max_html_resources = 200 self.base_url_blocklist = [ # robot blocking @@ -76,8 +91,10 @@ class IngestFileWorker(SandcrawlerWorker): # temporary, until we implement specific fetch and 'petabox' output "://archive.org/", + "://www.archive.org/", "://web.archive.org/web/", "://openlibrary.org/", + "://www.openlibrary.org/", "://fatcat.wiki/", # Domain squats @@ -135,7 +152,7 @@ class IngestFileWorker(SandcrawlerWorker): ] - def check_existing_ingest(self, base_url): + def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: """ Check in sandcrawler-db (postgres) to see if we have already ingested this URL (ingest file result table). @@ -147,14 +164,14 @@ class IngestFileWorker(SandcrawlerWorker): """ if not self.try_existing_ingest: return None - existing = self.pgrest_client.get_ingest_file_result(base_url) + existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url) # TODO: filter on more flags? if existing and existing['hit'] == True: return existing else: return None - def find_resource(self, url, best_mimetype=None, force_recrawl=False): + def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]: """ Looks in wayback for a resource starting at the URL, following any redirects. If a hit isn't found, try crawling with SPN. 
@@ -183,7 +200,7 @@ class IngestFileWorker(SandcrawlerWorker): if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000': old_failure = True - if self.try_spn2 and (resource == None or (resource.status == 'no-capture') or soft404 or old_failure): + if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure): via = "spn2" force_simple_get = 0 for domain in self.spn2_simple_get_domains: @@ -191,14 +208,14 @@ class IngestFileWorker(SandcrawlerWorker): force_simple_get = 1 break resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get) - print("[FETCH {}\t] {}\t{}".format( + print("[FETCH {:>6}] {} {}".format( via, - resource.status, - resource.terminal_url or url), + (resource and resource.status), + (resource and resource.terminal_url) or url), file=sys.stderr) return resource - def process_existing(self, request, result_row): + def process_existing(self, request: dict, result_row: dict) -> dict: """ If we have an existing ingest file result, do any database fetches or additional processing necessary to return a result. @@ -226,16 +243,25 @@ class IngestFileWorker(SandcrawlerWorker): } return result - def process_hit(self, resource, file_meta): + def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: """ Run all the necessary processing for a new/fresh ingest hit. 
""" - return { - 'grobid': self.process_grobid(resource, file_meta), - 'pdf_meta': self.process_pdfextract(resource, file_meta), - } + if ingest_type == "pdf": + return { + 'grobid': self.process_grobid(resource, file_meta), + 'pdf_meta': self.process_pdfextract(resource, file_meta), + } + elif ingest_type == "xml": + return { + 'xml_meta': self.process_xml(resource, file_meta), + } + elif ingest_type == "html": + return self.process_html(resource, file_meta) + else: + raise NotImplementedError(f"process {ingest_type} hit") - def process_grobid(self, resource, file_meta): + def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict: """ Submits to resource body to GROBID for processing. @@ -266,7 +292,7 @@ class IngestFileWorker(SandcrawlerWorker): result.pop('key', None) return result - def process_pdfextract(self, resource, file_meta): + def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict: """ Extracts thumbnail and pdf_meta info from PDF. @@ -288,13 +314,99 @@ class IngestFileWorker(SandcrawlerWorker): if self.thumbnail_sink and result.page0_thumbnail is not None: self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) if self.pdftext_sink: - self.pdftext_sink.push_record(result.to_pdftext_dict()) + self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex) result.page0_thumbnail = None result.text = None result.file_meta = None return result.to_pdftext_dict() - def timeout_response(self, task): + def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Simply publishes to Kafka topic. + + In the future, could extract other metadata here (like body word + count), or attempting to fetch sub-resources. 
+ """ + if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": + jats_xml = xml_reserialize(resource.body) + msg = dict( + sha1hex=file_meta["sha1hex"], + status="success", + jats_xml=jats_xml, + ) + self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) + return dict(status="success") + + def process_html(self, resource: ResourceResult, file_meta: dict) -> dict: + + html_doc = HTMLParser(resource.body) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + assert html_biblio + html_body = html_extract_body_teixml(resource.body) + html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count')) + html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) + + if html_scope not in ('article-fulltext', 'unknown'): + html_body.pop("tei_xml", None) + return dict( + status="html-body-wrong-scope", + html_biblio=html_biblio_dict, + html_scope=html_scope, + ) + + raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) + if len(raw_resources) > self.max_html_resources: + html_body.pop("tei_xml", None) + return dict( + status="too-many-resources", + html_biblio=html_biblio_dict, + html_scope=html_scope, + ) + + when = parse_cdx_datetime(resource.cdx.datetime) + + full_resources: List[WebResource] = [] + + partial_result = dict( + html_biblio=html_biblio_dict, + html_scope=html_scope, + ) + + try: + if self.html_quick_mode: + full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when) + else: + full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) + except PetaboxError as e: + partial_result['status'] = 'petabox-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except CdxApiError as e: + partial_result['status'] = 'cdx-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackError as e: + partial_result['status'] = 
'wayback-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackContentError as e: + partial_result['status'] = 'wayback-content-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + + if self.htmlteixml_sink and html_body['status'] == "success": + self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex']) + + html_body.pop("tei_xml", None) + + return dict( + html_body=html_body, + html_biblio=html_biblio_dict, + scope=html_scope, + html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources], + ) + + def timeout_response(self, task: dict) -> dict: print("[TIMEOUT]", file=sys.stderr) return dict( request=task, @@ -303,22 +415,20 @@ class IngestFileWorker(SandcrawlerWorker): error_message="ingest worker internal timeout", ) - def want(self, request): - if not request.get('ingest_type') in ('file', 'pdf'): + def want(self, request: dict) -> bool: + if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html'): return False return True - def process(self, request, key=None): + def process(self, request: dict, key: Any = None) -> dict: - # backwards compatibility - if request.get('ingest_type') in ('file', None): + # old backwards compatibility + if request.get('ingest_type') == 'file': request['ingest_type'] = 'pdf' - # for now, only pdf ingest is implemented - if not 'ingest_type' in request: - request['ingest_type'] = "pdf" - assert request.get('ingest_type') == "pdf" ingest_type = request.get('ingest_type') + if ingest_type not in ("pdf", "xml", "html"): + raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL # note that we pass through the original/raw URL, and that is what gets @@ -329,25 +439,27 @@ class IngestFileWorker(SandcrawlerWorker): for block in self.base_url_blocklist: if block in base_url: - print("[SKIP {}\t] {}".format(ingest_type, base_url), file=sys.stderr) + print("[SKIP {:>6}] {}".format(ingest_type, 
base_url), file=sys.stderr) return dict(request=request, hit=False, status="skip-url-blocklist") - print("[INGEST {}\t] {}".format(ingest_type, base_url), file=sys.stderr) + print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) best_mimetype = None if ingest_type == "pdf": best_mimetype = "application/pdf" + elif ingest_type == "xml": + best_mimetype = "text/xml" + elif ingest_type == "html": + best_mimetype = "text/html" - existing = self.check_existing_ingest(base_url) + existing = self.check_existing_ingest(ingest_type, base_url) if existing: return self.process_existing(request, existing) - result = dict(request=request, hit=False) + result: Dict[str, Any] = dict(request=request, hit=False) next_url = base_url hops = [base_url] - self.max_hops = 6 - while len(hops) <= self.max_hops: @@ -400,25 +512,9 @@ class IngestFileWorker(SandcrawlerWorker): result['error_message'] = str(e)[:1600] return result - if not resource.hit: - result['status'] = resource.status - if resource.terminal_url: - result['terminal'] = { - "terminal_url": resource.terminal_url, - "terminal_dt": resource.terminal_dt, - "terminal_status_code": resource.terminal_status_code, - } - if resource.terminal_url not in result['hops']: - result['hops'].append(resource.terminal_url) - return result - - if not resource.body: - result['status'] = 'null-body' - return result - file_meta = gen_file_metadata(resource.body) + assert resource - if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url): - result['status'] = 'blocked-cookie' + if resource.terminal_url: result['terminal'] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, @@ -426,53 +522,47 @@ class IngestFileWorker(SandcrawlerWorker): } if resource.terminal_url not in result['hops']: result['hops'].append(resource.terminal_url) + + if not resource.hit: + result['status'] = resource.status return result - # crude handling of content-encoding; wayback 
fetch library usually - # (and should always?) handle this - if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': - print("transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr) - try: - inner_body = gzip.decompress(resource.body) - except Exception as e: - result['status'] = 'bad-gzip-encoding' - result['error_message'] = str(e) - return result - if not inner_body: - result['status'] = 'null-body' - return result - resource = ResourceResult( - body=inner_body, - # copy all other fields - start_url=resource.start_url, - hit=resource.hit, - status=resource.status, - terminal_url=resource.terminal_url, - terminal_dt=resource.terminal_dt, - terminal_status_code=resource.terminal_status_code, - cdx=resource.cdx, - revisit_cdx=resource.revisit_cdx, - ) - file_meta = gen_file_metadata(resource.body) - - if "html" in file_meta['mimetype'] or "xhtml" in file_meta['mimetype'] or "application/xml" in file_meta['mimetype'] or "text/xml" in file_meta['mimetype']: + if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url): + result['status'] = 'blocked-cookie' + return result + + file_meta = gen_file_metadata(resource.body) + try: + file_meta, resource = fix_transfer_encoding(file_meta, resource) + except Exception as e: + result['status'] = 'bad-gzip-encoding' + result['error_message'] = str(e) + return result + + if not resource.body or file_meta['size_bytes'] == 0: + result['status'] = 'null-body' + return result + + # here we split based on ingest type to try and extract a next hop + html_ish_resource = bool( + "html" in file_meta['mimetype'] + or "xhtml" in file_meta['mimetype'] + or "application/xml" in file_meta['mimetype'] + or "text/xml" in file_meta['mimetype'] + ) + if ingest_type == "pdf" and html_ish_resource: # Got landing page or similar. 
Some XHTML detected as "application/xml" - if resource.terminal_dt: - result['terminal'] = { - "terminal_url": resource.terminal_url, - "terminal_dt": resource.terminal_dt, - "terminal_status_code": resource.terminal_status_code, - } fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) - - result['html'] = fulltext_url + result['extract_next_hop'] = fulltext_url + if not fulltext_url: result['status'] = 'no-pdf-link' return result next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url') assert next_url next_url = clean_url(next_url) - print("[PARSE\t] {}\t{}".format( + print("[PARSE {:>6}] {} {}".format( + ingest_type, fulltext_url.get('technique'), next_url, ), @@ -483,7 +573,44 @@ class IngestFileWorker(SandcrawlerWorker): return result hops.append(next_url) continue - + elif ingest_type == "xml" and html_ish_resource: + # parse with selectolax, extract XML fulltext URL + html_doc = HTMLParser(resource.body) + extract_next_hop = html_extract_fulltext_url(resource.terminal_url, html_doc, XML_FULLTEXT_PATTERNS) + if extract_next_hop: + next_url = extract_next_hop[0] + technique = extract_next_hop[1] + print("[PARSE {:>6}] {} {}".format( + ingest_type, + technique, + next_url, + ), + file=sys.stderr) + if next_url in hops: + result['status'] = 'link-loop' + result['error_message'] = "repeated: {}".format(next_url) + return result + hops.append(next_url) + continue + elif ingest_type == "html" and html_ish_resource: + # parse with selectolax, extract XML fulltext URL + html_doc = HTMLParser(resource.body) + extract_next_hop = html_extract_fulltext_url(resource.terminal_url, html_doc, HTML_FULLTEXT_PATTERNS) + if extract_next_hop: + next_url = extract_next_hop[0] + technique = extract_next_hop[1] + if next_url in hops: + # for HTML ingest, we don't count this as a link-loop + break + print("[PARSE {:>6}] {} {}".format( + ingest_type, + technique, + next_url, + ), + file=sys.stderr) + hops.append(next_url) + continue + # default is to 
NOT keep hopping break @@ -491,6 +618,11 @@ class IngestFileWorker(SandcrawlerWorker): result['status'] = "max-hops-exceeded" return result + # fetch must be a hit if we got this far (though not necessarily an ingest hit!) + assert resource + assert resource.hit == True + assert resource.terminal_status_code in (200, 226) + if resource.terminal_url: result['terminal'] = { "terminal_url": resource.terminal_url, @@ -499,35 +631,50 @@ class IngestFileWorker(SandcrawlerWorker): "terminal_sha1hex": file_meta['sha1hex'], } - # fetch must be a hit if we got this far (though not necessarily an ingest hit!) - assert resource.hit == True - assert resource.terminal_status_code in (200, 226) - result['file_meta'] = file_meta result['cdx'] = cdx_to_dict(resource.cdx) if resource.revisit_cdx: result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) - # other failure cases - if not resource.body or file_meta['size_bytes'] == 0: - result['status'] = 'null-body' - return result - - if not (resource.hit and file_meta['mimetype'] == "application/pdf"): - result['status'] = "wrong-mimetype" # formerly: "other-mimetype" - return result + if ingest_type == "pdf": + if file_meta['mimetype'] != "application/pdf": + result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + return result + elif ingest_type == "xml": + if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "html": + if file_meta['mimetype'] not in ("text/html",): + result['status'] = "wrong-mimetype" + return result + else: + raise NotImplementedError() - info = self.process_hit(resource, file_meta) + info = self.process_hit(ingest_type, resource, file_meta) result.update(info) + # check if processing turned up an error + if info.get('status') not in ('success', None): + result['status'] = info['status'] + return result + result['status'] = "success" result['hit'] = True - print("[SUCCESS\t] sha1:{} 
grobid:{} pdfextract:{}".format( - result.get('file_meta', {}).get('sha1hex'), - result.get('grobid', {}).get('status_code'), - result.get('pdf_meta', {}).get('status'), - ), - file=sys.stderr) + if ingest_type == "pdf": + print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + result.get('grobid', {}).get('status_code'), + result.get('pdf_meta', {}).get('status'), + ), + file=sys.stderr) + else: + print("[SUCCESS {:>5}] sha1:{}".format( + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + ), + file=sys.stderr) return result |