diff options
Diffstat (limited to 'python/sandcrawler/fileset_strategies.py')
-rw-r--r-- | python/sandcrawler/fileset_strategies.py | 387 |
1 file changed, 387 insertions, 0 deletions
"""
Ingest strategies for fileset/dataset items: archive all files in a manifest
either into an archive.org item (ArchiveorgFilesetStrategy) or into web
archives via wayback/SPN2 (WebFilesetStrategy). Single-file variants subclass
the fileset strategies and override only the strategy name / success status.
"""

import os
import shutil
import sys
from typing import Optional

import internetarchive
import requests

from sandcrawler.fileset_types import (
    ArchiveStrategyResult,
    FilesetPlatformItem,
    IngestStrategy,
    PlatformScopeError,
)
from sandcrawler.ia import SavePageNowClient, WaybackClient, fix_transfer_encoding
from sandcrawler.misc import (
    gen_file_metadata,
    gen_file_metadata_path,
    requests_retry_session,
    sanitize_fs_path,
)


class FilesetIngestStrategy:
    """
    Abstract base for fileset archival strategies.

    Subclasses set `self.ingest_strategy` and implement `check_existing()`
    (idempotency check) and `process()` (do the actual archiving).
    """

    def __init__(self):
        # self.ingest_strategy = 'unknown'
        self.success_status = "success"

    def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
        raise NotImplementedError()

    def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
        raise NotImplementedError()


