Diffstat (limited to 'python/sandcrawler/ingest_fileset.py')
-rw-r--r-- | python/sandcrawler/ingest_fileset.py | 123
1 file changed, 89 insertions, 34 deletions
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index ce6cdca..de392b2 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -20,7 +20,7 @@ from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, ht
 from sandcrawler.workers import SandcrawlerWorker
 from sandcrawler.db import SandcrawlerPostgrestClient
 from sandcrawler.ingest_file import IngestFileWorker
-from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
+from sandcrawler.fileset_platforms import FilesetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
 from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE
 from sandcrawler.fileset_types import PlatformScopeError, PlatformRestrictedError
@@ -47,8 +47,8 @@ class IngestFilesetWorker(IngestFileWorker):
         self.sink = sink
         self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
         self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
-        self.max_total_size = 100*1024*1024*1024
-        self.max_file_count = 500
+        self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
+        self.max_file_count = kwargs.get('max_file_count', 200)

     def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
@@ -79,34 +79,15 @@ class IngestFilesetWorker(IngestFileWorker):
             return False
         return True

-    def process(self, request: dict, key: Any = None) -> dict:
-
-        ingest_type = request.get('ingest_type')
-        if ingest_type not in ("dataset",):
-            raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
-
-        # parse/clean URL
-        # note that we pass through the original/raw URL, and that is what gets
-        # persisted in database table
-        base_url = clean_url(request['base_url'])
-
-        force_recrawl = bool(request.get('force_recrawl', False))
-
-        print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
-
-        # TODO: "existing" check for new fileset ingest result table
-        #existing = self.check_existing_ingest(ingest_type, base_url)
-        #if existing:
-        #    return self.process_existing(request, existing)
+    def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict:
+        """
+        This is copypasta from process_file(), should probably refactor.
+        """

-        result: Dict[str, Any] = dict(request=request, hit=False)
+        result: Dict[str, Any] = dict(hit=False)
         result['hops'] = [base_url]
         next_url = base_url

-        # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
-
-        ### START COPYPASTA from process_file(), should refactor ###
-
         # check against blocklist
         for block in self.base_url_blocklist:
             # XXX: hack to not skip archive.org content
@@ -247,9 +228,42 @@ class IngestFilesetWorker(IngestFileWorker):
                 #raise NotImplementedError()
                 pass

-        ### END COPYPASTA ###
+        result['_html_biblio'] = html_biblio
+        result['_resource'] = resource
+        return result
+
+
+    def process(self, request: dict, key: Any = None) -> dict:
+
+        ingest_type = request.get('ingest_type')
+        if ingest_type not in ("dataset",):
+            raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
+
+        # parse/clean URL
+        # note that we pass through the original/raw URL, and that is what gets
+        # persisted in database table
+        base_url = clean_url(request['base_url'])
+
+        force_recrawl = bool(request.get('force_recrawl', False))
+
+        print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+
+        # TODO: "existing" check against file and/or fileset ingest result table
+        #existing = self.check_existing_ingest(ingest_type, base_url)
+        #if existing:
+        #    return self.process_existing(request, existing)
+
+        result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+        if result.get('status') != None:
+            result['request'] = request
+            return result
+
+        html_biblio = result.pop('_html_biblio')
+        resource = result.pop('_resource')
+
+        # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.

-        # XXX: html_guess_platform()
+        # TODO: could involve html_guess_platform() here?

         # determine platform
         platform_helper = None
@@ -291,10 +305,10 @@ class IngestFilesetWorker(IngestFileWorker):
         #print(dataset_meta, file=sys.stderr)
         platform = dataset_meta.platform_name

-        result['platform'] = dataset_meta.platform_name
+        result['platform_name'] = dataset_meta.platform_name
+        result['platform_domain'] = dataset_meta.platform_domain
         result['platform_id'] = dataset_meta.platform_id
-        result['item_name'] = dataset_meta.archiveorg_item_name
-        result['item_meta'] = dataset_meta.archiveorg_item_meta
+        result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name

         if not dataset_meta.manifest:
             result['status'] = 'empty-manifest'
@@ -309,6 +323,9 @@ class IngestFilesetWorker(IngestFileWorker):
             result['status'] = 'too-large-size'
             return result
         if result['file_count'] > self.max_file_count:
+            # hard max, to prevent downstream breakage
+            if result['file_count'] > 10*1000:
+                result['manifest'] = result['manifest'][:self.max_file_count]
             result['status'] = 'too-many-files'
             return result
@@ -327,8 +344,46 @@ class IngestFilesetWorker(IngestFileWorker):

         # 4. Summarize status and return structured result metadata.
         result['status'] = archive_result.status
         result['manifest'] = [m.dict() for m in archive_result.manifest]
-        result['file_count'] = len(archive_result.manifest) or None
-        result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None
+
+        if ingest_strategy.endswith('-fileset-bundle'):
+            result['fileset_bundle'] = dict(
+                file_meta=archive_result.bundle_file_meta,
+                archiveorg_bundle_path=archive_result.archiveorg_bundle_path,
+            )
+            if archive_result.bundle_terminal:
+                result['fileset_bundle']['terminal'] = dict(
+                    terminal_url=archive_result.bundle_terminal.terminal_url,
+                    terminal_dt=archive_result.bundle_terminal.terminal_dt,
+                    terminal_status_code=archive_result.bundle_terminal.terminal_status_code,
+                )
+            if archive_result.bundle_cdx:
+                result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx)
+                if archive_result.bundle_cdx.revisit_cdx:
+                    result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx)
+        if ingest_strategy.endswith('-file'):
+            result['fileset_file'] = dict(
+                file_meta=archive_result.file_file_meta,
+            )
+            if archive_result.file_terminal:
+                result['fileset_file']['terminal'] = dict(
+                    terminal_url=archive_result.file_terminal.terminal_url,
+                    terminal_dt=archive_result.file_terminal.terminal_dt,
+                    terminal_status_code=archive_result.file_terminal.terminal_status_code,
+                )
+            if archive_result.file_cdx:
+                result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx)
+                if archive_result.file_cdx.revisit_cdx:
+                    result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx)
+
+        if result['status'].startswith('success'):
+            # check that these are still valid
+            assert result['file_count'] == len(archive_result.manifest)
+            assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
+
+
+        # XXX: here is where we would publish to ingest file result topic... or call process_hit()?
+        if result['status'] == 'success-file':
+            pass

         if result['status'].startswith('success'):
             result['hit'] = True
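Notes on the changes above.

Beyond the DatasetPlatformHelper to FilesetPlatformHelper rename, the constructor hunk tightens the ingest limits (100 GiB down to 64 GiB total size, 500 down to 200 files) and makes both overridable per worker instance via kwargs. A minimal sketch of overriding them at construction time; the sink argument follows the `self.sink = sink` line in the diff, and the assumption that the rest of the signature matches IngestFileWorker is mine:

from sandcrawler.ingest_fileset import IngestFilesetWorker

# sketch only: override the new defaults for a more conservative worker
worker = IngestFilesetWorker(
    sink=None,
    max_total_size=16*1024*1024*1024,  # tighter than the new 64 GiB default
    max_file_count=100,                # tighter than the new 200-file default
)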
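The main structural change splits process() in two: the resource-fetching loop (the old "COPYPASTA from process_file()" block) moves into fetch_resource_iteratively(), which hands intermediate objects back through underscore-prefixed keys ('_html_biblio', '_resource') that process() pops off before the result dict is persisted. The hand-off pattern in isolation, with hypothetical names rather than the sandcrawler API:

def fetch_stage() -> dict:
    result = dict(hit=False)
    # ... resolve redirects, fetch landing page ...
    # on failure, set result['status'] and return early; on success,
    # stash internal objects under keys that must not be serialized
    result['_resource'] = object()
    return result

def process_stage() -> dict:
    result = fetch_stage()
    if result.get('status') is not None:
        return result  # fetch stage already decided the outcome
    resource = result.pop('_resource')  # strip before persisting the dict
    # ... platform detection and ingest would continue with `resource` ...
    return result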
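The new guard inside the file-count check exists because a manifest with tens of thousands of entries can break downstream consumers even though the status is already 'too-many-files': the manifest gets truncated to max_file_count whenever it exceeds the hard cap of 10,000 entries. The same logic isolated as a standalone sketch (the helper name is hypothetical):

def apply_file_count_limit(result: dict, max_file_count: int = 200) -> dict:
    # mirrors the check in process(): reject oversized filesets, and
    # additionally truncate absurdly large manifests (hard max 10,000)
    if result['file_count'] > max_file_count:
        if result['file_count'] > 10*1000:
            result['manifest'] = result['manifest'][:max_file_count]
        result['status'] = 'too-many-files'
    return result

oversized = dict(file_count=25000, manifest=[{}]*25000)
checked = apply_file_count_limit(oversized)
print(checked['status'], len(checked['manifest']))  # too-many-files 200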
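Finally, the success-path serialization repeats the same three-field terminal dict and cdx_to_dict() pair for both the '-fileset-bundle' and '-file' strategy branches. If that block grows again, a small helper could collapse the repetition; a hypothetical sketch, not something this commit adds:

def terminal_to_dict(terminal) -> dict:
    # assumes the terminal object carries the three attributes used
    # in the fileset_bundle/fileset_file blocks above
    return dict(
        terminal_url=terminal.terminal_url,
        terminal_dt=terminal.terminal_dt,
        terminal_status_code=terminal.terminal_status_code,
    )

# usage inside process() would then be, e.g.:
#     if archive_result.bundle_terminal:
#         result['fileset_bundle']['terminal'] = terminal_to_dict(archive_result.bundle_terminal)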