path: root/python/sandcrawler/fileset_strategies.py
diff options
Diffstat (limited to 'python/sandcrawler/fileset_strategies.py')
1 files changed, 387 insertions, 0 deletions
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
new file mode 100644
index 0000000..1d84ce5
--- /dev/null
+++ b/python/sandcrawler/fileset_strategies.py
@@ -0,0 +1,387 @@
+import os
+import shutil
+import sys
+from typing import Optional
+import internetarchive
+import requests
+from sandcrawler.fileset_types import (
+ ArchiveStrategyResult,
+ FilesetPlatformItem,
+ IngestStrategy,
+ PlatformScopeError,
+from sandcrawler.ia import SavePageNowClient, WaybackClient, fix_transfer_encoding
+from sandcrawler.misc import (
+ gen_file_metadata,
+ gen_file_metadata_path,
+ requests_retry_session,
+ sanitize_fs_path,
+class FilesetIngestStrategy:
+ def __init__(self):
+ # self.ingest_strategy = 'unknown'
+ self.success_status = "success"
+ def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
+ raise NotImplementedError()
+ def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
+ raise NotImplementedError()
+class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.ingest_strategy = IngestStrategy.ArchiveorgFileset
+ # TODO: enable cleanup when confident (eg, safe path parsing)
+ self.skip_cleanup_local_files = kwargs.get("skip_cleanup_local_files", True)
+ self.working_dir = os.environ.get("SANDCRAWLER_WORKING_DIR", "/tmp/sandcrawler/")
+ try:
+ os.mkdir(self.working_dir)
+ except FileExistsError:
+ pass
+ self.http_session = requests_retry_session()
+ self.ia_session = internetarchive.get_session(
+ config={
+ "s3": {
+ "access": os.environ.get("IA_ACCESS_KEY"),
+ "secret": os.environ.get("IA_SECRET_KEY"),
+ },
+ }
+ )
+ def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
+ """
+ use API to check for item with all the files in the manifest
+ NOTE: this naive comparison is quadratic in number of files, aka O(N^2)
+ """
+ ia_item = self.ia_session.get_item(item.archiveorg_item_name)
+ if not ia_item.exists:
+ return None
+ item_files = ia_item.get_files(on_the_fly=False)
+ assert item.manifest
+ for wanted in item.manifest:
+ found = False
+ for existing in item_files:
+ if existing.name == wanted.path:
+ if (
+ (
+ (existing.sha1 and existing.sha1 == wanted.sha1)
+ or (existing.md5 and existing.md5 == wanted.md5)
+ )
+ and existing.name == wanted.path
+ and existing.size == wanted.size
+ ):
+ found = True
+ wanted.status = "exists"
+ break
+ else:
+ wanted.status = "mismatch-existing"
+ break
+ if not found:
+ print(
+ f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}",
+ file=sys.stderr,
+ )
+ return None
+ return ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ status="success-existing",
+ manifest=item.manifest,
+ )
+ def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
+ """
+ May require extra context to pass along to archive.org item creation.
+ """
+ existing = self.check_existing(item)
+ if existing:
+ return existing
+ if item.platform_name == "archiveorg":
+ raise PlatformScopeError("shouldn't download archive.org into itself")
+ local_dir = self.working_dir + item.archiveorg_item_name
+ assert local_dir.startswith("/")
+ assert local_dir.count("/") > 2
+ try:
+ os.mkdir(local_dir)
+ except FileExistsError:
+ pass
+ # 1. download all files locally
+ assert item.manifest
+ for m in item.manifest:
+ if m.path != sanitize_fs_path(m.path):
+ m.status = "unsafe-path"
+ continue
+ local_path = local_dir + "/" + m.path
+ assert m.platform_url
+ if not os.path.exists(os.path.dirname(local_path)):
+ os.mkdir(os.path.dirname(local_path))
+ if os.path.exists(local_path):
+ m.status = "exists-local"
+ else:
+ print(f" downloading {m.path}", file=sys.stderr)
+ # create any sub-directories for this path, if necessary
+ if not os.path.exists(os.path.dirname(local_path)):
+ os.mkdir(os.path.dirname(local_path))
+ try:
+ with self.http_session.get(
+ m.platform_url,
+ stream=True,
+ allow_redirects=True,
+ timeout=2 * 60 * 60,
+ ) as r:
+ r.raise_for_status()
+ with open(local_path + ".partial", "wb") as f:
+ for chunk in r.iter_content(chunk_size=256 * 1024):
+ f.write(chunk)
+ os.rename(local_path + ".partial", local_path)
+ m.status = "downloaded-local"
+ except requests.exceptions.RequestException:
+ m.status = "error-platform-download"
+ return ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ manifest=item.manifest,
+ status="error-platform-download",
+ )
+ print(f" verifying {m.path}", file=sys.stderr)
+ file_meta = gen_file_metadata_path(local_path, allow_empty=True)
+ if file_meta["size_bytes"] != m.size:
+ print(f" expected: {m.size} found: {file_meta['size_bytes']}", file=sys.stderr)
+ m.status = "mismatch-size"
+ continue
+ if m.sha1:
+ if file_meta["sha1hex"] != m.sha1:
+ m.status = "mismatch-sha1"
+ continue
+ else:
+ m.sha1 = file_meta["sha1hex"]
+ if m.sha256:
+ if file_meta["sha256hex"] != m.sha256:
+ m.status = "mismatch-sha256"
+ continue
+ else:
+ m.sha256 = file_meta["sha256hex"]
+ if m.md5:
+ if file_meta["md5hex"] != m.md5:
+ m.status = "mismatch-md5"
+ continue
+ else:
+ m.md5 = file_meta["md5hex"]
+ if m.mimetype:
+ # 'magic' isn't good and parsing more detailed text file formats like text/csv
+ if (
+ file_meta["mimetype"] != m.mimetype
+ and file_meta["mimetype"] != "text/plain"
+ ):
+ # these 'tab-separated-values' from dataverse are just noise, don't log them
+ if m.mimetype != "text/tab-separated-values":
+ print(
+ f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}",
+ file=sys.stderr,
+ )
+ m.mimetype = file_meta["mimetype"]
+ else:
+ m.mimetype = file_meta["mimetype"]
+ m.status = "verified-local"
+ # if verification failed for any individual files, bail out
+ for m in item.manifest:
+ if m.status != "verified-local":
+ return ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ manifest=item.manifest,
+ status=m.status,
+ )
+ # 2. upload all files, with metadata
+ assert item.archiveorg_item_meta and item.archiveorg_item_meta["collection"]
+ item_files = {}
+ for m in item.manifest:
+ local_path = local_dir + "/" + m.path
+ if m.path == "name":
+ raise NotImplementedError(
+ "fileset file path is 'name', which is a reserved keyword"
+ )
+ item_files[m.path] = local_path
+ if len(item_files) != len(item.manifest):
+ raise NotImplementedError("file/manifest length mismatch: duplicated file paths?")
+ print(
+ f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...",
+ file=sys.stderr,
+ )
+ try:
+ internetarchive.upload(
+ item.archiveorg_item_name,
+ files=item_files,
+ metadata=item.archiveorg_item_meta,
+ checksum=True,
+ queue_derive=False,
+ verify=True,
+ )
+ except requests.exceptions.RequestException:
+ return ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ manifest=item.manifest,
+ status="error-archiveorg-upload",
+ )
+ for m in item.manifest:
+ m.status = "success"
+ # 4. delete local directory
+ if not self.skip_cleanup_local_files:
+ shutil.rmtree(local_dir)
+ result = ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ status=self.success_status,
+ manifest=item.manifest,
+ )
+ return result
+class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy):
+ """
+ ArchiveorgFilesetStrategy currently works fine with individual files. Just
+ need to over-ride the ingest_strategy name.
+ """
+ def __init__(self):
+ super().__init__()
+ self.ingest_strategy = IngestStrategy.ArchiveorgFileset
+ self.success_status = "success-file"
+class WebFilesetStrategy(FilesetIngestStrategy):
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.ingest_strategy = IngestStrategy.WebFileset
+ self.wayback_client = WaybackClient()
+ self.try_spn2 = kwargs.get("try_spn2", True)
+ self.spn_client = SavePageNowClient(
+ spn_cdx_retry_sec=kwargs.get("spn_cdx_retry_sec", 9.0)
+ )
+ self.max_spn_manifest = 20
+ def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
+ """
+ For each manifest item individually, run 'fetch_resource' and record stats, terminal_url, terminal_dt
+ - full fetch_resource() method which can do SPN requests
+ """
+ assert item.manifest
+ file_file_meta = None
+ file_resource = None
+ for m in item.manifest:
+ fetch_url = m.platform_url
+ if not fetch_url:
+ raise NotImplementedError(
+ "require 'platform_url' for each file when doing Web fetching"
+ )
+ via = "wayback"
+ resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype)
+ if self.try_spn2 and (
+ resource is None or (resource and resource.status == "no-capture")
+ ):
+ if len(item.manifest) > self.max_spn_manifest:
+ m.status = "too-much-spn"
+ continue
+ via = "spn2"
+ resource = self.spn_client.crawl_resource(
+ fetch_url, self.wayback_client, force_simple_get=True
+ )
+ print(
+ "[FETCH {:>6}] {} {}".format(
+ via,
+ (resource and resource.status),
+ (resource and resource.terminal_url) or fetch_url,
+ ),
+ file=sys.stderr,
+ )
+ m.terminal_url = resource.terminal_url
+ m.terminal_dt = resource.terminal_dt
+ m.status = resource.status
+ if self.ingest_strategy == "web-file":
+ file_resource = resource
+ if resource.status != "success":
+ continue
+ else:
+ assert resource.terminal_status_code == 200
+ if not resource.body:
+ m.status = "empty-blob"
+ continue
+ file_meta = gen_file_metadata(resource.body)
+ try:
+ file_meta, _html_resource = fix_transfer_encoding(file_meta, resource)
+ except Exception:
+ m.status = "transfer-encoding-error"
+ continue
+ if self.ingest_strategy == "web-file":
+ file_file_meta = file_meta
+ if (
+ file_meta["size_bytes"] != m.size
+ or (m.md5 and m.md5 != file_meta["md5hex"])
+ or (m.sha1 and m.sha1 != file_meta["sha1hex"])
+ ):
+ m.status = "mismatch"
+ continue
+ m.md5 = m.md5 or file_meta["md5hex"]
+ m.sha1 = m.sha1 or file_meta["sha1hex"]
+ m.sha256 = m.sha256 or file_meta["sha256hex"]
+ m.mimetype = m.mimetype or file_meta["mimetype"]
+ overall_status = self.success_status
+ for m in item.manifest:
+ if m.status != "success":
+ overall_status = m.status or "not-processed"
+ break
+ if not item.manifest:
+ overall_status = "empty-manifest"
+ result = ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ status=overall_status,
+ manifest=item.manifest,
+ )
+ if self.ingest_strategy == "web-file":
+ result.file_file_meta = file_file_meta
+ result.file_resource = file_resource
+ return result
+class WebFileStrategy(WebFilesetStrategy):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.ingest_strategy = IngestStrategy.WebFile
+ self.success_status = "success-file"