aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/ingest_fileset.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2021-10-04 13:01:58 -0700
committerBryan Newbold <bnewbold@archive.org>2021-10-15 18:15:20 -0700
commite2e0602114ccdf142b3ef0f30c67d2cb7a58ef7e (patch)
treebee2b4343f30d59fec463dbdcaafafc11d7cd513 /python/sandcrawler/ingest_fileset.py
parent452475df7619f3743eac5ad86e2e1fb8ba9972da (diff)
downloadsandcrawler-e2e0602114ccdf142b3ef0f30c67d2cb7a58ef7e.tar.gz
sandcrawler-e2e0602114ccdf142b3ef0f30c67d2cb7a58ef7e.zip
progress on fileset/dataset ingest
Diffstat (limited to 'python/sandcrawler/ingest_fileset.py')
-rw-r--r--python/sandcrawler/ingest_fileset.py299
1 files changed, 299 insertions, 0 deletions
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
new file mode 100644
index 0000000..9ffaa47
--- /dev/null
+++ b/python/sandcrawler/ingest_fileset.py
@@ -0,0 +1,299 @@
+
+import sys
+import json
+import gzip
+import time
+from collections import namedtuple
+from typing import Optional, Tuple, Any, Dict, List
+
+import requests
+from selectolax.parser import HTMLParser
+
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError
+from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime
+from sandcrawler.html import extract_fulltext_url
+from sandcrawler.html_ingest import fetch_html_resources, \
+ quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \
+ WebResource, html_guess_platform
+
+from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules
+from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgrestClient
+
+
+MAX_BODY_SIZE_BYTES = 128*1024*1024
+
+class IngestFilesetWorker(IngestFileWorker):
+ """
+ General process is:
+
+ 1. crawl base_url, and use request and landing page resource (eg, HTML) to
+ determine platform being targeted
+ 2. use platform-specific helper to fetch metadata about the work, including
+ a manifest of files, and selection of an "ingest strategy" and any
+ required context
+ 3. then use strategy-specific helper to archive files from manifest (first
+ checking to see if content has been archived already)
+ 4. summarize status
+ """
+
+ def __init__(self, sink=None, **kwargs):
+ super().__init__(sink=None, **kwargs)
+
+ self.sink = sink
+
+
+ def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
+ """
+ Same as file version, but uses fileset result table
+ """
+ if not self.try_existing_ingest:
+ return None
+ existing = self.pgrest_client.get_ingest_fileset_result(ingest_type, base_url)
+ # TODO: filter on more flags?
+ if existing and existing['hit'] == True:
+ return existing
+ else:
+ return None
+
+ def process_existing(self, request: dict, result_row: dict) -> dict:
+ """
+ If we have an existing ingest fileset result, do any database fetches
+ or additional processing necessary to return a result.
+ """
+ raise NotImplementedError("process_existing() not tested or safe yet")
+
+ # XXX: use file version
+ #def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
+
+ def want(self, request: dict) -> bool:
+ if not request.get('ingest_type') in ('dataset',):
+ return False
+ return True
+
+ def process(self, request: dict, key: Any = None) -> dict:
+
+ ingest_type = request.get('ingest_type')
+ if ingest_type not in ("dataset",):
+ raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
+
+ # parse/clean URL
+ # note that we pass through the original/raw URL, and that is what gets
+ # persisted in database table
+ base_url = clean_url(request['base_url'])
+
+ force_recrawl = bool(request.get('force_recrawl', False))
+
+ for block in self.base_url_blocklist:
+ if block in base_url:
+ print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+ return dict(request=request, hit=False, status="skip-url-blocklist")
+
+ print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+
+ # TODO
+ #existing = self.check_existing_ingest(ingest_type, base_url)
+ #if existing:
+ # return self.process_existing(request, existing)
+
+ result: Dict[str, Any] = dict(request=request, hit=False)
+ hops = [base_url]
+
+ # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
+
+ ### START COPYPASTA from process_file(), should refactor ###
+
+ # check against blocklist
+ for block in self.base_url_blocklist:
+ if block in next_url:
+ result['status'] = "skip-url-blocklist"
+ return result
+
+ try:
+ resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl)
+ except SavePageNowError as e:
+ result['status'] = 'spn2-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except PetaboxError as e:
+ result['status'] = 'petabox-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except CdxApiError as e:
+ result['status'] = 'cdx-error'
+ result['error_message'] = str(e)[:1600]
+ # add a sleep in cdx-error path as a slow-down
+ time.sleep(2.0)
+ return result
+ except WaybackError as e:
+ result['status'] = 'wayback-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except WaybackContentError as e:
+ result['status'] = 'wayback-content-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except NotImplementedError as e:
+ result['status'] = 'not-implemented'
+ result['error_message'] = str(e)[:1600]
+ return result
+
+ assert resource
+
+ if resource.terminal_url:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ }
+ if resource.terminal_url not in result['hops']:
+ result['hops'].append(resource.terminal_url)
+
+ if not resource.hit:
+ result['status'] = resource.status
+ return result
+
+ if resource.terminal_url:
+ for pattern in self.base_url_blocklist:
+ if pattern in resource.terminal_url:
+ result['status'] = 'skip-url-blocklist'
+ return result
+
+ if resource.terminal_url:
+ for pattern in self.cookie_blocklist:
+ if pattern in resource.terminal_url:
+ result['status'] = 'blocked-cookie'
+ return result
+
+ if not resource.body:
+ result['status'] = 'null-body'
+ return result
+
+ if len(resource.body) > MAX_BODY_SIZE_BYTES:
+ result['status'] = 'body-too-large'
+ return result
+
+ file_meta = gen_file_metadata(resource.body)
+ try:
+ file_meta, resource = fix_transfer_encoding(file_meta, resource)
+ except Exception as e:
+ result['status'] = 'bad-gzip-encoding'
+ result['error_message'] = str(e)
+ return result
+
+ if not resource.body or file_meta['size_bytes'] == 0:
+ result['status'] = 'null-body'
+ return result
+
+ # here we split based on ingest type to try and extract a next hop
+ html_ish_resource = bool(
+ "html" in file_meta['mimetype']
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "application/xml" in file_meta['mimetype']
+ or "text/xml" in file_meta['mimetype']
+ )
+ html_biblio = None
+ html_doc = None
+ if html_ish_resource and resource.body:
+ try:
+ html_doc = HTMLParser(resource.body)
+ html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
+ if html_biblio:
+ if not 'html_biblio' in result or html_biblio.title:
+ result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
+ except ValueError:
+ pass
+
+ # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+ assert resource
+ assert resource.hit == True
+ assert resource.terminal_status_code in (200, 226)
+
+ if resource.terminal_url:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ "terminal_sha1hex": file_meta['sha1hex'],
+ }
+
+ result['file_meta'] = file_meta
+ result['cdx'] = cdx_to_dict(resource.cdx)
+ if resource.revisit_cdx:
+ result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
+
+ if ingest_type == "pdf":
+ if file_meta['mimetype'] != "application/pdf":
+ result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+ elif ingest_type == "xml":
+ if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ result['status'] = "wrong-mimetype"
+ return result
+ elif ingest_type == "html":
+ if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
+ result['status'] = "wrong-mimetype"
+ return result
+ else:
+ raise NotImplementedError()
+
+ ### END COPYPASTA ###
+
+ # determine platform
+ platform_helper = None
+ for (helper_name, helper) in self.dataset_platform_helpers.items():
+ if helper.match_request(request, resource, html_biblio):
+ platform_helper = helper
+ break
+
+ if not platform_helper:
+ result['status'] = 'no-platform-match'
+ return result
+
+ # 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`.
+ dataset_meta = platform_helper.process_request(request, resource.terminal_url, html_biblio)
+ platform = dataset_meta.platform_name
+ result['platform'] = dataset_meta.platform
+ result['platform_id'] = dataset_meta.platform_id
+ result['item_name'] = dataset_meta.item_name
+ if not dataset_meta.manifest:
+ result['status'] = 'no-manifest'
+ return result
+
+ result['manifest'] = dataset_meta.manifest or None
+ result['file_count'] = len(dataset_meta.manifest) or None
+ result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) or None
+
+ ingest_strategy = platform_helper.chose_strategy(dataset_meta)
+ result['ingest_strategy'] = ingest_strategy
+
+ strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy)
+ if not strategy_helper:
+ result['status'] = 'no-strategy-helper'
+ return result
+
+ # 3. Use strategy-specific methods to archive all files in platform manifest, and verify manifest metadata.
+ archive_result = strategy_helper.process(dataset_meta)
+
+ # 4. Summarize status and return structured result metadata.
+ result['status'] = archive_result.status
+ result['manifest'] = archive_result.manifest
+ result['file_count'] = len(archive_result.manifest) or None
+ result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None
+
+ if result['status'] == 'success':
+ result['hit'] = True
+ print("[SUCCESS {:>5}] file_count={} total_size={}".format(
+ ingest_type,
+ result['file_count'],
+ result['total_size'],
+ ), file=sys.stderr)
+ else:
+ print("[FAIL {:>5}] status={} file_count={} total_size={}".format(
+ ingest_type,
+ result['status'],
+ result['file_count'],
+ result['total_size'],
+ ), file=sys.stderr)
+ return result