diff options
author | Bryan Newbold <bnewbold@archive.org> | 2021-10-06 15:13:46 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2021-10-15 18:15:29 -0700 |
commit | 07e8a199766be77f4e89561d03e9b4e995ab7396 (patch) | |
tree | f4882b3fd32e0ed46e2359900a01e1413287d53e /python | |
parent | 206969ccebb5007b6c687edd6e09b5c4910e0152 (diff) | |
download | sandcrawler-07e8a199766be77f4e89561d03e9b4e995ab7396.tar.gz sandcrawler-07e8a199766be77f4e89561d03e9b4e995ab7396.zip |
fileset ingest progress for dataverse
Diffstat (limited to 'python')
-rw-r--r-- | python/sandcrawler/fileset_platforms.py | 153 | ||||
-rw-r--r-- | python/sandcrawler/fileset_strategies.py | 145 | ||||
-rw-r--r-- | python/sandcrawler/fileset_types.py | 2 | ||||
-rw-r--r-- | python/sandcrawler/ingest_fileset.py | 14 |
4 files changed, 291 insertions, 23 deletions
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index 5342a4e..58094c2 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -3,9 +3,11 @@ import sys import json import gzip import time +import urllib.parse from collections import namedtuple from typing import Optional, Tuple, Any, Dict, List +import requests import internetarchive from sandcrawler.html_metadata import BiblioMetadata @@ -30,20 +32,159 @@ class DatasetPlatformHelper(): """ raise NotImplementedError() - def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy: - raise NotImplementedError() + def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy: + assert item.manifest + total_size = sum([m.size for m in item.manifest]) + largest_size = max([m.size for m in item.manifest]) + print(f" total_size={total_size} largest_size={largest_size}", file=sys.stderr) + # XXX: while developing ArchiveorgFileset path + return IngestStrategy.ArchiveorgFileset + if len(item.manifest) == 1: + if total_size < 128*1024*1024: + return IngestStrategy.WebFile + else: + return IngestStrategy.ArchiveorgFile + else: + if largest_size < 128*1024*1024 and total_size < 1*1024*1024*1024: + return IngestStrategy.WebFileset + else: + return IngestStrategy.ArchiveorgFileset class DataverseHelper(DatasetPlatformHelper): def __init__(self): self.platform_name = 'dataverse' + self.session = requests.Session() + self.dataverse_domain_allowlist = [ + 'dataverse.harvard.edu', + 'data.lipi.go.id', + ] def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + """ + XXX: should match process_request() logic better + """ + + components = urllib.parse.urlparse(request['base_url']) + platform_domain = components.netloc.split(':')[0].lower() + params = urllib.parse.parse_qs(components.query) + platform_id = params.get('persistentId') + + if not platform_domain in self.dataverse_domain_allowlist: + return False + if not platform_id: + return False + + if html_biblio and 'dataverse' in html_biblio.publisher.lower(): + return True return False - def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy: - raise NotImplementedError() + def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem: + """ + Fetch platform-specific metadata for this request (eg, via API calls) + + + HTTP GET https://demo.dataverse.org/api/datasets/export?exporter=dataverse_json&persistentId=doi:10.5072/FK2/J8SJZB + + """ + # 1. extract domain, PID, and version from URL + components = urllib.parse.urlparse(request['base_url']) + platform_domain = components.netloc.split(':')[0].lower() + params = urllib.parse.parse_qs(components.query) + dataset_version = params.get('version') + platform_id = params.get('persistentId') + if not (platform_id and platform_id[0]): + raise ValueError("Expected a Dataverse persistentId in URL") + else: + platform_id = platform_id[0] + + if not platform_domain in self.dataverse_domain_allowlist: + raise ValueError(f"unexpected dataverse domain: {platform_domain}") + + # for both handle (hdl:) and DOI (doi:) identifiers, the norm is for + # dataverse persistetId is to be structured like: + # <prefix> / <shoulder> / <dataset-id> / <file-id> + if not (platform_id.startswith('doi:10.') or platform_id.startswith('hdl:')): + raise NotImplementedError(f"unsupported dataverse persistentId format: {platform_id}") + dataverse_type = None + if platform_id.count('/') == 2: + dataverse_type = 'dataset' + elif platform_id.count('/') == 3: + dataverse_type = 'file' + else: + raise NotImplementedError(f"unsupported dataverse persistentId format: {platform_id}") + + if dataverse_type != 'dataset': + # XXX + raise NotImplementedError(f"only entire dataverse datasets can be archived with this tool") + + # 1b. if we didn't get a version number from URL, fetch it from API + if not dataset_version: + obj = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}").json() + obj_latest = obj['data']['latestVersion'] + dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" + + # 2. API fetch + obj = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}").json() + + obj_latest= obj['data']['latestVersion'] + assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" + assert platform_id == obj_latest['datasetPersistentId'] + + manifest = [] + for row in obj_latest['files']: + df = row['dataFile'] + df_persistent_id = df['persistentId'] + platform_url = f"https://{platform_domain}/api/access/datafile/:persistentId/?persistentId={df_persistent_id}" + if df.get('originalFileName'): + platform_url += '&format=original' + manifest.append(FilesetManifestFile( + path=df.get('originalFileName') or df['filename'], + size=df.get('originalFileSize') or df['filesize'], + md5=df['md5'], + # NOTE: don't get: sha1, sha256 + mimetype=df['contentType'], + platform_url=platform_url, + extra=dict( + # file-level + description=df.get('description'), + version=df.get('version'), + ), + )) + + platform_sub_id = platform_id.split('/')[-1] + archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}" + archiveorg_item_meta = dict( + # XXX: collection=platform_domain, + collection="datasets", + date=obj_latest['releaseTime'].split('T')[0], + source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}", + ) + if platform_id.startswith('doi:10.'): + archiveorg_item_meta['doi'] = platform_id.replace('doi:', '') + for block in obj_latest['metadataBlocks']['citation']['fields']: + if block['typeName'] == 'title': + archiveorg_item_meta['title'] = block['value'] + elif block['typeName'] == 'depositor': + archiveorg_item_meta['creator'] = block['value'] + elif block['typeName'] == 'dsDescription': + archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue']['value'] + + archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '') + '\n<br>\n' + obj_latest['termsOfUse'] + + return DatasetPlatformItem( + platform_name=self.platform_name, + platform_status='success', + manifest=manifest, + platform_domain=platform_domain, + platform_id=platform_id, + archiveorg_item_name=archiveorg_item_name, + archiveorg_item_meta=archiveorg_item_meta, + web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original", + # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) + extra=dict(version=dataset_version), + ) class ArchiveOrgHelper(DatasetPlatformHelper): @@ -174,7 +315,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper): platform_domain='archive.org', platform_id=item_name, archiveorg_item_name=item_name, - archiveorg_collection=item_collection, + archiveorg_meta=dict(collection=item_collection), ) def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy: @@ -185,7 +326,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper): elif len(item.manifest) >= 1: return IngestStrategy.ArchiveorgFileset else: - raise NotImplementedError() + raise NotImplementedError("empty dataset") DATASET_PLATFORM_HELPER_TABLE = { diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index 26bc5ad..c335ea6 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -1,8 +1,10 @@ +import os import sys import json import gzip import time +import shutil from collections import namedtuple from typing import Optional, Tuple, Any, Dict, List @@ -11,6 +13,7 @@ import internetarchive from sandcrawler.html_metadata import BiblioMetadata from sandcrawler.ia import ResourceResult from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult +from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path class FilesetIngestStrategy(): @@ -28,27 +31,42 @@ class FilesetIngestStrategy(): class ArchiveorgFilesetStrategy(FilesetIngestStrategy): - def __init__(self): + def __init__(self, **kwargs): self.ingest_strategy = IngestStrategy.ArchiveorgFileset - self.session = internetarchive.get_session() + + # XXX: enable cleanup when confident (eg, safe path parsing) + self.skip_cleanup_local_files = kwargs.get('skip_cleanup_local_files', True) + self.working_dir = os.environ.get('SANDCRAWLER_WORKING_DIR', '/tmp/sandcrawler/') + try: + os.mkdir(self.working_dir) + except FileExistsError: + pass + + self.ia_session = internetarchive.get_session() def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]: """ use API to check for item with all the files in the manifest - TODO: this naive comparison is quadratic in number of files, aka O(N^2) - XXX: should this verify sha256 and/or mimetype? + NOTE: this naive comparison is quadratic in number of files, aka O(N^2) """ - ia_item = self.session.get_item(item.archiveorg_item_name) + ia_item = self.ia_session.get_item(item.archiveorg_item_name) + if not ia_item.exists: + return False item_files = ia_item.get_files(on_the_fly=False) for wanted in item.manifest: found = False for existing in item_files: - if existing.sha1 == wanted.sha1 and existing.name == wanted.path and existing.size == wanted.size: - found = True - break + if existing.name == wanted.path: + if ((existing.sha1 and existing.sha1 == wanted.sha1) or (existing.md5 and existing.md5 == wanted.md5)) and existing.name == wanted.path and existing.size == wanted.size: + found = True + wanted.status = 'exists' + break + else: + wanted.status = 'mismatch-existing' + break if not found: - print(f" didn't find at least one file: {wanted}", file=sys.stderr) + print(f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", file=sys.stderr) return None return ArchiveStrategyResult( ingest_strategy=self.ingest_strategy, @@ -57,11 +75,118 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): ) def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult: + """ + May require extra context to pass along to archive.org item creation. + """ existing = self.check_existing(item) if existing: return existing - raise NotImplementedError() + + local_dir = self.working_dir + item.archiveorg_item_name + assert local_dir.startswith('/') + assert local_dir.count('/') > 2 + try: + os.mkdir(local_dir) + except FileExistsError: + pass + + # 1. download all files locally + for m in item.manifest: + # XXX: enforce safe/sane filename + + local_path = local_dir + '/' + m.path + assert m.platform_url + + if not os.path.exists(local_path): + print(f" downloading {m.path}", file=sys.stderr) + with self.ia_session.get(m.platform_url, stream=True, allow_redirects=True) as r: + r.raise_for_status() + with open(local_path + '.partial', 'wb') as f: + for chunk in r.iter_content(chunk_size=256*1024): + f.write(chunk) + os.rename(local_path + '.partial', local_path) + m.status = 'downloaded-local' + else: + m.status = 'exists-local' + + print(f" verifying {m.path}", file=sys.stderr) + file_meta = gen_file_metadata_path(local_path, allow_empty=True) + assert file_meta['size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}" + + if m.sha1: + assert file_meta['sha1hex'] == m.sha1 + else: + m.sha1 = file_meta['sha1hex'] + + if m.sha256: + assert file_meta['sha256hex'] == m.sha256 + else: + m.sha256 = file_meta['sha256hex'] + + if m.md5: + assert file_meta['md5hex'] == m.md5 + else: + m.md5 = file_meta['md5hex'] + + if m.mimetype: + # 'magic' isn't good and parsing more detailed text file formats like text/csv + if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain': + # these 'tab-separated-values' from dataverse are just noise, don't log them + if m.mimetype != 'text/tab-separated-values': + print(f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", file=sys.stderr) + m.mimetype = file_meta['mimetype'] + else: + m.mimetype = file_meta['mimetype'] + m.status = 'verified-local' + + # 2. setup archive.org item metadata + assert item.archiveorg_item_meta['collection'] + # 3. upload all files + item_files = [] + for m in item.manifest: + local_path = local_dir + '/' + m.path + item_files.append({ + 'name': local_path, + 'remote_name': m.path, + }) + + print(f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", file=sys.stderr) + internetarchive.upload( + item.archiveorg_item_name, + files=item_files, + metadata=item.archiveorg_item_meta, + checksum=True, + queue_derive=False, + verify=True, + ) + + for m in item.manifest: + m.status = 'success' + + # 4. delete local directory + if not self.skip_cleanup_local_files: + shutil.rmtree(local_dir) + + result = ArchiveStrategyResult( + ingest_strategy=self.ingest_strategy, + status='success', + manifest=item.manifest, + ) + + return result + +class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy): + """ + ArchiveorgFilesetStrategy currently works fine with individual files. Just + need to over-ride the ingest_strategy name. + """ + + def __init__(self): + super().__init__() + self.ingest_strategy = IngestStrategy.ArchiveorgFileset + FILESET_STRATEGY_HELPER_TABLE = { IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(), + IngestStrategy.ArchiveorgFile: ArchiveorgFileStrategy(), } diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py index 75f4992..51000d7 100644 --- a/python/sandcrawler/fileset_types.py +++ b/python/sandcrawler/fileset_types.py @@ -34,7 +34,7 @@ class DatasetPlatformItem(BaseModel): platform_domain: Optional[str] platform_id: Optional[str] archiveorg_item_name: Optional[str] - archiveorg_collection: Optional[str] + archiveorg_item_meta: Optional[dict] web_base_url: Optional[str] web_bundle_url: Optional[str] diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index 50e3c05..3b55793 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -262,19 +262,21 @@ class IngestFilesetWorker(IngestFileWorker): if resource: terminal_url = resource.terminal_url dataset_meta = platform_helper.process_request(request, terminal_url, html_biblio) - print(dataset_meta, file=sys.stderr) + #print(dataset_meta, file=sys.stderr) platform = dataset_meta.platform_name result['platform'] = dataset_meta.platform_name result['platform_id'] = dataset_meta.platform_id result['item_name'] = dataset_meta.archiveorg_item_name - if not dataset_meta.manifest: + result['item_meta'] = dataset_meta.archiveorg_item_meta + + if dataset_meta.manifest: + result['manifest'] = [m.dict() for m in dataset_meta.manifest] + result['file_count'] = len(dataset_meta.manifest) + result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) + else: result['status'] = 'no-manifest' return result - result['manifest'] = dataset_meta.manifest or None - result['file_count'] = len(dataset_meta.manifest) or None - result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) or None - ingest_strategy = platform_helper.chose_strategy(dataset_meta) result['ingest_strategy'] = ingest_strategy |