class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
    """
    Archive a fileset by downloading all files locally, verifying checksums,
    then uploading them into a (new or existing) archive.org item.
    """

    def __init__(self, **kwargs):
        super().__init__()
        self.ingest_strategy = IngestStrategy.ArchiveorgFileset

        # TODO: enable cleanup when confident (eg, safe path parsing)
        self.skip_cleanup_local_files = kwargs.get("skip_cleanup_local_files", True)
        self.working_dir = os.environ.get("SANDCRAWLER_WORKING_DIR", "/tmp/sandcrawler/")
        try:
            os.mkdir(self.working_dir)
        except FileExistsError:
            pass

        self.http_session = requests_retry_session()
        # S3-style credentials for archive.org uploads come from the environment
        self.ia_session = internetarchive.get_session(
            config={
                "s3": {
                    "access": os.environ.get("IA_ACCESS_KEY"),
                    "secret": os.environ.get("IA_SECRET_KEY"),
                },
            }
        )

    def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
        """
        use API to check for item with all the files in the manifest

        Returns a "success-existing" result if every manifest file is already
        present (by path, size, and sha1-or-md5); returns None if the item is
        missing, any file is absent, or any file mismatches.

        NOTE: this naive comparison is quadratic in number of files, aka O(N^2)
        """
        ia_item = self.ia_session.get_item(item.archiveorg_item_name)
        if not ia_item.exists:
            return None
        item_files = ia_item.get_files(on_the_fly=False)
        assert item.manifest
        for wanted in item.manifest:
            found = False
            for existing in item_files:
                if existing.name == wanted.path:
                    # same path: verify size plus at least one strong checksum
                    if (
                        (
                            (existing.sha1 and existing.sha1 == wanted.sha1)
                            or (existing.md5 and existing.md5 == wanted.md5)
                        )
                        and existing.name == wanted.path
                        and existing.size == wanted.size
                    ):
                        found = True
                        wanted.status = "exists"
                        break
                    else:
                        wanted.status = "mismatch-existing"
                        break
            if not found:
                print(
                    f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}",
                    file=sys.stderr,
                )
                return None
        return ArchiveStrategyResult(
            ingest_strategy=self.ingest_strategy,
            status="success-existing",
            manifest=item.manifest,
        )

    def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
        """
        May require extra context to pass along to archive.org item creation.

        Pipeline: short-circuit if the item already exists; download all
        manifest files into a local working dir; verify size/checksums;
        upload everything to archive.org; optionally clean up local files.
        """
        existing = self.check_existing(item)
        if existing:
            return existing

        if item.platform_name == "archiveorg":
            raise PlatformScopeError("shouldn't download archive.org into itself")

        local_dir = self.working_dir + item.archiveorg_item_name
        # guard against a mis-configured working dir producing unsafe paths
        assert local_dir.startswith("/")
        assert local_dir.count("/") > 2
        try:
            os.mkdir(local_dir)
        except FileExistsError:
            pass

        # 1. download all files locally
        assert item.manifest
        for m in item.manifest:
            if m.path != sanitize_fs_path(m.path):
                m.status = "unsafe-path"
                continue

            local_path = local_dir + "/" + m.path
            assert m.platform_url

            # create any sub-directories for this path, if necessary
            # (fix: previously this used os.mkdir(), duplicated in two places,
            # which only creates a single directory level and would fail for
            # nested sub-paths; makedirs with exist_ok handles both cases)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)

            if os.path.exists(local_path):
                m.status = "exists-local"
            else:
                print(f" downloading {m.path}", file=sys.stderr)
                try:
                    with self.http_session.get(
                        m.platform_url,
                        stream=True,
                        allow_redirects=True,
                        timeout=2 * 60 * 60,
                    ) as r:
                        r.raise_for_status()
                        # write to a ".partial" file then rename, so an
                        # interrupted download is never mistaken for complete
                        with open(local_path + ".partial", "wb") as f:
                            for chunk in r.iter_content(chunk_size=256 * 1024):
                                f.write(chunk)
                    os.rename(local_path + ".partial", local_path)
                    m.status = "downloaded-local"
                except requests.exceptions.RequestException:
                    m.status = "error-platform-download"
                    return ArchiveStrategyResult(
                        ingest_strategy=self.ingest_strategy,
                        manifest=item.manifest,
                        status="error-platform-download",
                    )

            print(f" verifying {m.path}", file=sys.stderr)
            file_meta = gen_file_metadata_path(local_path, allow_empty=True)
            if file_meta["size_bytes"] != m.size:
                print(f" expected: {m.size} found: {file_meta['size_bytes']}", file=sys.stderr)
                m.status = "mismatch-size"
                continue

            # for each hash: verify if the manifest declared it, otherwise
            # backfill the computed value into the manifest
            if m.sha1:
                if file_meta["sha1hex"] != m.sha1:
                    m.status = "mismatch-sha1"
                    continue
            else:
                m.sha1 = file_meta["sha1hex"]

            if m.sha256:
                if file_meta["sha256hex"] != m.sha256:
                    m.status = "mismatch-sha256"
                    continue
            else:
                m.sha256 = file_meta["sha256hex"]

            if m.md5:
                if file_meta["md5hex"] != m.md5:
                    m.status = "mismatch-md5"
                    continue
            else:
                m.md5 = file_meta["md5hex"]

            if m.mimetype:
                # 'magic' isn't good and parsing more detailed text file formats like text/csv
                if (
                    file_meta["mimetype"] != m.mimetype
                    and file_meta["mimetype"] != "text/plain"
                ):
                    # these 'tab-separated-values' from dataverse are just noise, don't log them
                    if m.mimetype != "text/tab-separated-values":
                        print(
                            f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}",
                            file=sys.stderr,
                        )
                    m.mimetype = file_meta["mimetype"]
            else:
                m.mimetype = file_meta["mimetype"]
            m.status = "verified-local"

        # if verification failed for any individual files, bail out
        for m in item.manifest:
            if m.status != "verified-local":
                return ArchiveStrategyResult(
                    ingest_strategy=self.ingest_strategy,
                    manifest=item.manifest,
                    status=m.status,
                )

        # 2. upload all files, with metadata
        assert item.archiveorg_item_meta and item.archiveorg_item_meta["collection"]
        item_files = {}
        for m in item.manifest:
            local_path = local_dir + "/" + m.path
            if m.path == "name":
                # 'name' is reserved by the internetarchive upload API
                raise NotImplementedError(
                    "fileset file path is 'name', which is a reserved keyword"
                )
            item_files[m.path] = local_path
        if len(item_files) != len(item.manifest):
            raise NotImplementedError("file/manifest length mismatch: duplicated file paths?")

        print(
            f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...",
            file=sys.stderr,
        )
        try:
            internetarchive.upload(
                item.archiveorg_item_name,
                files=item_files,
                metadata=item.archiveorg_item_meta,
                checksum=True,
                queue_derive=False,
                verify=True,
            )
        except requests.exceptions.RequestException:
            return ArchiveStrategyResult(
                ingest_strategy=self.ingest_strategy,
                manifest=item.manifest,
                status="error-archiveorg-upload",
            )

        for m in item.manifest:
            m.status = "success"

        # 3. delete local directory
        if not self.skip_cleanup_local_files:
            shutil.rmtree(local_dir)

        result = ArchiveStrategyResult(
            ingest_strategy=self.ingest_strategy,
            status=self.success_status,
            manifest=item.manifest,
        )

        return result


