From 826c7538e091fac14d987a3cd654975da964e240 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 27 Oct 2021 18:50:17 -0700
Subject: make fmt (black 21.9b0)

---
 python/sandcrawler/ingest_fileset.py | 372 +++++++++++++++++++----------------
 1 file changed, 207 insertions(+), 165 deletions(-)

diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 5728e24..227f511 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -6,15 +6,33 @@ from typing import Any, Dict, Optional
 import requests
 from selectolax.parser import HTMLParser
 
-from sandcrawler.fileset_platforms import (ArchiveOrgHelper, DataverseHelper, FigshareHelper,
-                                           ZenodoHelper)
-from sandcrawler.fileset_strategies import (ArchiveorgFilesetStrategy, ArchiveorgFileStrategy,
-                                            WebFilesetStrategy, WebFileStrategy)
-from sandcrawler.fileset_types import (IngestStrategy, PlatformRestrictedError,
-                                       PlatformScopeError)
+from sandcrawler.fileset_platforms import (
+    ArchiveOrgHelper,
+    DataverseHelper,
+    FigshareHelper,
+    ZenodoHelper,
+)
+from sandcrawler.fileset_strategies import (
+    ArchiveorgFilesetStrategy,
+    ArchiveorgFileStrategy,
+    WebFilesetStrategy,
+    WebFileStrategy,
+)
+from sandcrawler.fileset_types import (
+    IngestStrategy,
+    PlatformRestrictedError,
+    PlatformScopeError,
+)
 from sandcrawler.html_metadata import html_extract_biblio
-from sandcrawler.ia import (CdxApiError, PetaboxError, SavePageNowError, WaybackContentError,
-                            WaybackError, cdx_to_dict, fix_transfer_encoding)
+from sandcrawler.ia import (
+    CdxApiError,
+    PetaboxError,
+    SavePageNowError,
+    WaybackContentError,
+    WaybackError,
+    cdx_to_dict,
+    fix_transfer_encoding,
+)
 from sandcrawler.ingest_file import IngestFileWorker
 from sandcrawler.misc import clean_url, gen_file_metadata
 from sandcrawler.workers import SandcrawlerWorker
@@ -35,15 +53,16 @@ class IngestFilesetWorker(IngestFileWorker):
        checking to see if content has been archived already)
     4. summarize status
     """
+
     def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs):
         super().__init__(sink=None, **kwargs)
         self.sink = sink
 
         self.dataset_platform_helpers = {
-            'dataverse': DataverseHelper(),
-            'figshare': FigshareHelper(),
-            'zenodo': ZenodoHelper(),
-            'archiveorg': ArchiveOrgHelper(),
+            "dataverse": DataverseHelper(),
+            "figshare": FigshareHelper(),
+            "zenodo": ZenodoHelper(),
+            "archiveorg": ArchiveOrgHelper(),
         }
         self.dataset_strategy_archivers = {
             IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(),
@@ -52,10 +71,10 @@ class IngestFilesetWorker(IngestFileWorker):
             IngestStrategy.WebFile: WebFileStrategy(),
         }
 
-        self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024)
-        self.max_file_count = kwargs.get('max_file_count', 200)
-        self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
-        self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
+        self.max_total_size = kwargs.get("max_total_size", 64 * 1024 * 1024 * 1024)
+        self.max_file_count = kwargs.get("max_file_count", 200)
+        self.ingest_file_result_sink = kwargs.get("ingest_file_result_sink")
+        self.ingest_file_result_stdout = kwargs.get("ingest_file_result_stdout", False)
 
     def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
         """
@@ -65,7 +84,7 @@ class IngestFilesetWorker(IngestFileWorker):
             return None
         existing = self.pgrest_client.get_ingest_fileset_platform(ingest_type, base_url)
         # TODO: filter on more flags?
- if existing and existing['hit'] is True: + if existing and existing["hit"] is True: return existing else: return None @@ -78,112 +97,114 @@ class IngestFilesetWorker(IngestFileWorker): raise NotImplementedError("process_existing() not tested or safe yet") def want(self, request: dict) -> bool: - if not request.get('ingest_type') in ('dataset', ): + if not request.get("ingest_type") in ("dataset",): return False return True - def fetch_resource_iteratively(self, ingest_type: str, base_url: str, - force_recrawl: bool) -> dict: + def fetch_resource_iteratively( + self, ingest_type: str, base_url: str, force_recrawl: bool + ) -> dict: """ This is copypasta from process_file(), should probably refactor. """ result: Dict[str, Any] = dict(hit=False) - result['hops'] = [base_url] + result["hops"] = [base_url] next_url = base_url # check against blocklist for block in self.base_url_blocklist: # NOTE: hack to not skip archive.org content - if 'archive.org' in block: + if "archive.org" in block: continue if block in next_url: - result['status'] = "skip-url-blocklist" + result["status"] = "skip-url-blocklist" return result try: resource = self.find_resource(next_url, force_recrawl=force_recrawl) except SavePageNowError as e: - result['status'] = 'spn2-error' - result['error_message'] = str(e)[:1600] + result["status"] = "spn2-error" + result["error_message"] = str(e)[:1600] return result except PetaboxError as e: - result['status'] = 'petabox-error' - result['error_message'] = str(e)[:1600] + result["status"] = "petabox-error" + result["error_message"] = str(e)[:1600] return result except CdxApiError as e: - result['status'] = 'cdx-error' - result['error_message'] = str(e)[:1600] + result["status"] = "cdx-error" + result["error_message"] = str(e)[:1600] # add a sleep in cdx-error path as a slow-down time.sleep(2.0) return result except WaybackError as e: - result['status'] = 'wayback-error' - result['error_message'] = str(e)[:1600] + result["status"] = "wayback-error" + result["error_message"] = str(e)[:1600] return result except WaybackContentError as e: - result['status'] = 'wayback-content-error' - result['error_message'] = str(e)[:1600] + result["status"] = "wayback-content-error" + result["error_message"] = str(e)[:1600] return result except NotImplementedError: - #result['status'] = 'not-implemented' - #result['error_message'] = str(e)[:1600] - #return result + # result['status'] = 'not-implemented' + # result['error_message'] = str(e)[:1600] + # return result resource = None html_biblio = None if resource: if resource.terminal_url: - result['terminal'] = { + result["terminal"] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, } - if resource.terminal_url not in result['hops']: - result['hops'].append(resource.terminal_url) + if resource.terminal_url not in result["hops"]: + result["hops"].append(resource.terminal_url) if not resource.hit: - result['status'] = resource.status + result["status"] = resource.status return result if resource.terminal_url: for pattern in self.base_url_blocklist: if pattern in resource.terminal_url: - result['status'] = 'skip-url-blocklist' + result["status"] = "skip-url-blocklist" return result if resource.terminal_url: for pattern in self.cookie_blocklist: if pattern in resource.terminal_url: - result['status'] = 'blocked-cookie' + result["status"] = "blocked-cookie" return result if not resource.body: - result['status'] = 'null-body' + result["status"] = "null-body" return result 
if len(resource.body) > MAX_BODY_SIZE_BYTES: - result['status'] = 'body-too-large' + result["status"] = "body-too-large" return result file_meta = gen_file_metadata(resource.body) try: file_meta, resource = fix_transfer_encoding(file_meta, resource) except Exception as e: - result['status'] = 'bad-gzip-encoding' - result['error_message'] = str(e) + result["status"] = "bad-gzip-encoding" + result["error_message"] = str(e) return result - if not resource.body or file_meta['size_bytes'] == 0: - result['status'] = 'null-body' + if not resource.body or file_meta["size_bytes"] == 0: + result["status"] = "null-body" return result # here we split based on ingest type to try and extract a next hop html_ish_resource = bool( - "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" - or "application/xml" in file_meta['mimetype'] - or "text/xml" in file_meta['mimetype']) + "html" in file_meta["mimetype"] + or "xhtml" in file_meta["mimetype"] # matches "application/xhtml+xml" + or "application/xml" in file_meta["mimetype"] + or "text/xml" in file_meta["mimetype"] + ) html_biblio = None html_doc = None if html_ish_resource and resource.body: @@ -191,10 +212,11 @@ class IngestFilesetWorker(IngestFileWorker): html_doc = HTMLParser(resource.body) html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: - if 'html_biblio' not in result and html_biblio.title: - result['html_biblio'] = json.loads( - html_biblio.json(exclude_none=True)) - #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) + if "html_biblio" not in result and html_biblio.title: + result["html_biblio"] = json.loads( + html_biblio.json(exclude_none=True) + ) + # print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass @@ -204,69 +226,72 @@ class IngestFilesetWorker(IngestFileWorker): assert resource.terminal_status_code in (200, 226) if resource.terminal_url: - result['terminal'] = { + result["terminal"] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, - "terminal_sha1hex": file_meta['sha1hex'], + "terminal_sha1hex": file_meta["sha1hex"], } - result['file_meta'] = file_meta - result['cdx'] = cdx_to_dict(resource.cdx) + result["file_meta"] = file_meta + result["cdx"] = cdx_to_dict(resource.cdx) if resource.revisit_cdx: - result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) + result["revisit_cdx"] = cdx_to_dict(resource.revisit_cdx) if ingest_type == "pdf": - if file_meta['mimetype'] != "application/pdf": - result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + if file_meta["mimetype"] != "application/pdf": + result["status"] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", - "application/jats+xml"): - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in ( + "application/xml", + "text/xml", + "application/jats+xml", + ): + result["status"] = "wrong-mimetype" return result elif ingest_type == "html": - if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in ("text/html", "application/xhtml+xml"): + result["status"] = "wrong-mimetype" return result else: - #raise NotImplementedError() + # raise NotImplementedError() pass - result['_html_biblio'] = html_biblio - result['_resource'] = resource + 
result["_html_biblio"] = html_biblio + result["_resource"] = resource return result def process(self, request: dict, key: Any = None) -> dict: - ingest_type = request.get('ingest_type') - if ingest_type not in ("dataset", ): + ingest_type = request.get("ingest_type") + if ingest_type not in ("dataset",): raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL # note that we pass through the original/raw URL, and that is what gets # persisted in database table - base_url = clean_url(request['base_url']) + base_url = clean_url(request["base_url"]) - force_recrawl = bool(request.get('force_recrawl', False)) + force_recrawl = bool(request.get("force_recrawl", False)) print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) # TODO: "existing" check against file and/or fileset ingest result table - #existing = self.check_existing_ingest(ingest_type, base_url) - #if existing: + # existing = self.check_existing_ingest(ingest_type, base_url) + # if existing: # return self.process_existing(request, existing) - result = self.fetch_resource_iteratively(ingest_type, - base_url, - force_recrawl=force_recrawl) - result['request'] = request - if result.get('status') is not None: - result['request'] = request + result = self.fetch_resource_iteratively( + ingest_type, base_url, force_recrawl=force_recrawl + ) + result["request"] = request + if result.get("status") is not None: + result["request"] = request return result - html_biblio = result.pop('_html_biblio') - resource = result.pop('_resource') + html_biblio = result.pop("_html_biblio") + resource = result.pop("_resource") # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page. @@ -280,166 +305,183 @@ class IngestFilesetWorker(IngestFileWorker): break if not platform_helper: - result['status'] = 'no-platform-match' + result["status"] = "no-platform-match" return result # 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`. 
try: dataset_meta = platform_helper.process_request(request, resource, html_biblio) except PlatformScopeError as e: - result['status'] = 'platform-scope' - result['error_message'] = str(e)[:1600] + result["status"] = "platform-scope" + result["error_message"] = str(e)[:1600] return result except PlatformRestrictedError as e: - result['status'] = 'platform-restricted' - result['error_message'] = str(e)[:1600] + result["status"] = "platform-restricted" + result["error_message"] = str(e)[:1600] return result except NotImplementedError as e: - result['status'] = 'not-implemented' - result['error_message'] = str(e)[:1600] + result["status"] = "not-implemented" + result["error_message"] = str(e)[:1600] return result except requests.exceptions.HTTPError as e: if e.response.status_code == 404: - result['status'] = 'platform-404' - result['error_message'] = str(e)[:1600] + result["status"] = "platform-404" + result["error_message"] = str(e)[:1600] return result else: raise e - #print(dataset_meta, file=sys.stderr) + # print(dataset_meta, file=sys.stderr) platform = dataset_meta.platform_name - result['platform_name'] = dataset_meta.platform_name - result['platform_domain'] = dataset_meta.platform_domain - result['platform_id'] = dataset_meta.platform_id - result['platform_base_url'] = dataset_meta.web_base_url - result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name + result["platform_name"] = dataset_meta.platform_name + result["platform_domain"] = dataset_meta.platform_domain + result["platform_id"] = dataset_meta.platform_id + result["platform_base_url"] = dataset_meta.web_base_url + result["archiveorg_item_name"] = dataset_meta.archiveorg_item_name if not dataset_meta.manifest: - result['status'] = 'empty-manifest' + result["status"] = "empty-manifest" return result # these will get confirmed/updated after ingest - result['manifest'] = [m.dict(exclude_none=True) for m in dataset_meta.manifest] - result['file_count'] = len(dataset_meta.manifest) - result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) + result["manifest"] = [m.dict(exclude_none=True) for m in dataset_meta.manifest] + result["file_count"] = len(dataset_meta.manifest) + result["total_size"] = sum([m.size for m in dataset_meta.manifest if m.size]) - if result['total_size'] > self.max_total_size: - result['status'] = 'too-large-size' + if result["total_size"] > self.max_total_size: + result["status"] = "too-large-size" return result - if result['file_count'] > self.max_file_count: + if result["file_count"] > self.max_file_count: # hard max, to prevent downstream breakage - if result['file_count'] > 10 * 1000: - result['manifest'] = result['manifest'][:self.max_file_count] - result['status'] = 'too-many-files' + if result["file_count"] > 10 * 1000: + result["manifest"] = result["manifest"][: self.max_file_count] + result["status"] = "too-many-files" return result ingest_strategy = platform_helper.chose_strategy(dataset_meta) - result['ingest_strategy'] = ingest_strategy + result["ingest_strategy"] = ingest_strategy print( f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", - file=sys.stderr) + file=sys.stderr, + ) strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy) if not strategy_helper: - result['status'] = 'no-strategy-helper' + result["status"] = "no-strategy-helper" return result # 3. 
Use strategy-specific methods to archive all files in platform manifest, and verify manifest metadata. archive_result = strategy_helper.process(dataset_meta) # 4. Summarize status and return structured result metadata. - result['status'] = archive_result.status - result['manifest'] = [m.dict(exclude_none=True) for m in archive_result.manifest] + result["status"] = archive_result.status + result["manifest"] = [m.dict(exclude_none=True) for m in archive_result.manifest] - if ingest_strategy.endswith('-fileset-bundle'): - result['fileset_bundle'] = dict() + if ingest_strategy.endswith("-fileset-bundle"): + result["fileset_bundle"] = dict() if archive_result.bundle_file_meta: - result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta + result["fileset_bundle"]["file_meta"] = archive_result.bundle_file_meta if archive_result.bundle_archiveorg_path: - result['fileset_bundle'][ - 'archiveorg_bundle_path'] = archive_result.bundle_archiveorg_path + result["fileset_bundle"][ + "archiveorg_bundle_path" + ] = archive_result.bundle_archiveorg_path if archive_result.bundle_resource: - result['fileset_bundle']['terminal'] = dict( + result["fileset_bundle"]["terminal"] = dict( terminal_url=archive_result.bundle_resource.terminal_url, terminal_dt=archive_result.bundle_resource.terminal_dt, terminal_status_code=archive_result.bundle_resource.terminal_status_code, ) if archive_result.bundle_resource.cdx: - result['fileset_bundle']['cdx'] = cdx_to_dict( - archive_result.bundle_resource.cdx) + result["fileset_bundle"]["cdx"] = cdx_to_dict( + archive_result.bundle_resource.cdx + ) if archive_result.bundle_resource.revisit_cdx: - result['fileset_bundle']['revisit_cdx'] = cdx_to_dict( - archive_result.bundle_resource.revisit_cdx) + result["fileset_bundle"]["revisit_cdx"] = cdx_to_dict( + archive_result.bundle_resource.revisit_cdx + ) - if ingest_strategy.endswith('-file'): - result['fileset_file'] = dict() + if ingest_strategy.endswith("-file"): + result["fileset_file"] = dict() if archive_result.file_file_meta: - result['fileset_file']['file_meta'] = archive_result.file_file_meta, + result["fileset_file"]["file_meta"] = (archive_result.file_file_meta,) if archive_result.file_resource: - result['fileset_file']['terminal'] = dict( + result["fileset_file"]["terminal"] = dict( terminal_url=archive_result.file_resource.terminal_url, terminal_dt=archive_result.file_resource.terminal_dt, terminal_status_code=archive_result.file_resource.terminal_status_code, ) if archive_result.file_resource.cdx: - result['fileset_file']['cdx'] = cdx_to_dict( - archive_result.file_resource.cdx) + result["fileset_file"]["cdx"] = cdx_to_dict( + archive_result.file_resource.cdx + ) if archive_result.file_resource.revisit_cdx: - result['fileset_file']['revisit_cdx'] = cdx_to_dict( - archive_result.file_resource.revisit_cdx) + result["fileset_file"]["revisit_cdx"] = cdx_to_dict( + archive_result.file_resource.revisit_cdx + ) - if result['status'].startswith('success'): + if result["status"].startswith("success"): # check that these are still valid - assert result['file_count'] == len(archive_result.manifest) - assert result['total_size'] == sum( - [m.size for m in archive_result.manifest if m.size]) + assert result["file_count"] == len(archive_result.manifest) + assert result["total_size"] == sum( + [m.size for m in archive_result.manifest if m.size] + ) - if result[ - 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta: + if ( + result["status"] == "success-file" + and 
archive_result.file_resource + and archive_result.file_file_meta + ): file_result: Dict[str, Any] = dict( hit=True, - status='success', + status="success", request=request.copy(), file_meta=archive_result.file_file_meta, terminal=dict( terminal_url=archive_result.file_resource.terminal_url, terminal_dt=archive_result.file_resource.terminal_dt, terminal_status_code=archive_result.file_resource.terminal_status_code, - terminal_sha1hex=archive_result.file_file_meta['sha1hex'], + terminal_sha1hex=archive_result.file_file_meta["sha1hex"], ), ) if archive_result.file_resource.cdx: - file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) + file_result["cdx"] = cdx_to_dict(archive_result.file_resource.cdx) if archive_result.file_resource.revisit_cdx: - file_result['revisit_cdx'] = cdx_to_dict( - archive_result.file_resource.revisit_cdx) - file_result['request']['ingest_type'] = request['ingest_type'] + "-file" + file_result["revisit_cdx"] = cdx_to_dict( + archive_result.file_resource.revisit_cdx + ) + file_result["request"]["ingest_type"] = request["ingest_type"] + "-file" # call the super() (ingest_file) version of process_hit() - info = self.process_file_hit(file_result['request']['ingest_type'], - archive_result.file_resource, - archive_result.file_file_meta) + info = self.process_file_hit( + file_result["request"]["ingest_type"], + archive_result.file_resource, + archive_result.file_file_meta, + ) file_result.update(info) if self.ingest_file_result_sink: self.ingest_file_result_sink.push_record(result.copy()) elif self.ingest_file_result_stdout: sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n") - if result['status'].startswith('success'): - result['hit'] = True - print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( - ingest_type, - result['file_count'], - result['total_size'], - ingest_strategy, - ), - file=sys.stderr) + if result["status"].startswith("success"): + result["hit"] = True + print( + "[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( + ingest_type, + result["file_count"], + result["total_size"], + ingest_strategy, + ), + file=sys.stderr, + ) else: - print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( - ingest_type, - result['status'], - result['file_count'], - result['total_size'], - ingest_strategy, - ), - file=sys.stderr) + print( + "[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( + ingest_type, + result["status"], + result["file_count"], + result["total_size"], + ingest_strategy, + ), + file=sys.stderr, + ) return result -- cgit v1.2.3
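Note on reproducing this commit: the diff above is purely mechanical output from black 21.9b0 run over python/sandcrawler/ingest_fileset.py, per the subject line ("make fmt"). The actual Makefile target is not part of this patch; the snippet below is only a rough sketch of the kind of invocation assumed, with the file path taken from the diffstat and black assumed to be installed at the pinned version.

    # Hypothetical sketch of the formatting step behind this commit.
    # Assumes black==21.9b0 is available on PATH in the project environment.
    import subprocess

    subprocess.run(
        ["black", "python/sandcrawler/ingest_fileset.py"],
        check=True,  # fail loudly if black exits non-zero (e.g. syntax error in the file)
    )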