From a09396caefe709b521e560add5b01c1a5c94cb53 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 15 Oct 2021 17:12:40 -0700 Subject: more fileset iteration --- python/ingest_tool.py | 1 + python/sandcrawler/fileset_platforms.py | 4 +- python/sandcrawler/fileset_strategies.py | 26 +++++++--- python/sandcrawler/fileset_types.py | 6 +-- python/sandcrawler/ingest_fileset.py | 89 ++++++++++++++++++++------------ 5 files changed, 81 insertions(+), 45 deletions(-) (limited to 'python') diff --git a/python/ingest_tool.py b/python/ingest_tool.py index 45e1812..fdb5b48 100755 --- a/python/ingest_tool.py +++ b/python/ingest_tool.py @@ -21,6 +21,7 @@ def run_single_ingest(args): if request['ingest_type'] in ['dataset',]: ingester = IngestFilesetWorker( try_spn2=not args.no_spn2, + ingest_file_result_stdout=True, ) else: ingester = IngestFileWorker( diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index bcf2144..cc07948 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -674,8 +674,8 @@ class ArchiveOrgHelper(FilesetPlatformHelper): assert base_url_split[2] == 'archive.org' assert base_url_split[3] in ['details', 'download'] item_name = base_url_split[4] - if len(base_url_split) == 6: - assert not base_url_split[5] + if len(base_url_split) == 6 and base_url_split[5]: + raise PlatformScopeError("got an archive.org file path, not download/details page; individual files not handled yet") #print(f" archiveorg processing item={item_name}", file=sys.stderr) item = self.session.get_item(item_name) diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index d1193ee..f2f2fcc 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -13,14 +13,14 @@ import internetarchive from sandcrawler.html_metadata import BiblioMetadata from sandcrawler.ia import ResourceResult, WaybackClient, SavePageNowClient, fix_transfer_encoding from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, FilesetPlatformItem, ArchiveStrategyResult, PlatformScopeError -from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path +from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path class FilesetIngestStrategy(): def __init__(self): #self.ingest_strategy = 'unknown' - pass + self.success_status = "success" def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]: raise NotImplementedError() @@ -34,7 +34,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): def __init__(self, **kwargs): self.ingest_strategy = IngestStrategy.ArchiveorgFileset - # XXX: enable cleanup when confident (eg, safe path parsing) + # TODO: enable cleanup when confident (eg, safe path parsing) self.skip_cleanup_local_files = kwargs.get('skip_cleanup_local_files', True) self.working_dir = os.environ.get('SANDCRAWLER_WORKING_DIR', '/tmp/sandcrawler/') try: @@ -97,7 +97,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): # 1. download all files locally assert item.manifest for m in item.manifest: - # XXX: enforce safe/sane filename + if m.path != sanitize_fs_path(m.path): + m.status = 'unsafe-path' + continue local_path = local_dir + '/' + m.path assert m.platform_url @@ -173,7 +175,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): result = ArchiveStrategyResult( ingest_strategy=self.ingest_strategy, - status='success', + status=self.success_status, manifest=item.manifest, ) @@ -188,6 +190,7 @@ class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy): def __init__(self): super().__init__() self.ingest_strategy = IngestStrategy.ArchiveorgFileset + self.success_status = "success-file" class WebFilesetStrategy(FilesetIngestStrategy): @@ -206,6 +209,8 @@ class WebFilesetStrategy(FilesetIngestStrategy): """ assert item.manifest + file_file_meta = None + file_resource = None for m in item.manifest: fetch_url = m.platform_url if not fetch_url: @@ -228,6 +233,8 @@ class WebFilesetStrategy(FilesetIngestStrategy): m.terminal_url = resource.terminal_url m.terminal_dt = resource.terminal_dt m.status = resource.status + if self.ingest_strategy == "web-file": + file_resource = resource if resource.status != 'success': continue @@ -237,6 +244,9 @@ class WebFilesetStrategy(FilesetIngestStrategy): file_meta = gen_file_metadata(resource.body) file_meta, html_resource = fix_transfer_encoding(file_meta, resource) + if self.ingest_strategy == "web-file": + file_file_meta = file_meta + if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']) or (m.sha1 and m.sha1 != file_meta['sha1hex']): m.status = 'mismatch' continue @@ -246,7 +256,7 @@ class WebFilesetStrategy(FilesetIngestStrategy): m.sha256 = m.sha256 or file_meta['sha256hex'] m.mimetype = m.mimetype or file_meta['mimetype'] - overall_status = "success" + overall_status = self.success_status for m in item.manifest: if m.status != 'success': overall_status = m.status or 'not-processed' @@ -259,6 +269,9 @@ class WebFilesetStrategy(FilesetIngestStrategy): status=overall_status, manifest=item.manifest, ) + if self.ingest_strategy == "web-file": + result.file_file_meta = file_file_meta + result.file_resource = file_resource return result class WebFileStrategy(WebFilesetStrategy): @@ -266,6 +279,7 @@ class WebFileStrategy(WebFilesetStrategy): def __init__(self, **kwargs): super().__init__(**kwargs) self.ingest_strategy = IngestStrategy.WebFile + self.success_status = "success-file" FILESET_STRATEGY_HELPER_TABLE = { diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py index 037843e..d7e9d6d 100644 --- a/python/sandcrawler/fileset_types.py +++ b/python/sandcrawler/fileset_types.py @@ -43,11 +43,9 @@ class ArchiveStrategyResult(BaseModel): status: str manifest: List[FilesetManifestFile] file_file_meta: Optional[Dict[str, Any]] - file_terminal: Optional[Dict[str, Any]] - file_cdx: Optional[Dict[str, Any]] + file_resource: Optional[Any] bundle_file_meta: Optional[Dict[str, Any]] - bundle_terminal: Optional[Any] - bundle_cdx: Optional[Any] + bundle_resource: Optional[Any] bundle_archiveorg_path: Optional[str] class PlatformScopeError(Exception): diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index de392b2..7c0dfbd 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -49,7 +49,8 @@ class IngestFilesetWorker(IngestFileWorker): self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024) self.max_file_count = kwargs.get('max_file_count', 200) - + self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink') + self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False) def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: """ @@ -71,9 +72,6 @@ class IngestFilesetWorker(IngestFileWorker): """ raise NotImplementedError("process_existing() not tested or safe yet") - # XXX: use file version - #def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: - def want(self, request: dict) -> bool: if not request.get('ingest_type') in ('dataset',): return False @@ -254,6 +252,7 @@ class IngestFilesetWorker(IngestFileWorker): # return self.process_existing(request, existing) result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl) + result['request'] = request if result.get('status') != None: result['request'] = request return result @@ -308,6 +307,7 @@ class IngestFilesetWorker(IngestFileWorker): result['platform_name'] = dataset_meta.platform_name result['platform_domain'] = dataset_meta.platform_domain result['platform_id'] = dataset_meta.platform_id + result['platform_base_url'] = dataset_meta.web_base_url result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name if not dataset_meta.manifest: @@ -315,7 +315,7 @@ class IngestFilesetWorker(IngestFileWorker): return result # these will get confirmed/updated after ingest - result['manifest'] = [m.dict() for m in dataset_meta.manifest] + result['manifest'] = [m.dict(exclude_none=True) for m in dataset_meta.manifest] result['file_count'] = len(dataset_meta.manifest) result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) @@ -343,47 +343,70 @@ class IngestFilesetWorker(IngestFileWorker): # 4. Summarize status and return structured result metadata. result['status'] = archive_result.status - result['manifest'] = [m.dict() for m in archive_result.manifest] + result['manifest'] = [m.dict(exclude_none=True) for m in archive_result.manifest] if ingest_strategy.endswith('-fileset-bundle'): - result['fileset_bundle'] = dict( - file_meta=archive_result.bundle_file_meta, - archiveorg_bundle_path=archive_result.archiveorg_bundle_path, - ) - if archive_result.bundle_terminal: + result['fileset_bundle'] = dict() + if archive_result.bundle_file_meta: + result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta + if archive_result.archiveorg_bundle_path: + result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path + if archive_result.bundle_resource: result['fileset_bundle']['terminal'] = dict( - terminal_url=archive_result.bundle_terminal.terminal_url, - terminal_dt=archive_result.bundle_terminal.terminal_dt, - terminal_status_code=archive_result.bundle_terminal.terminal_status_code, + terminal_url=archive_result.bundle_resource.terminal_url, + terminal_dt=archive_result.bundle_resource.terminal_dt, + terminal_status_code=archive_result.bundle_resource.terminal_status_code, ) - if archive_result.bundle_cdx: - result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx) - if archive_result.bundle_cdx.revisit_cdx: - result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx) + if archive_result.bundle_resource.cdx: + result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx) + if archive_result.bundle_resource.revisit_cdx: + result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx) + if ingest_strategy.endswith('-file'): - result['fileset_file'] = dict( - file_meta=archive_result.file_file_meta, - ) - if archive_result.file_terminal: + result['fileset_file'] = dict() + if archive_result.file_file_meta: + result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta, + if archive_result.file_resource: result['fileset_file']['terminal'] = dict( - terminal_url=archive_result.file_terminal.terminal_url, - terminal_dt=archive_result.file_terminal.terminal_dt, - terminal_status_code=archive_result.file_terminal.terminal_status_code, + terminal_url=archive_result.file_resource.terminal_url, + terminal_dt=archive_result.file_resource.terminal_dt, + terminal_status_code=archive_result.file_resource.terminal_status_code, ) - if archive_result.file_cdx: - result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx) - if archive_result.file_cdx.revisit_cdx: - result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx) + if archive_result.file_resource.cdx: + result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) + if archive_result.file_resource.revisit_cdx: + result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx) if result['status'].startswith('success'): # check that these are still valid assert result['file_count'] == len(archive_result.manifest) assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size]) - - # XXX: here is where we would publish to ingest file result topic... or call process_hit()? - if result['status'] == 'success-file': - pass + if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta: + file_result = dict( + hit=True, + status='success', + request=request.copy(), + file_meta=archive_result.file_file_meta, + terminal=dict( + terminal_url=archive_result.file_resource.terminal_url, + terminal_dt=archive_result.file_resource.terminal_dt, + terminal_status_code=archive_result.file_resource.terminal_status_code, + terminal_sha1hex=archive_result.file_file_meta['sha1hex'], + ), + ) + if archive_result.file_resource.cdx: + file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) + if archive_result.file_resource.revisit_cdx: + file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx) + file_result['request']['ingest_type'] = request['ingest_type'] + "-file" + # call the super() (ingest_file) version of process_hit() + info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta) + file_result.update(info) + if self.ingest_file_result_sink: + self.ingest_file_result_sink.push_record(result.copy()) + elif self.ingest_file_result_stdout: + sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n") if result['status'].startswith('success'): result['hit'] = True -- cgit v1.2.3