class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy):
    """
    ArchiveorgFilesetStrategy currently works fine with individual files. Just
    need to over-ride the ingest_strategy name.
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): despite the docstring, this assigns the same
        # ArchiveorgFileset value as the parent class; possibly
        # IngestStrategy.ArchiveorgFile was intended -- confirm upstream
        self.ingest_strategy = IngestStrategy.ArchiveorgFileset
        self.success_status = "success-file"


class WebFilesetStrategy(FilesetIngestStrategy):
    """
    Archive a fileset by ensuring each file is captured in web archives,
    looking up existing wayback captures and falling back to SPN2 requests.
    """

    def __init__(self, **kwargs):
        super().__init__()
        self.ingest_strategy = IngestStrategy.WebFileset
        self.wayback_client = WaybackClient()
        self.try_spn2 = kwargs.get("try_spn2", True)
        self.spn_client = SavePageNowClient(
            spn_cdx_retry_sec=kwargs.get("spn_cdx_retry_sec", 9.0)
        )
        # don't hammer SPN2 for very large filesets
        self.max_spn_manifest = 20

    def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
        """
        For each manifest item individually, run 'fetch_resource' and record stats, terminal_url, terminal_dt

        TODO:
        - full fetch_resource() method which can do SPN requests
        """

        assert item.manifest
        file_file_meta = None
        file_resource = None
        for m in item.manifest:
            fetch_url = m.platform_url
            if not fetch_url:
                raise NotImplementedError(
                    "require 'platform_url' for each file when doing Web fetching"
                )

            via = "wayback"
            resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype)

            if self.try_spn2 and (
                resource is None or (resource and resource.status == "no-capture")
            ):
                if len(item.manifest) > self.max_spn_manifest:
                    m.status = "too-much-spn"
                    continue
                via = "spn2"
                resource = self.spn_client.crawl_resource(
                    fetch_url, self.wayback_client, force_simple_get=True
                )

            print(
                "[FETCH {:>6}] {} {}".format(
                    via,
                    (resource and resource.status),
                    (resource and resource.terminal_url) or fetch_url,
                ),
                file=sys.stderr,
            )

            # (fix) wayback lookup can return None when SPN2 is disabled;
            # previously this raised AttributeError on the assignments below
            if not resource:
                m.status = "no-capture"
                continue

            m.terminal_url = resource.terminal_url
            m.terminal_dt = resource.terminal_dt
            m.status = resource.status
            if self.ingest_strategy == "web-file":
                file_resource = resource

            if resource.status != "success":
                continue
            else:
                assert resource.terminal_status_code == 200

            if not resource.body:
                m.status = "empty-blob"
                continue

            file_meta = gen_file_metadata(resource.body)
            try:
                file_meta, _html_resource = fix_transfer_encoding(file_meta, resource)
            except Exception:
                m.status = "transfer-encoding-error"
                continue

            if self.ingest_strategy == "web-file":
                file_file_meta = file_meta

            if (
                file_meta["size_bytes"] != m.size
                or (m.md5 and m.md5 != file_meta["md5hex"])
                or (m.sha1 and m.sha1 != file_meta["sha1hex"])
            ):
                m.status = "mismatch"
                continue

            # backfill any hashes/mimetype the manifest didn't declare
            m.md5 = m.md5 or file_meta["md5hex"]
            m.sha1 = m.sha1 or file_meta["sha1hex"]
            m.sha256 = m.sha256 or file_meta["sha256hex"]
            m.mimetype = m.mimetype or file_meta["mimetype"]

        # overall status is the first non-success file status, if any
        overall_status = self.success_status
        for m in item.manifest:
            if m.status != "success":
                overall_status = m.status or "not-processed"
                break
        if not item.manifest:
            overall_status = "empty-manifest"

        result = ArchiveStrategyResult(
            ingest_strategy=self.ingest_strategy,
            status=overall_status,
            manifest=item.manifest,
        )
        if self.ingest_strategy == "web-file":
            result.file_file_meta = file_file_meta
            result.file_resource = file_resource
        return result


class WebFileStrategy(WebFilesetStrategy):
    """
    WebFilesetStrategy applied to a single file; only the strategy name and
    success status differ.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.ingest_strategy = IngestStrategy.WebFile
        self.success_status = "success-file"