From a09396caefe709b521e560add5b01c1a5c94cb53 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Fri, 15 Oct 2021 17:12:40 -0700
Subject: more fileset iteration

---
 python/sandcrawler/ingest_fileset.py | 89 +++++++++++++++++++++++-------------
 1 file changed, 56 insertions(+), 33 deletions(-)

(limited to 'python/sandcrawler/ingest_fileset.py')

diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index de392b2..7c0dfbd 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -49,7 +49,8 @@ class IngestFilesetWorker(IngestFileWorker):
         self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
         self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
         self.max_file_count = kwargs.get('max_file_count', 200)
-
+        self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
+        self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
 
     def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
         """
@@ -71,9 +72,6 @@ class IngestFilesetWorker(IngestFileWorker):
         """
         raise NotImplementedError("process_existing() not tested or safe yet")
 
-    # XXX: use file version
-    #def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
-
     def want(self, request: dict) -> bool:
         if not request.get('ingest_type') in ('dataset',):
             return False
@@ -254,6 +252,7 @@ class IngestFilesetWorker(IngestFileWorker):
         # return self.process_existing(request, existing)
 
         result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+        result['request'] = request
         if result.get('status') != None:
             result['request'] = request
             return result
@@ -308,6 +307,7 @@ class IngestFilesetWorker(IngestFileWorker):
         result['platform_name'] = dataset_meta.platform_name
         result['platform_domain'] = dataset_meta.platform_domain
         result['platform_id'] = dataset_meta.platform_id
+        result['platform_base_url'] = dataset_meta.web_base_url
         result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name
 
         if not dataset_meta.manifest:
@@ -315,7 +315,7 @@ class IngestFilesetWorker(IngestFileWorker):
             return result
 
         # these will get confirmed/updated after ingest
-        result['manifest'] = [m.dict() for m in dataset_meta.manifest]
+        result['manifest'] = [m.dict(exclude_none=True) for m in dataset_meta.manifest]
         result['file_count'] = len(dataset_meta.manifest)
         result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size])
 
@@ -343,47 +343,70 @@ class IngestFilesetWorker(IngestFileWorker):
 
         # 4. Summarize status and return structured result metadata.
         result['status'] = archive_result.status
-        result['manifest'] = [m.dict() for m in archive_result.manifest]
+        result['manifest'] = [m.dict(exclude_none=True) for m in archive_result.manifest]
 
         if ingest_strategy.endswith('-fileset-bundle'):
-            result['fileset_bundle'] = dict(
-                file_meta=archive_result.bundle_file_meta,
-                archiveorg_bundle_path=archive_result.archiveorg_bundle_path,
-            )
-            if archive_result.bundle_terminal:
+            result['fileset_bundle'] = dict()
+            if archive_result.bundle_file_meta:
+                result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta
+            if archive_result.archiveorg_bundle_path:
+                result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
+            if archive_result.bundle_resource:
                 result['fileset_bundle']['terminal'] = dict(
-                    terminal_url=archive_result.bundle_terminal.terminal_url,
-                    terminal_dt=archive_result.bundle_terminal.terminal_dt,
-                    terminal_status_code=archive_result.bundle_terminal.terminal_status_code,
+                    terminal_url=archive_result.bundle_resource.terminal_url,
+                    terminal_dt=archive_result.bundle_resource.terminal_dt,
+                    terminal_status_code=archive_result.bundle_resource.terminal_status_code,
                 )
-            if archive_result.bundle_cdx:
-                result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx)
-                if archive_result.bundle_cdx.revisit_cdx:
-                    result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx)
+                if archive_result.bundle_resource.cdx:
+                    result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx)
+                if archive_result.bundle_resource.revisit_cdx:
+                    result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx)
+
         if ingest_strategy.endswith('-file'):
-            result['fileset_file'] = dict(
-                file_meta=archive_result.file_file_meta,
-            )
-            if archive_result.file_terminal:
+            result['fileset_file'] = dict()
+            if archive_result.file_file_meta:
+                result['fileset_file']['file_meta'] = archive_result.file_file_meta
+            if archive_result.file_resource:
                 result['fileset_file']['terminal'] = dict(
-                    terminal_url=archive_result.file_terminal.terminal_url,
-                    terminal_dt=archive_result.file_terminal.terminal_dt,
-                    terminal_status_code=archive_result.file_terminal.terminal_status_code,
+                    terminal_url=archive_result.file_resource.terminal_url,
+                    terminal_dt=archive_result.file_resource.terminal_dt,
+                    terminal_status_code=archive_result.file_resource.terminal_status_code,
                 )
-            if archive_result.file_cdx:
-                result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx)
-                if archive_result.file_cdx.revisit_cdx:
-                    result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx)
+                if archive_result.file_resource.cdx:
+                    result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+                if archive_result.file_resource.revisit_cdx:
+                    result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
 
         if result['status'].startswith('success'):
             # check that these are still valid
             assert result['file_count'] == len(archive_result.manifest)
             assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
 
-
-        # XXX: here is where we would publish to ingest file result topic... or call process_hit()?
-        if result['status'] == 'success-file':
-            pass
+        if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
+            file_result = dict(
+                hit=True,
+                status='success',
+                request=request.copy(),
+                file_meta=archive_result.file_file_meta,
+                terminal=dict(
+                    terminal_url=archive_result.file_resource.terminal_url,
+                    terminal_dt=archive_result.file_resource.terminal_dt,
+                    terminal_status_code=archive_result.file_resource.terminal_status_code,
+                    terminal_sha1hex=archive_result.file_file_meta['sha1hex'],
+                ),
+            )
+            if archive_result.file_resource.cdx:
+                file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+                if archive_result.file_resource.revisit_cdx:
+                    file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+            file_result['request']['ingest_type'] = request['ingest_type'] + "-file"
+            # call the super() (ingest_file) version of process_hit()
+            info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta)
+            file_result.update(info)
+            if self.ingest_file_result_sink:
+                self.ingest_file_result_sink.push_record(file_result.copy())
+            elif self.ingest_file_result_stdout:
+                sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n")
 
         if result['status'].startswith('success'):
             result['hit'] = True
--
cgit v1.2.3
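For context on the new `ingest_file_result_sink` / `ingest_file_result_stdout` options read in `IngestFilesetWorker.__init__()`, here is a minimal, hypothetical sketch of how they might be wired up. The `StdoutSink` class and the commented-out request are illustrative assumptions, not part of the sandcrawler codebase; the only interface the patch relies on is a sink object exposing `push_record()`.

```python
import json
import sys

from sandcrawler.ingest_fileset import IngestFilesetWorker


class StdoutSink:
    """Illustrative stand-in for a result sink (e.g. a Kafka producer wrapper)
    exposing the push_record() method that the patched worker calls."""

    def push_record(self, record: dict) -> None:
        sys.stdout.write(json.dumps(record, sort_keys=True) + "\n")


# Assumed wiring: pass the new kwargs when constructing the worker. When a
# 'dataset' ingest resolves to a single file (status 'success-file'), the
# worker re-publishes that hit as a '<ingest_type>-file' result to the sink;
# if no sink is configured but ingest_file_result_stdout=True, it writes the
# JSON record to stdout instead.
worker = IngestFilesetWorker(
    ingest_file_result_sink=StdoutSink(),
    ingest_file_result_stdout=False,
)

# Hypothetical request shape (placeholder values), processed like any other
# fileset ingest request:
# result = worker.process({'ingest_type': 'dataset', 'base_url': 'https://example.com/dataset/123'})
```

Note that the sink takes precedence over the stdout flag, mirroring the if/elif ordering added in the patch.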