Diffstat (limited to 'python')
 python/sandcrawler/fileset_platforms.py  |  69
 python/sandcrawler/fileset_strategies.py |  23
 python/sandcrawler/fileset_types.py      |  15
 python/sandcrawler/ingest_fileset.py     | 123
 4 files changed, 148 insertions(+), 82 deletions(-)
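For orientation before the raw diff: the core of this commit is a rename of the platform helper and item classes from `Dataset*` to `Fileset*`, plus several new optional fields on `ArchiveStrategyResult`. Below is a minimal sketch of the renamed Pydantic models as they read after this commit; field names are taken from the `fileset_types.py` hunks further down, fields outside the visible hunk context (e.g. the leading `FilesetManifestFile` fields such as path and size) are omitted, and exact ordering around the unchanged context lines is a best-effort reading of the hunks rather than a verbatim copy of the file.

```python
from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class FilesetManifestFile(BaseModel):
    # only the fields visible in the hunk; earlier fields (path, size, md5, ...) omitted here
    sha1: Optional[str]
    sha256: Optional[str]
    mimetype: Optional[str]
    extra: Optional[Dict[str, Any]]  # moved up next to the other per-file metadata fields

    status: Optional[str]
    platform_url: Optional[str]
    terminal_url: Optional[str]
    terminal_dt: Optional[str]


class FilesetPlatformItem(BaseModel):  # renamed from DatasetPlatformItem
    platform_name: str
    platform_status: str
    platform_domain: Optional[str]
    platform_id: Optional[str]
    manifest: Optional[List[FilesetManifestFile]]

    archiveorg_item_name: Optional[str]
    archiveorg_item_meta: Optional[dict]
    web_base_url: Optional[str]


class ArchiveStrategyResult(BaseModel):
    ingest_strategy: str
    status: str
    manifest: List[FilesetManifestFile]
    # per-strategy result details added in this commit
    file_file_meta: Optional[Dict[str, Any]]
    file_terminal: Optional[Dict[str, Any]]
    file_cdx: Optional[Dict[str, Any]]
    bundle_file_meta: Optional[Dict[str, Any]]
    bundle_terminal: Optional[Any]
    bundle_cdx: Optional[Any]
    bundle_archiveorg_path: Optional[str]
```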
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index 5f2f743..bcf2144 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -15,7 +15,7 @@ from sandcrawler.ia import ResourceResult from sandcrawler.fileset_types import * -class DatasetPlatformHelper(): +class FilesetPlatformHelper(): def __init__(self): self.platform_name = 'unknown' @@ -26,16 +26,16 @@ class DatasetPlatformHelper(): """ raise NotImplementedError() - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ raise NotImplementedError() - def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy: + def chose_strategy(self, item: FilesetPlatformItem) -> IngestStrategy: assert item.manifest - total_size = sum([m.size for m in item.manifest]) - largest_size = max([m.size for m in item.manifest]) + total_size = sum([m.size for m in item.manifest]) or 0 + largest_size = max([m.size or 0 for m in item.manifest]) or 0 #print(f" total_size={total_size} largest_size={largest_size}", file=sys.stderr) # XXX: while developing ArchiveorgFileset path #return IngestStrategy.ArchiveorgFileset @@ -51,7 +51,7 @@ class DatasetPlatformHelper(): return IngestStrategy.ArchiveorgFileset -class DataverseHelper(DatasetPlatformHelper): +class DataverseHelper(FilesetPlatformHelper): def __init__(self): self.platform_name = 'dataverse' @@ -133,10 +133,10 @@ class DataverseHelper(DatasetPlatformHelper): components = urllib.parse.urlparse(url) platform_domain = components.netloc.split(':')[0].lower() params = urllib.parse.parse_qs(components.query) - platform_id = params.get('persistentId') - if not platform_id: + id_param = params.get('persistentId') + if not id_param: return False - platform_id = platform_id[0] + platform_id = id_param[0] try: parsed = self.parse_dataverse_persistentid(platform_id) @@ -145,7 +145,7 @@ class DataverseHelper(DatasetPlatformHelper): return True - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) @@ -162,14 +162,14 @@ class DataverseHelper(DatasetPlatformHelper): components = urllib.parse.urlparse(url) platform_domain = components.netloc.split(':')[0].lower() params = urllib.parse.parse_qs(components.query) - dataset_version = params.get('version') - platform_id = params.get('persistentId') - if not (platform_id and platform_id[0]): + id_param = params.get('persistentId') + if not (id_param and id_param[0]): raise PlatformScopeError("Expected a Dataverse persistentId in URL") - else: - platform_id = platform_id[0] - if type(dataset_version) == list: - dataset_version = dataset_version[0] + platform_id = id_param[0] + version_param = params.get('version') + dataset_version = None + if version_param: + dataset_version = version_param[0] try: parsed_id = self.parse_dataverse_persistentid(platform_id) @@ -243,7 +243,7 @@ class DataverseHelper(DatasetPlatformHelper): if obj_latest.get('termsOfUse'): 
archiveorg_item_meta['description'] += '\n<br>\n' + obj_latest['termsOfUse'] - return DatasetPlatformItem( + return FilesetPlatformItem( platform_name=self.platform_name, platform_status='success', manifest=manifest, @@ -321,18 +321,18 @@ def test_parse_dataverse_persistentid(): except ValueError: pass -class FigshareHelper(DatasetPlatformHelper): +class FigshareHelper(FilesetPlatformHelper): def __init__(self): self.platform_name = 'figshare' self.session = requests.Session() @staticmethod - def parse_figshare_url_path(path: str) -> List[str]: + def parse_figshare_url_path(path: str) -> Tuple[str, Optional[str]]: """ Tries to parse a figshare URL into ID number and (optional) version number. - Returns a two-element list; version number will be None if not found + Returns a two-element tuple; version number will be None if not found Raises a ValueError if not a figshare URL """ @@ -340,14 +340,14 @@ class FigshareHelper(DatasetPlatformHelper): comp = path.split('/') if len(comp) < 4 or comp[1] != 'articles': - raise ValueError + raise ValueError(f"not a figshare URL: {path}") if len(comp) == 5 and comp[3].isdigit() and comp[4].isdigit(): return (comp[3], comp[4]) elif len(comp) == 4 and comp[3].isdigit(): return (comp[3], None) else: - raise ValueError + raise ValueError(f"couldn't find figshare identiier: {path}") def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: @@ -374,7 +374,7 @@ class FigshareHelper(DatasetPlatformHelper): return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -390,7 +390,7 @@ class FigshareHelper(DatasetPlatformHelper): (platform_id, dataset_version) = self.parse_figshare_url_path(components.path) assert platform_id.isdigit(), f"expected numeric: {platform_id}" - assert dataset_version.isdigit(), f"expected numeric: {dataset_version}" + assert dataset_version and dataset_version.isdigit(), f"expected numeric: {dataset_version}" # 1b. 
if we didn't get a version number from URL, fetch it from API # TODO: implement this code path @@ -436,7 +436,7 @@ class FigshareHelper(DatasetPlatformHelper): version=obj['version'], ) - return DatasetPlatformItem( + return FilesetPlatformItem( platform_name=self.platform_name, platform_status='success', manifest=manifest, @@ -471,7 +471,7 @@ def test_parse_figshare_url_path(): except ValueError: pass -class ZenodoHelper(DatasetPlatformHelper): +class ZenodoHelper(FilesetPlatformHelper): def __init__(self): self.platform_name = 'zenodo' @@ -490,7 +490,7 @@ class ZenodoHelper(DatasetPlatformHelper): return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -567,7 +567,7 @@ class ZenodoHelper(DatasetPlatformHelper): # obj['metadata']['version'] is, eg, git version tag ) - return DatasetPlatformItem( + return FilesetPlatformItem( platform_name=self.platform_name, platform_status='success', manifest=manifest, @@ -581,7 +581,7 @@ class ZenodoHelper(DatasetPlatformHelper): ) -class ArchiveOrgHelper(DatasetPlatformHelper): +class ArchiveOrgHelper(FilesetPlatformHelper): FORMAT_TO_MIMETYPE = { 'BZIP': 'application/x-bzip', @@ -623,7 +623,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper): self.session = internetarchive.get_session() @staticmethod - def want_item_file(f: dict, item_name: str) -> bool: + def want_item_file(f: internetarchive.File, item_name: str) -> bool: """ Filters IA API files """ @@ -662,7 +662,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper): return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -700,7 +700,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper): ) manifest.append(mf) - return DatasetPlatformItem( + return FilesetPlatformItem( platform_name=self.platform_name, platform_status='success', manifest=manifest, @@ -710,10 +710,11 @@ class ArchiveOrgHelper(DatasetPlatformHelper): archiveorg_meta=dict(collection=item_collection), ) - def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy: + def chose_strategy(self, item: FilesetPlatformItem) -> IngestStrategy: """ Don't use default strategy picker; we are always doing an 'existing' in this case. 
""" + assert item.manifest is not None if len(item.manifest) == 1: # NOTE: code flow does not support ArchiveorgFilesetBundle for the # case of, eg, a single zipfile in an archive.org item diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index 43f1a53..2577d2b 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -12,7 +12,7 @@ import internetarchive from sandcrawler.html_metadata import BiblioMetadata from sandcrawler.ia import ResourceResult, WaybackClient, SavePageNowClient, fix_transfer_encoding -from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult, PlatformScopeError +from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, FilesetPlatformItem, ArchiveStrategyResult, PlatformScopeError from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path @@ -22,10 +22,10 @@ class FilesetIngestStrategy(): #self.ingest_strategy = 'unknown' pass - def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]: + def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]: raise NotImplementedError() - def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult: + def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult: raise NotImplementedError() @@ -44,7 +44,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): self.ia_session = internetarchive.get_session() - def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]: + def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]: """ use API to check for item with all the files in the manifest @@ -52,8 +52,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): """ ia_item = self.ia_session.get_item(item.archiveorg_item_name) if not ia_item.exists: - return False + return None item_files = ia_item.get_files(on_the_fly=False) + assert item.manifest for wanted in item.manifest: found = False for existing in item_files: @@ -74,7 +75,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): manifest=item.manifest, ) - def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult: + def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult: """ May require extra context to pass along to archive.org item creation. """ @@ -94,6 +95,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): pass # 1. download all files locally + assert item.manifest for m in item.manifest: # XXX: enforce safe/sane filename @@ -143,7 +145,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): m.status = 'verified-local' # 2. 
upload all files, with metadata - assert item.archiveorg_item_meta['collection'] + assert item.archiveorg_item_meta and item.archiveorg_item_meta['collection'] item_files = [] for m in item.manifest: local_path = local_dir + '/' + m.path @@ -212,7 +214,7 @@ class WebFilesetStrategy(FilesetIngestStrategy): "://s3-eu-west-1.amazonaws.com/", ] - def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult: + def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult: """ For each manifest item individually, run 'fetch_resource' and record stats, terminal_url, terminal_dt @@ -220,6 +222,7 @@ class WebFilesetStrategy(FilesetIngestStrategy): - full fetch_resource() method which can do SPN requests """ + assert item.manifest for m in item.manifest: fetch_url = m.platform_url if not fetch_url: @@ -241,7 +244,7 @@ class WebFilesetStrategy(FilesetIngestStrategy): print("[FETCH {:>6}] {} {}".format( via, (resource and resource.status), - (resource and resource.terminal_url) or url), + (resource and resource.terminal_url) or fetch_url), file=sys.stderr) m.terminal_url = resource.terminal_url @@ -268,7 +271,7 @@ class WebFilesetStrategy(FilesetIngestStrategy): overall_status = "success" for m in item.manifest: if m.status != 'success': - overall_status = m.status + overall_status = m.status or 'not-processed' break if not item.manifest: overall_status = 'empty-manifest' diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py index 9fe8b0d..037843e 100644 --- a/python/sandcrawler/fileset_types.py +++ b/python/sandcrawler/fileset_types.py @@ -19,20 +19,20 @@ class FilesetManifestFile(BaseModel): sha1: Optional[str] sha256: Optional[str] mimetype: Optional[str] + extra: Optional[Dict[str, Any]] status: Optional[str] platform_url: Optional[str] terminal_url: Optional[str] terminal_dt: Optional[str] - extra: Optional[Dict[str, Any]] -class DatasetPlatformItem(BaseModel): +class FilesetPlatformItem(BaseModel): platform_name: str platform_status: str - manifest: Optional[List[FilesetManifestFile]] - platform_domain: Optional[str] platform_id: Optional[str] + manifest: Optional[List[FilesetManifestFile]] + archiveorg_item_name: Optional[str] archiveorg_item_meta: Optional[dict] web_base_url: Optional[str] @@ -42,6 +42,13 @@ class ArchiveStrategyResult(BaseModel): ingest_strategy: str status: str manifest: List[FilesetManifestFile] + file_file_meta: Optional[Dict[str, Any]] + file_terminal: Optional[Dict[str, Any]] + file_cdx: Optional[Dict[str, Any]] + bundle_file_meta: Optional[Dict[str, Any]] + bundle_terminal: Optional[Any] + bundle_cdx: Optional[Any] + bundle_archiveorg_path: Optional[str] class PlatformScopeError(Exception): """ diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index ce6cdca..de392b2 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -20,7 +20,7 @@ from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, ht from sandcrawler.workers import SandcrawlerWorker from sandcrawler.db import SandcrawlerPostgrestClient from sandcrawler.ingest_file import IngestFileWorker -from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE +from sandcrawler.fileset_platforms import FilesetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE from sandcrawler.fileset_types import PlatformScopeError, PlatformRestrictedError @@ -47,8 
+47,8 @@ class IngestFilesetWorker(IngestFileWorker): self.sink = sink self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE - self.max_total_size = 100*1024*1024*1024 - self.max_file_count = 500 + self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024) + self.max_file_count = kwargs.get('max_file_count', 200) def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: @@ -79,34 +79,15 @@ class IngestFilesetWorker(IngestFileWorker): return False return True - def process(self, request: dict, key: Any = None) -> dict: - - ingest_type = request.get('ingest_type') - if ingest_type not in ("dataset",): - raise NotImplementedError(f"can't handle ingest_type={ingest_type}") - - # parse/clean URL - # note that we pass through the original/raw URL, and that is what gets - # persisted in database table - base_url = clean_url(request['base_url']) - - force_recrawl = bool(request.get('force_recrawl', False)) - - print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) - - # TODO: "existing" check for new fileset ingest result table - #existing = self.check_existing_ingest(ingest_type, base_url) - #if existing: - # return self.process_existing(request, existing) + def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict: + """ + This is copypasta from process_file(), should probably refactor. + """ - result: Dict[str, Any] = dict(request=request, hit=False) + result: Dict[str, Any] = dict(hit=False) result['hops'] = [base_url] next_url = base_url - # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page. - - ### START COPYPASTA from process_file(), should refactor ### - # check against blocklist for block in self.base_url_blocklist: # XXX: hack to not skip archive.org content @@ -247,9 +228,42 @@ class IngestFilesetWorker(IngestFileWorker): #raise NotImplementedError() pass - ### END COPYPASTA ### + result['_html_biblio'] = html_biblio + result['_resource'] = resource + return result + + + def process(self, request: dict, key: Any = None) -> dict: + + ingest_type = request.get('ingest_type') + if ingest_type not in ("dataset",): + raise NotImplementedError(f"can't handle ingest_type={ingest_type}") + + # parse/clean URL + # note that we pass through the original/raw URL, and that is what gets + # persisted in database table + base_url = clean_url(request['base_url']) + + force_recrawl = bool(request.get('force_recrawl', False)) + + print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + + # TODO: "existing" check against file and/or fileset ingest result table + #existing = self.check_existing_ingest(ingest_type, base_url) + #if existing: + # return self.process_existing(request, existing) + + result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl) + if result.get('status') != None: + result['request'] = request + return result + + html_biblio = result.pop('_html_biblio') + resource = result.pop('_resource') + + # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page. - # XXX: html_guess_platform() + # TODO: could involve html_guess_platform() here? 
# determine platform platform_helper = None @@ -291,10 +305,10 @@ class IngestFilesetWorker(IngestFileWorker): #print(dataset_meta, file=sys.stderr) platform = dataset_meta.platform_name - result['platform'] = dataset_meta.platform_name + result['platform_name'] = dataset_meta.platform_name + result['platform_domain'] = dataset_meta.platform_domain result['platform_id'] = dataset_meta.platform_id - result['item_name'] = dataset_meta.archiveorg_item_name - result['item_meta'] = dataset_meta.archiveorg_item_meta + result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name if not dataset_meta.manifest: result['status'] = 'empty-manifest' @@ -309,6 +323,9 @@ class IngestFilesetWorker(IngestFileWorker): result['status'] = 'too-large-size' return result if result['file_count'] > self.max_file_count: + # hard max, to prevent downstream breakage + if result['file_count'] > 10*1000: + result['manifest'] = result['manifest'][:self.max_file_count] result['status'] = 'too-many-files' return result @@ -327,8 +344,46 @@ class IngestFilesetWorker(IngestFileWorker): # 4. Summarize status and return structured result metadata. result['status'] = archive_result.status result['manifest'] = [m.dict() for m in archive_result.manifest] - result['file_count'] = len(archive_result.manifest) or None - result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None + + if ingest_strategy.endswith('-fileset-bundle'): + result['fileset_bundle'] = dict( + file_meta=archive_result.bundle_file_meta, + archiveorg_bundle_path=archive_result.archiveorg_bundle_path, + ) + if archive_result.bundle_terminal: + result['fileset_bundle']['terminal'] = dict( + terminal_url=archive_result.bundle_terminal.terminal_url, + terminal_dt=archive_result.bundle_terminal.terminal_dt, + terminal_status_code=archive_result.bundle_terminal.terminal_status_code, + ) + if archive_result.bundle_cdx: + result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx) + if archive_result.bundle_cdx.revisit_cdx: + result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx) + if ingest_strategy.endswith('-file'): + result['fileset_file'] = dict( + file_meta=archive_result.file_file_meta, + ) + if archive_result.file_terminal: + result['fileset_file']['terminal'] = dict( + terminal_url=archive_result.file_terminal.terminal_url, + terminal_dt=archive_result.file_terminal.terminal_dt, + terminal_status_code=archive_result.file_terminal.terminal_status_code, + ) + if archive_result.file_cdx: + result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx) + if archive_result.file_cdx.revisit_cdx: + result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx) + + if result['status'].startswith('success'): + # check that these are still valid + assert result['file_count'] == len(archive_result.manifest) + assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size]) + + + # XXX: here is where we would publish to ingest file result topic... or call process_hit()? + if result['status'] == 'success-file': + pass if result['status'].startswith('success'): result['hit'] = True |
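Finally, a hedged usage sketch of the new worker limits: `IngestFilesetWorker` previously hard-coded a 100 GiB / 500-file ceiling, and after this change both limits default to 64 GiB / 200 files and can be overridden via constructor kwargs. The constructor defaults other than the two new limits, and the example DOI landing-page URL, are assumptions based on the hunks above rather than confirmed API.

```python
from sandcrawler.ingest_fileset import IngestFilesetWorker

# Override the new defaults (64 GiB total size, 200 files) for a more conservative crawl.
# Assumes the worker can be constructed without an explicit sink, as in the diff above.
worker = IngestFilesetWorker(
    max_total_size=16 * 1024 * 1024 * 1024,  # 16 GiB
    max_file_count=100,
)

# Only ingest_type="dataset" is handled; anything else raises NotImplementedError.
request = {
    "ingest_type": "dataset",
    "base_url": "https://doi.org/10.5061/dryad.example",  # hypothetical dataset landing page
    "force_recrawl": False,
}
result = worker.process(request)
print(result["status"], result.get("platform_name"), result.get("file_count"))
```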