import sys import json import gzip import time from collections import namedtuple from typing import Optional, Tuple, Any, Dict, List import requests from selectolax.parser import HTMLParser from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime from sandcrawler.html import extract_fulltext_url from sandcrawler.ingest_html import fetch_html_resources, \ quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \ WebResource, html_guess_platform from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules from sandcrawler.workers import SandcrawlerWorker from sandcrawler.db import SandcrawlerPostgrestClient from sandcrawler.ingest_file import IngestFileWorker from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE MAX_BODY_SIZE_BYTES = 128*1024*1024 class IngestFilesetWorker(IngestFileWorker): """ General process is: 1. crawl base_url, and use request and landing page resource (eg, HTML) to determine platform being targeted 2. use platform-specific helper to fetch metadata about the work, including a manifest of files, and selection of an "ingest strategy" and any required context 3. then use strategy-specific helper to archive files from manifest (first checking to see if content has been archived already) 4. summarize status """ def __init__(self, sink=None, **kwargs): super().__init__(sink=None, **kwargs) self.sink = sink self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: """ Same as file version, but uses fileset result table """ if not self.try_existing_ingest: return None existing = self.pgrest_client.get_ingest_fileset_result(ingest_type, base_url) # TODO: filter on more flags? if existing and existing['hit'] == True: return existing else: return None def process_existing(self, request: dict, result_row: dict) -> dict: """ If we have an existing ingest fileset result, do any database fetches or additional processing necessary to return a result. """ raise NotImplementedError("process_existing() not tested or safe yet") # XXX: use file version #def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: def want(self, request: dict) -> bool: if not request.get('ingest_type') in ('dataset',): return False return True def process(self, request: dict, key: Any = None) -> dict: ingest_type = request.get('ingest_type') if ingest_type not in ("dataset",): raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL # note that we pass through the original/raw URL, and that is what gets # persisted in database table base_url = clean_url(request['base_url']) force_recrawl = bool(request.get('force_recrawl', False)) print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) # TODO: "existing" check for new fileset ingest result table #existing = self.check_existing_ingest(ingest_type, base_url) #if existing: # return self.process_existing(request, existing) result: Dict[str, Any] = dict(request=request, hit=False) result['hops'] = [base_url] next_url = base_url # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page. ### START COPYPASTA from process_file(), should refactor ### # check against blocklist for block in self.base_url_blocklist: # XXX: hack to not skip archive.org content if 'archive.org' in block: continue if block in next_url: result['status'] = "skip-url-blocklist" return result try: resource = self.find_resource(next_url, force_recrawl=force_recrawl) except SavePageNowError as e: result['status'] = 'spn2-error' result['error_message'] = str(e)[:1600] return result except PetaboxError as e: result['status'] = 'petabox-error' result['error_message'] = str(e)[:1600] return result except CdxApiError as e: result['status'] = 'cdx-error' result['error_message'] = str(e)[:1600] # add a sleep in cdx-error path as a slow-down time.sleep(2.0) return result except WaybackError as e: result['status'] = 'wayback-error' result['error_message'] = str(e)[:1600] return result except WaybackContentError as e: result['status'] = 'wayback-content-error' result['error_message'] = str(e)[:1600] return result except NotImplementedError as e: #result['status'] = 'not-implemented' #result['error_message'] = str(e)[:1600] #return result resource = None html_biblio = None if resource: if resource.terminal_url: result['terminal'] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, } if resource.terminal_url not in result['hops']: result['hops'].append(resource.terminal_url) if not resource.hit: result['status'] = resource.status return result if resource.terminal_url: for pattern in self.base_url_blocklist: if pattern in resource.terminal_url: result['status'] = 'skip-url-blocklist' return result if resource.terminal_url: for pattern in self.cookie_blocklist: if pattern in resource.terminal_url: result['status'] = 'blocked-cookie' return result if not resource.body: result['status'] = 'null-body' return result if len(resource.body) > MAX_BODY_SIZE_BYTES: result['status'] = 'body-too-large' return result file_meta = gen_file_metadata(resource.body) try: file_meta, resource = fix_transfer_encoding(file_meta, resource) except Exception as e: result['status'] = 'bad-gzip-encoding' result['error_message'] = str(e) return result if not resource.body or file_meta['size_bytes'] == 0: result['status'] = 'null-body' return result # here we split based on ingest type to try and extract a next hop html_ish_resource = bool( "html" in file_meta['mimetype'] or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" or "application/xml" in file_meta['mimetype'] or "text/xml" in file_meta['mimetype'] ) html_biblio = None html_doc = None if html_ish_resource and resource.body: try: html_doc = HTMLParser(resource.body) html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: if not 'html_biblio' in result or html_biblio.title: result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass # fetch must be a hit if we got this far (though not necessarily an ingest hit!) assert resource assert resource.hit == True assert resource.terminal_status_code in (200, 226) if resource.terminal_url: result['terminal'] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, "terminal_sha1hex": file_meta['sha1hex'], } result['file_meta'] = file_meta result['cdx'] = cdx_to_dict(resource.cdx) if resource.revisit_cdx: result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) if ingest_type == "pdf": if file_meta['mimetype'] != "application/pdf": result['status'] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): result['status'] = "wrong-mimetype" return result elif ingest_type == "html": if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): result['status'] = "wrong-mimetype" return result else: #raise NotImplementedError() pass ### END COPYPASTA ### # XXX: html_guess_platform() # determine platform platform_helper = None for (helper_name, helper) in self.dataset_platform_helpers.items(): if helper.match_request(request, resource, html_biblio): platform_helper = helper break if not platform_helper: result['status'] = 'no-platform-match' return result # 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`. terminal_url = base_url if resource: terminal_url = resource.terminal_url dataset_meta = platform_helper.process_request(request, resource, html_biblio) #print(dataset_meta, file=sys.stderr) platform = dataset_meta.platform_name result['platform'] = dataset_meta.platform_name result['platform_id'] = dataset_meta.platform_id result['item_name'] = dataset_meta.archiveorg_item_name result['item_meta'] = dataset_meta.archiveorg_item_meta if dataset_meta.manifest: result['manifest'] = [m.dict() for m in dataset_meta.manifest] result['file_count'] = len(dataset_meta.manifest) result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) else: result['status'] = 'no-manifest' return result ingest_strategy = platform_helper.chose_strategy(dataset_meta) result['ingest_strategy'] = ingest_strategy print(f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", file=sys.stderr) strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy) if not strategy_helper: result['status'] = 'no-strategy-helper' return result # 3. Use strategy-specific methods to archive all files in platform manifest, and verify manifest metadata. archive_result = strategy_helper.process(dataset_meta) # 4. Summarize status and return structured result metadata. result['status'] = archive_result.status result['manifest'] = [m.dict() for m in archive_result.manifest] result['file_count'] = len(archive_result.manifest) or None result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None if result['status'].startswith('success'): result['hit'] = True print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( ingest_type, result['file_count'], result['total_size'], ingest_strategy, ), file=sys.stderr) else: print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( ingest_type, result['status'], result['file_count'], result['total_size'], ingest_strategy, ), file=sys.stderr) return result