Diffstat (limited to 'python/sandcrawler/ingest_fileset.py')
-rw-r--r--  python/sandcrawler/ingest_fileset.py  516
1 file changed, 516 insertions, 0 deletions
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
new file mode 100644
index 0000000..3acbece
--- /dev/null
+++ b/python/sandcrawler/ingest_fileset.py
@@ -0,0 +1,516 @@
+import json
+import sys
+import time
+from typing import Any, Dict, Optional
+
+import requests
+from selectolax.parser import HTMLParser
+
+from sandcrawler.fileset_platforms import (
+ ArchiveOrgHelper,
+ DataverseHelper,
+ FigshareHelper,
+ ZenodoHelper,
+)
+from sandcrawler.fileset_strategies import (
+ ArchiveorgFilesetStrategy,
+ ArchiveorgFileStrategy,
+ WebFilesetStrategy,
+ WebFileStrategy,
+)
+from sandcrawler.fileset_types import (
+ IngestStrategy,
+ PlatformRestrictedError,
+ PlatformScopeError,
+)
+from sandcrawler.html_metadata import html_extract_biblio
+from sandcrawler.ia import (
+ CdxApiError,
+ PetaboxError,
+ SavePageNowError,
+ WaybackContentError,
+ WaybackError,
+ cdx_to_dict,
+ fix_transfer_encoding,
+)
+from sandcrawler.ingest_file import IngestFileWorker
+from sandcrawler.misc import clean_url, gen_file_metadata
+from sandcrawler.workers import SandcrawlerWorker
+
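+# resource bodies larger than this are rejected as "body-too-large" rather than parsed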
+MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024
+
+
+class IngestFilesetWorker(IngestFileWorker):
+ """
+ General process is:
+
+ 1. crawl base_url, and use request and landing page resource (eg, HTML) to
+ determine platform being targeted
+ 2. use platform-specific helper to fetch metadata about the work, including
+ a manifest of files, and selection of an "ingest strategy" and any
+ required context
+ 3. then use strategy-specific helper to archive files from manifest (first
+ checking to see if content has been archived already)
+ 4. summarize status
+ """
+
+ def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs):
+ super().__init__(sink=None, **kwargs)
+
+ self.try_spn2 = kwargs.get("try_spn2", True)
+ self.sink = sink
+ self.dataset_platform_helpers = {
+ "dataverse": DataverseHelper(),
+ "figshare": FigshareHelper(),
+ "zenodo": ZenodoHelper(),
+ "archiveorg": ArchiveOrgHelper(),
+ }
+ self.dataset_strategy_archivers = {
+ IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(),
+ IngestStrategy.ArchiveorgFile: ArchiveorgFileStrategy(),
+ IngestStrategy.WebFileset: WebFilesetStrategy(try_spn2=self.try_spn2),
+ IngestStrategy.WebFile: WebFileStrategy(try_spn2=self.try_spn2),
+ }
+
+ self.max_total_size = kwargs.get("max_total_size", 64 * 1024 * 1024 * 1024)
+ self.max_file_count = kwargs.get("max_file_count", 200)
+ self.ingest_file_result_sink = kwargs.get("ingest_file_result_sink")
+ self.ingest_file_result_stdout = kwargs.get("ingest_file_result_stdout", False)
+
+ def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
+ """
+        Same as the file version, but uses the fileset result table.
+ """
+ if not self.try_existing_ingest:
+ return None
+ existing = self.pgrest_client.get_ingest_fileset_platform(ingest_type, base_url)
+ # TODO: filter on more flags?
+ if existing and existing["hit"] is True:
+ return existing
+ else:
+ return None
+
+ def process_existing(self, request: dict, result_row: dict) -> dict:
+ """
+ If we have an existing ingest fileset result, do any database fetches
+ or additional processing necessary to return a result.
+ """
+ raise NotImplementedError("process_existing() not tested or safe yet")
+
+ def want(self, request: dict) -> bool:
+        if request.get("ingest_type") not in ("dataset",):
+ return False
+ return True
+
+ def fetch_resource_iteratively(
+ self, ingest_type: str, base_url: str, force_recrawl: bool
+ ) -> dict:
+ """
+        This is copypasta from process_file(); it should probably be refactored.
+ """
+
+ result: Dict[str, Any] = dict(hit=False)
+ result["hops"] = [base_url]
+ next_url = base_url
+
+ # check against blocklist
+ for block in self.base_url_blocklist:
+ # NOTE: hack to not skip archive.org content
+ if "archive.org" in block:
+ continue
+ if block in next_url:
+ result["status"] = "skip-url-blocklist"
+ return result
+
+ try:
+ resource = self.find_resource(next_url, force_recrawl=force_recrawl)
+ except SavePageNowError as e:
+ result["status"] = "spn2-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except PetaboxError as e:
+ result["status"] = "petabox-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except CdxApiError as e:
+ result["status"] = "cdx-error"
+ result["error_message"] = str(e)[:1600]
+ # add a sleep in cdx-error path as a slow-down
+ time.sleep(2.0)
+ return result
+ except WaybackError as e:
+ result["status"] = "wayback-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except WaybackContentError as e:
+ result["status"] = "wayback-content-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except NotImplementedError as e:
+ result["status"] = "not-implemented"
+ result["error_message"] = str(e)[:1600]
+ return result
+
+ html_biblio = None
+ if resource:
+ if resource.terminal_url:
+ result["terminal"] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ }
+ if resource.terminal_url not in result["hops"]:
+ result["hops"].append(resource.terminal_url)
+
+ if not resource.hit:
+ result["status"] = resource.status
+ return result
+
+ if resource.terminal_url:
+ for pattern in self.base_url_blocklist:
+ if pattern in resource.terminal_url:
+ result["status"] = "skip-url-blocklist"
+ return result
+
+ if resource.terminal_url:
+ for pattern in self.cookie_blocklist:
+ if pattern in resource.terminal_url:
+ result["status"] = "blocked-cookie"
+ return result
+
+ if not resource.body:
+ result["status"] = "empty-blob"
+ return result
+
+ if len(resource.body) > MAX_BODY_SIZE_BYTES:
+ result["status"] = "body-too-large"
+ return result
+
+ file_meta = gen_file_metadata(resource.body)
+ try:
+ file_meta, resource = fix_transfer_encoding(file_meta, resource)
+ except Exception as e:
+ result["status"] = "bad-gzip-encoding"
+ result["error_message"] = str(e)
+ return result
+
+ if not resource.body or file_meta["size_bytes"] == 0:
+ result["status"] = "empty-blob"
+ return result
+
+        # detect HTML-ish content so we can try to extract bibliographic metadata
+ html_ish_resource = bool(
+ "html" in file_meta["mimetype"]
+ or "xhtml" in file_meta["mimetype"] # matches "application/xhtml+xml"
+ or "application/xml" in file_meta["mimetype"]
+ or "text/xml" in file_meta["mimetype"]
+ )
+ html_biblio = None
+ html_doc = None
+ if html_ish_resource and resource.body:
+ try:
+ html_doc = HTMLParser(resource.body)
+ html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
+ if html_biblio:
+ if "html_biblio" not in result and html_biblio.title:
+ result["html_biblio"] = json.loads(
+ html_biblio.json(exclude_none=True)
+ )
+ # print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
+ except ValueError:
+ pass
+
+ # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+ assert resource
+ assert resource.hit is True
+ assert resource.terminal_status_code in (200, 226)
+
+ if resource.terminal_url:
+ result["terminal"] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ "terminal_sha1hex": file_meta["sha1hex"],
+ }
+
+ result["file_meta"] = file_meta
+ result["cdx"] = cdx_to_dict(resource.cdx)
+ if resource.revisit_cdx:
+ result["revisit_cdx"] = cdx_to_dict(resource.revisit_cdx)
+
+ if ingest_type == "pdf":
+ if file_meta["mimetype"] != "application/pdf":
+ result["status"] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+ elif ingest_type == "xml":
+ if file_meta["mimetype"] not in (
+ "application/xml",
+ "text/xml",
+ "application/jats+xml",
+ ):
+ result["status"] = "wrong-mimetype"
+ return result
+ elif ingest_type == "html":
+ if file_meta["mimetype"] not in ("text/html", "application/xhtml+xml"):
+ result["status"] = "wrong-mimetype"
+ return result
+ else:
+ # eg, datasets, components, etc
+ pass
+
+ result["_html_biblio"] = html_biblio
+ result["_resource"] = resource
+ return result
+
+ def process(self, request: dict, key: Any = None) -> dict:
+
+ ingest_type = request.get("ingest_type")
+ if ingest_type not in ("dataset",):
+ raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
+
+ # parse/clean URL
+        # note that we pass through the original/raw URL, which is what gets
+        # persisted in the database table
+ base_url = clean_url(request["base_url"])
+
+ force_recrawl = bool(request.get("force_recrawl", False))
+
+ print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+
+ # TODO: "existing" check against file and/or fileset ingest result table
+ # existing = self.check_existing_ingest(ingest_type, base_url)
+ # if existing:
+ # return self.process_existing(request, existing)
+
+ result = self.fetch_resource_iteratively(
+ ingest_type, base_url, force_recrawl=force_recrawl
+ )
+ result["request"] = request
+ if result.get("status") is not None:
+ result["request"] = request
+ return result
+
+ html_biblio = result.pop("_html_biblio")
+ resource = result.pop("_resource")
+
+ # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
+
+ # TODO: could involve html_guess_platform() here?
+
+        # each helper inspects the request, terminal resource, and any HTML
+        # biblio to decide whether it recognizes the platform; first match wins
+ platform_helper = None
+ for (helper_name, helper) in self.dataset_platform_helpers.items():
+ if helper.match_request(request, resource, html_biblio):
+ platform_helper = helper
+ break
+
+ if not platform_helper:
+ result["status"] = "no-platform-match"
+ return result
+
+ # 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`.
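+        # on success, dataset_meta carries platform identifiers (name, domain,
+        # id), a web base URL, an optional archive.org item name, and the file
+        # manifest, all of which are copied into the result below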
+ try:
+ dataset_meta = platform_helper.process_request(request, resource, html_biblio)
+ except PlatformScopeError as e:
+ result["status"] = "platform-scope"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except PlatformRestrictedError as e:
+ result["status"] = "platform-restricted"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except NotImplementedError as e:
+ result["status"] = "not-implemented"
+ result["error_message"] = str(e)[:1600]
+ return result
+        except requests.exceptions.HTTPError as e:
+            result["error_message"] = str(e)[:1600]
+            if e.response.status_code == 404:
+                result["status"] = "platform-404"
+            else:
+                result["status"] = "platform-http-error"
+            return result
+ except requests.exceptions.RequestException as e:
+ result["error_message"] = str(e)[:1600]
+ result["status"] = "platform-error"
+ return result
+
+ # print(dataset_meta, file=sys.stderr)
+ platform = dataset_meta.platform_name
+ result["platform_name"] = dataset_meta.platform_name
+ result["platform_domain"] = dataset_meta.platform_domain
+ result["platform_id"] = dataset_meta.platform_id
+ result["platform_base_url"] = dataset_meta.web_base_url
+ result["archiveorg_item_name"] = dataset_meta.archiveorg_item_name
+
+ if not dataset_meta.manifest:
+ result["status"] = "empty-manifest"
+ return result
+
+ # these will get confirmed/updated after ingest
+ result["manifest"] = [m.dict(exclude_none=True) for m in dataset_meta.manifest]
+ result["file_count"] = len(dataset_meta.manifest)
+ result["total_size"] = sum([m.size for m in dataset_meta.manifest if m.size])
+
+ if result["total_size"] > self.max_total_size:
+ result["status"] = "too-large-size"
+ return result
+ if result["file_count"] > self.max_file_count:
+ # hard max, to prevent downstream breakage
+ if result["file_count"] > 10 * 1000:
+ result["manifest"] = result["manifest"][: self.max_file_count]
+ result["status"] = "too-many-files"
+ return result
+
+ ingest_strategy = platform_helper.chose_strategy(dataset_meta)
+ result["ingest_strategy"] = ingest_strategy
+ print(
+ f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}",
+ file=sys.stderr,
+ )
+
+ strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy)
+ if not strategy_helper:
+ result["status"] = "no-strategy-helper"
+ return result
+
+ # 3. Use strategy-specific methods to archive all files in platform manifest, and verify manifest metadata.
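+        # the returned archive_result carries a status, a confirmed/updated
+        # manifest, and strategy-specific fields: bundle_* for "-fileset-bundle"
+        # strategies, file_* for "-file" strategies (used in step 4 below)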
+ try:
+ archive_result = strategy_helper.process(dataset_meta)
+ except SavePageNowError as e:
+ result["status"] = "spn2-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except PetaboxError as e:
+ result["status"] = "petabox-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except CdxApiError as e:
+ result["status"] = "cdx-error"
+ result["error_message"] = str(e)[:1600]
+ # add a sleep in cdx-error path as a slow-down
+ time.sleep(2.0)
+ return result
+ except WaybackError as e:
+ result["status"] = "wayback-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+ except WaybackContentError as e:
+ result["status"] = "wayback-content-error"
+ result["error_message"] = str(e)[:1600]
+ return result
+
+ # 4. Summarize status and return structured result metadata.
+ result["status"] = archive_result.status
+ result["manifest"] = [m.dict(exclude_none=True) for m in archive_result.manifest]
+
+ if ingest_strategy.endswith("-fileset-bundle"):
+ result["fileset_bundle"] = dict()
+ if archive_result.bundle_file_meta:
+ result["fileset_bundle"]["file_meta"] = archive_result.bundle_file_meta
+ if archive_result.bundle_archiveorg_path:
+ result["fileset_bundle"][
+ "archiveorg_bundle_path"
+ ] = archive_result.bundle_archiveorg_path
+ if archive_result.bundle_resource:
+ result["fileset_bundle"]["terminal"] = dict(
+ terminal_url=archive_result.bundle_resource.terminal_url,
+ terminal_dt=archive_result.bundle_resource.terminal_dt,
+ terminal_status_code=archive_result.bundle_resource.terminal_status_code,
+ )
+ if archive_result.bundle_resource.cdx:
+ result["fileset_bundle"]["cdx"] = cdx_to_dict(
+ archive_result.bundle_resource.cdx
+ )
+ if archive_result.bundle_resource.revisit_cdx:
+ result["fileset_bundle"]["revisit_cdx"] = cdx_to_dict(
+ archive_result.bundle_resource.revisit_cdx
+ )
+
+ if ingest_strategy.endswith("-file"):
+ result["fileset_file"] = dict()
+ if archive_result.file_file_meta:
+ result["fileset_file"]["file_meta"] = (archive_result.file_file_meta,)
+ if archive_result.file_resource:
+ result["fileset_file"]["terminal"] = dict(
+ terminal_url=archive_result.file_resource.terminal_url,
+ terminal_dt=archive_result.file_resource.terminal_dt,
+ terminal_status_code=archive_result.file_resource.terminal_status_code,
+ )
+ if archive_result.file_resource.cdx:
+ result["fileset_file"]["cdx"] = cdx_to_dict(
+ archive_result.file_resource.cdx
+ )
+ if archive_result.file_resource.revisit_cdx:
+ result["fileset_file"]["revisit_cdx"] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx
+ )
+
+ if result["status"].startswith("success"):
+ # check that these are still valid
+ assert result["file_count"] == len(archive_result.manifest)
+ assert result["total_size"] == sum(
+ [m.size for m in archive_result.manifest if m.size]
+ )
+
+ if (
+ result["status"] == "success-file"
+ and archive_result.file_resource
+ and archive_result.file_file_meta
+ ):
+ file_result: Dict[str, Any] = dict(
+ hit=True,
+ status="success",
+ request=request.copy(),
+ file_meta=archive_result.file_file_meta,
+ terminal=dict(
+ terminal_url=archive_result.file_resource.terminal_url,
+ terminal_dt=archive_result.file_resource.terminal_dt,
+ terminal_status_code=archive_result.file_resource.terminal_status_code,
+ terminal_sha1hex=archive_result.file_file_meta["sha1hex"],
+ ),
+ )
+ if archive_result.file_resource.cdx:
+ file_result["cdx"] = cdx_to_dict(archive_result.file_resource.cdx)
+ if archive_result.file_resource.revisit_cdx:
+ file_result["revisit_cdx"] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx
+ )
+ file_result["request"]["ingest_type"] = request["ingest_type"] + "-file"
+            # call the process_file_hit() inherited from IngestFileWorker
+ info = self.process_file_hit(
+ file_result["request"]["ingest_type"],
+ archive_result.file_resource,
+ archive_result.file_file_meta,
+ )
+ file_result.update(info)
+            if self.ingest_file_result_sink:
+                self.ingest_file_result_sink.push_record(file_result.copy())
+ elif self.ingest_file_result_stdout:
+ sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n")
+
+ if result["status"].startswith("success"):
+ result["hit"] = True
+ print(
+ "[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format(
+ ingest_type,
+ result["file_count"],
+ result["total_size"],
+ ingest_strategy,
+ ),
+ file=sys.stderr,
+ )
+ else:
+ print(
+ "[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format(
+ ingest_type,
+ result["status"],
+ result["file_count"],
+ result["total_size"],
+ ingest_strategy,
+ ),
+ file=sys.stderr,
+ )
+ return result