From e2e0602114ccdf142b3ef0f30c67d2cb7a58ef7e Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 4 Oct 2021 13:01:58 -0700 Subject: progress on fileset/dataset ingest --- python/sandcrawler/ingest_fileset.py | 299 +++++++++++++++++++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 python/sandcrawler/ingest_fileset.py (limited to 'python/sandcrawler/ingest_fileset.py') diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py new file mode 100644 index 0000000..9ffaa47 --- /dev/null +++ b/python/sandcrawler/ingest_fileset.py @@ -0,0 +1,299 @@ + +import sys +import json +import gzip +import time +from collections import namedtuple +from typing import Optional, Tuple, Any, Dict, List + +import requests +from selectolax.parser import HTMLParser + +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError +from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime +from sandcrawler.html import extract_fulltext_url +from sandcrawler.html_ingest import fetch_html_resources, \ + quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \ + WebResource, html_guess_platform + +from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules +from sandcrawler.workers import SandcrawlerWorker +from sandcrawler.db import SandcrawlerPostgrestClient + + +MAX_BODY_SIZE_BYTES = 128*1024*1024 + +class IngestFilesetWorker(IngestFileWorker): + """ + General process is: + + 1. crawl base_url, and use request and landing page resource (eg, HTML) to + determine platform being targeted + 2. use platform-specific helper to fetch metadata about the work, including + a manifest of files, and selection of an "ingest strategy" and any + required context + 3. then use strategy-specific helper to archive files from manifest (first + checking to see if content has been archived already) + 4. summarize status + """ + + def __init__(self, sink=None, **kwargs): + super().__init__(sink=None, **kwargs) + + self.sink = sink + + + def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: + """ + Same as file version, but uses fileset result table + """ + if not self.try_existing_ingest: + return None + existing = self.pgrest_client.get_ingest_fileset_result(ingest_type, base_url) + # TODO: filter on more flags? + if existing and existing['hit'] == True: + return existing + else: + return None + + def process_existing(self, request: dict, result_row: dict) -> dict: + """ + If we have an existing ingest fileset result, do any database fetches + or additional processing necessary to return a result. + """ + raise NotImplementedError("process_existing() not tested or safe yet") + + # XXX: use file version + #def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: + + def want(self, request: dict) -> bool: + if not request.get('ingest_type') in ('dataset',): + return False + return True + + def process(self, request: dict, key: Any = None) -> dict: + + ingest_type = request.get('ingest_type') + if ingest_type not in ("dataset",): + raise NotImplementedError(f"can't handle ingest_type={ingest_type}") + + # parse/clean URL + # note that we pass through the original/raw URL, and that is what gets + # persisted in database table + base_url = clean_url(request['base_url']) + + force_recrawl = bool(request.get('force_recrawl', False)) + + for block in self.base_url_blocklist: + if block in base_url: + print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + return dict(request=request, hit=False, status="skip-url-blocklist") + + print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + + # TODO + #existing = self.check_existing_ingest(ingest_type, base_url) + #if existing: + # return self.process_existing(request, existing) + + result: Dict[str, Any] = dict(request=request, hit=False) + hops = [base_url] + + # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page. + + ### START COPYPASTA from process_file(), should refactor ### + + # check against blocklist + for block in self.base_url_blocklist: + if block in next_url: + result['status'] = "skip-url-blocklist" + return result + + try: + resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl) + except SavePageNowError as e: + result['status'] = 'spn2-error' + result['error_message'] = str(e)[:1600] + return result + except PetaboxError as e: + result['status'] = 'petabox-error' + result['error_message'] = str(e)[:1600] + return result + except CdxApiError as e: + result['status'] = 'cdx-error' + result['error_message'] = str(e)[:1600] + # add a sleep in cdx-error path as a slow-down + time.sleep(2.0) + return result + except WaybackError as e: + result['status'] = 'wayback-error' + result['error_message'] = str(e)[:1600] + return result + except WaybackContentError as e: + result['status'] = 'wayback-content-error' + result['error_message'] = str(e)[:1600] + return result + except NotImplementedError as e: + result['status'] = 'not-implemented' + result['error_message'] = str(e)[:1600] + return result + + assert resource + + if resource.terminal_url: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + } + if resource.terminal_url not in result['hops']: + result['hops'].append(resource.terminal_url) + + if not resource.hit: + result['status'] = resource.status + return result + + if resource.terminal_url: + for pattern in self.base_url_blocklist: + if pattern in resource.terminal_url: + result['status'] = 'skip-url-blocklist' + return result + + if resource.terminal_url: + for pattern in self.cookie_blocklist: + if pattern in resource.terminal_url: + result['status'] = 'blocked-cookie' + return result + + if not resource.body: + result['status'] = 'null-body' + return result + + if len(resource.body) > MAX_BODY_SIZE_BYTES: + result['status'] = 'body-too-large' + return result + + file_meta = gen_file_metadata(resource.body) + try: + file_meta, resource = fix_transfer_encoding(file_meta, resource) + except Exception as e: + result['status'] = 'bad-gzip-encoding' + result['error_message'] = str(e) + return result + + if not resource.body or file_meta['size_bytes'] == 0: + result['status'] = 'null-body' + return result + + # here we split based on ingest type to try and extract a next hop + html_ish_resource = bool( + "html" in file_meta['mimetype'] + or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" + or "application/xml" in file_meta['mimetype'] + or "text/xml" in file_meta['mimetype'] + ) + html_biblio = None + html_doc = None + if html_ish_resource and resource.body: + try: + html_doc = HTMLParser(resource.body) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + if html_biblio: + if not 'html_biblio' in result or html_biblio.title: + result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) + #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) + except ValueError: + pass + + # fetch must be a hit if we got this far (though not necessarily an ingest hit!) + assert resource + assert resource.hit == True + assert resource.terminal_status_code in (200, 226) + + if resource.terminal_url: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + "terminal_sha1hex": file_meta['sha1hex'], + } + + result['file_meta'] = file_meta + result['cdx'] = cdx_to_dict(resource.cdx) + if resource.revisit_cdx: + result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) + + if ingest_type == "pdf": + if file_meta['mimetype'] != "application/pdf": + result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + return result + elif ingest_type == "xml": + if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "html": + if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): + result['status'] = "wrong-mimetype" + return result + else: + raise NotImplementedError() + + ### END COPYPASTA ### + + # determine platform + platform_helper = None + for (helper_name, helper) in self.dataset_platform_helpers.items(): + if helper.match_request(request, resource, html_biblio): + platform_helper = helper + break + + if not platform_helper: + result['status'] = 'no-platform-match' + return result + + # 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`. + dataset_meta = platform_helper.process_request(request, resource.terminal_url, html_biblio) + platform = dataset_meta.platform_name + result['platform'] = dataset_meta.platform + result['platform_id'] = dataset_meta.platform_id + result['item_name'] = dataset_meta.item_name + if not dataset_meta.manifest: + result['status'] = 'no-manifest' + return result + + result['manifest'] = dataset_meta.manifest or None + result['file_count'] = len(dataset_meta.manifest) or None + result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) or None + + ingest_strategy = platform_helper.chose_strategy(dataset_meta) + result['ingest_strategy'] = ingest_strategy + + strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy) + if not strategy_helper: + result['status'] = 'no-strategy-helper' + return result + + # 3. Use strategy-specific methods to archive all files in platform manifest, and verify manifest metadata. + archive_result = strategy_helper.process(dataset_meta) + + # 4. Summarize status and return structured result metadata. + result['status'] = archive_result.status + result['manifest'] = archive_result.manifest + result['file_count'] = len(archive_result.manifest) or None + result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None + + if result['status'] == 'success': + result['hit'] = True + print("[SUCCESS {:>5}] file_count={} total_size={}".format( + ingest_type, + result['file_count'], + result['total_size'], + ), file=sys.stderr) + else: + print("[FAIL {:>5}] status={} file_count={} total_size={}".format( + ingest_type, + result['status'], + result['file_count'], + result['total_size'], + ), file=sys.stderr) + return result -- cgit v1.2.3