From c15432c0ce52c48efabcd7e3221a5d625ef3e9d0 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 12 Jul 2022 15:03:29 -0700
Subject: WIP: refactor logging calls in ingest pipelines

---
 python/sandcrawler/fileset_strategies.py | 21 +++++----
 python/sandcrawler/html_metadata.py      |  6 +--
 python/sandcrawler/ia.py                 | 52 +++++++++-------------
 python/sandcrawler/ingest_file.py        | 76 ++++++++++++++------------------
 python/sandcrawler/ingest_fileset.py     | 43 +++++++++---------
 python/sandcrawler/ingest_html.py        |  5 +--
 6 files changed, 89 insertions(+), 114 deletions(-)

diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index 1d84ce5..d49a5ff 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -1,3 +1,4 @@
+import logging
 import os
 import shutil
 import sys
@@ -192,9 +193,8 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
                 ):
                     # these 'tab-separated-values' from dataverse are just noise, don't log them
                     if m.mimetype != "text/tab-separated-values":
-                        print(
-                            f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}",
-                            file=sys.stderr,
+                        logging.warn(
+                            f"mimetype mismatch expected={m.mimetype} found={file_meta['mimetype']}"
                         )
                     m.mimetype = file_meta["mimetype"]
                 else:
@@ -314,14 +314,13 @@ class WebFilesetStrategy(FilesetIngestStrategy):
                     fetch_url, self.wayback_client, force_simple_get=True
                 )
 
-            print(
-                "[FETCH {:>6}] {} {}".format(
-                    via,
-                    (resource and resource.status),
-                    (resource and resource.terminal_url) or fetch_url,
-                ),
-                file=sys.stderr,
-            )
+            if resource:
+                print(
+                    f"fetch {via=} {fetch_url=} {resource.status=} {resource.terminal_url=}",
+                    file=sys.stderr,
+                )
+            else:
+                print(f"fetch {via=} {fetch_url=} status=", file=sys.stderr)
 
             m.terminal_url = resource.terminal_url
             m.terminal_dt = resource.terminal_dt
diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py
index 2fb500c..cee3b00 100644
--- a/python/sandcrawler/html_metadata.py
+++ b/python/sandcrawler/html_metadata.py
@@ -1,5 +1,5 @@
 import datetime
-import sys
+import logging
 import urllib.parse
 from typing import Any, Dict, List, Optional, Tuple
 
@@ -726,7 +726,7 @@ def html_extract_fulltext_url(
                 continue
             return (val, pattern.get("technique", "unknown"))
     if self_doc_url:
-        print(" WARN: returning fulltext URL pointing to self", file=sys.stderr)
+        logging.warn(f"returning fulltext URL pointing to self {self_doc_url=}")
        return self_doc_url
    return None
 
@@ -736,7 +736,7 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat
     meta: Any = dict()
     head = doc.css_first("head")
     if not head:
-        print(f"WARN: empty <head>? {doc_url}", file=sys.stderr)
+        logging.warn(f"empty HTML head {doc_url=}")
         return None
 
     for field, patterns in HEAD_META_PATTERNS.items():
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index 6003f02..4a8d71b 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -6,6 +6,7 @@ import datetime
 import gzip
 import http.client
 import json
+import logging
 import os
 import sys
 import time
@@ -255,7 +256,7 @@ class CdxApiClient:
                     next_sleep = retry_sleep - 3
                     retry_sleep = 3
                 print(
-                    " CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+                    f"CDX fetch failed; will sleep {retry_sleep}sec and try again",
                     file=sys.stderr,
                 )
                 time.sleep(retry_sleep)
@@ -268,7 +269,7 @@ class CdxApiClient:
         if not (fuzzy_match_url(row.url, url) and row.datetime == datetime):
             if retry_sleep and retry_sleep > 0:
                 print(
-                    " CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+                    f"CDX fetch failed; will sleep {retry_sleep}sec and try again",
                     file=sys.stderr,
                 )
                 time.sleep(retry_sleep)
                 return self.fetch(
                     url, datetime, filter_status_code=filter_status_code, retry_sleep=None
                 )
             raise KeyError(
-                "Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(
-                    url, datetime, row
-                )
+                f"Didn't get exact CDX url/datetime match. {url=} {datetime=} {row=}"
             )
         if filter_status_code:
             assert row.status_code == filter_status_code
@@ -438,12 +437,12 @@ class WaybackClient:
             # print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
             gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
         except wayback.exception.ResourceUnavailable:
-            print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
+            logging.warn(f"failed to fetch from petabox WARC {warc_path=}")
             raise PetaboxError(
                 "failed to load file contents from wayback/petabox (ResourceUnavailable)"
             )
         except wayback.exception.InvalidResource:
-            print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
+            logging.warn(f"failed to fetch from petabox WARC {warc_path=}")
             raise WaybackContentError(
                 "failed to load file contents from wayback/petabox (InvalidResource)"
             )
@@ -643,11 +642,8 @@ class WaybackClient:
             # TODO: don't need *all* these hashes, just sha1
             file_meta = gen_file_metadata(resp.content)
             if cdx_sha1hex != file_meta["sha1hex"]:
-                print(
-                    " REPLAY MISMATCH: cdx:{} replay:{}".format(
-                        cdx_sha1hex, file_meta["sha1hex"]
-                    ),
-                    file=sys.stderr,
+                logging.warn(
+                    f"CDX/wayback replay mismatch {cdx_sha1hex=} sha1hex={file_meta['sha1hex']}"
                 )
                 raise WaybackContentError(
                     "replay fetch body didn't match CDX hash cdx:{} body:{}".format(
@@ -747,7 +743,7 @@ class WaybackClient:
         next_url = start_url
         urls_seen = [start_url]
         for i in range(self.max_redirects + 1):
-            print(" URL: {}".format(next_url), file=sys.stderr)
+            print(f"cdx-lookup {next_url=}", file=sys.stderr)
             next_row: Optional[CdxRow] = self.cdx_client.lookup_best(
                 next_url, best_mimetype=best_mimetype, closest=closest
             )
@@ -993,7 +989,7 @@ class SavePageNowClient:
         non-200 remote statuses, invalid hosts/URLs, timeouts, backoff, etc.
         """
         if capture_outlinks:
-            print(" capturing outlinks!", file=sys.stderr)
+            logging.warn(f"SPNv2 request with outlink capture {request_url=}")
         if not (self.ia_access_key and self.ia_secret_key):
             raise Exception("SPN2 requires authentication (IA_ACCESS_KEY/IA_SECRET_KEY)")
         if request_url.startswith("ftp://"):
@@ -1024,7 +1020,7 @@ class SavePageNowClient:
         resp.raise_for_status()
         status_user = resp.json()
         if status_user["available"] <= 1:
-            print(f"SPNv2 user slots not available: {resp.text}", file=sys.stderr)
+            logging.warn(f"SPNv2 user slots not available: {resp.text}")
             raise SavePageNowBackoffError(
                 "SPNv2 availability: {}, url: {}".format(status_user, request_url)
             )
@@ -1085,7 +1081,7 @@ class SavePageNowClient:
             )
 
         job_id = resp_json["job_id"]
-        print(f" SPNv2 running: job_id={job_id} url={request_url}", file=sys.stderr)
+        print(f"spn2-api-request {job_id=} {request_url=}", file=sys.stderr)
         time.sleep(0.1)
 
         # poll until complete
@@ -1113,13 +1109,12 @@ class SavePageNowClient:
         # if there was a recent crawl of same URL, fetch the status of that
         # crawl to get correct datetime
         if final_json.get("original_job_id"):
+            original_job_id = final_json.get("original_job_id")
             print(
-                f" SPN recent capture: {job_id} -> {final_json['original_job_id']}",
+                f"SPN recent capture {job_id=} {original_job_id=}",
                 file=sys.stderr,
             )
-            resp = self.v2_session.get(
-                "{}/status/{}".format(self.v2endpoint, final_json["original_job_id"])
-            )
+            resp = self.v2_session.get(f"{self.v2endpoint}/status/{original_job_id}")
             try:
                 resp.raise_for_status()
             except Exception:
@@ -1130,10 +1125,7 @@ class SavePageNowClient:
 
         if final_json["status"] == "success":
             if final_json.get("original_url").startswith("/"):
-                print(
-                    f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}",
-                    file=sys.stderr,
-                )
+                logging.warn(f"truncated URL in JSON {request_url=} {json.dumps(final_json)}")
             return SavePageNowResult(
                 True,
                 "success",
@@ -1254,11 +1246,10 @@ class SavePageNowClient:
                     best_mimetype="application/pdf",
                 )
                 if elsevier_pdf_cdx and elsevier_pdf_cdx.mimetype == "application/pdf":
-                    print(" Trying pdf.sciencedirectassets.com hack!", file=sys.stderr)
+                    logging.warn("trying pdf.sciencedirectassets.com hack")
                     cdx_row = elsevier_pdf_cdx
                 else:
-                    print(" Failed pdf.sciencedirectassets.com hack!", file=sys.stderr)
-                    # print(elsevier_pdf_cdx, file=sys.stderr)
+                    logging.warn("failed pdf.sciencedirectassets.com hack")
 
         if not cdx_row:
             # lookup exact
@@ -1282,7 +1273,7 @@ class SavePageNowClient:
                     retry_sleep=self.spn_cdx_retry_sec,
                 )
             except KeyError as ke:
-                print(" CDX KeyError: {}".format(ke), file=sys.stderr)
+                logging.warn(f"cdx-api KeyError {ke}")
                 return ResourceResult(
                     start_url=start_url,
                     hit=False,
@@ -1368,10 +1359,7 @@ def fix_transfer_encoding(
         and resource.cdx
         and resource.cdx.mimetype != "application/gzip"
     ):
-        print(
-            " transfer encoding not stripped: {}".format(resource.cdx.mimetype),
-            file=sys.stderr,
-        )
+        logging.warn(f"transfer encoding not stripped mimetype={resource.cdx.mimetype}")
         inner_body = gzip.decompress(resource.body)
         if not inner_body:
             raise Exception("null body inside transfer encoding")
diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py
index c79973f..6e2876f 100644
--- a/python/sandcrawler/ingest_file.py
+++ b/python/sandcrawler/ingest_file.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import sys
 import time
 import xml.etree.ElementTree
@@ -286,12 +287,13 @@ class IngestFileWorker(SandcrawlerWorker):
         ):
             via = "spn2"
             resource = self.spn_client.crawl_resource(url, self.wayback_client)
-        print(
-            "[FETCH {:>6}] {} {}".format(
-                via, (resource and resource.status), (resource and resource.terminal_url) or url
-            ),
-            file=sys.stderr,
-        )
+        if resource:
+            print(
+                f"find_resource {via=} {url=} {resource.status=} {resource.terminal_url=}",
+                file=sys.stderr,
+            )
+        else:
+            print(f"find_resource {via=} {url=} status=", file=sys.stderr)
         return resource
 
     def process_existing(self, request: dict, result_row: dict) -> dict:
@@ -510,7 +512,7 @@ class IngestFileWorker(SandcrawlerWorker):
 
         try:
             if self.html_quick_mode:
-                print(" WARN: running quick CDX-only fetches", file=sys.stderr)
+                logging.warn("running quick CDX-only fetches")
                 full_resources = quick_fetch_html_resources(
                     raw_resources, self.wayback_client.cdx_client, when
                 )
@@ -549,7 +551,7 @@ class IngestFileWorker(SandcrawlerWorker):
         return info
 
     def timeout_response(self, task: dict) -> dict:
-        print("[TIMEOUT]", file=sys.stderr)
+        print("ingest-timeout", file=sys.stderr)
         return dict(
             request=task,
             hit=False,
@@ -563,7 +565,25 @@ class IngestFileWorker(SandcrawlerWorker):
         return True
 
     def process(self, request: dict, key: Any = None) -> dict:
-        return self.process_file(request, key=key)
+        start_time = time.time()
+        result = self.process_file(request, key=key)
+        result["duration"] = time.time() - start_time
+        print(self.canonical_log_line(result), file=sys.stderr)
+        return result
+
+    def canonical_log_line(self, result: dict) -> str:
+        request = result["request"]
+        line = f"ingest-result status={result['status']} ingest_type={request['ingest_type']}"
+        if result["status"] == "success":
+            if result.get("file_meta"):
+                line += f" sha1={result['file_meta']['sha1hex']}"
+            if result.get("grobid"):
+                line += f" grobid_status_code={result['grobid'].get('status_code')}"
+            if result.get("pdf_meta"):
+                line += f" pdf_status={result['pdf_meta'].get('status')}"
+        if result.get("duration"):
+            line += f" duration={result['duration']:.3}"
+        return line
 
     def process_file(self, request: dict, key: Any = None) -> dict:
 
@@ -584,10 +604,9 @@ class IngestFileWorker(SandcrawlerWorker):
 
         for block in self.base_url_blocklist:
             if block in base_url:
-                print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
                 return dict(request=request, hit=False, status="skip-url-blocklist")
 
-        print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+        print(f"ingest-hop {ingest_type=} {base_url=}", file=sys.stderr)
 
         best_mimetype = None
         if ingest_type == "pdf":
@@ -756,14 +775,8 @@ class IngestFileWorker(SandcrawlerWorker):
                 next_url = fulltext_url.get("pdf_url") or fulltext_url.get("next_url") or ""
                 assert next_url
                 next_url = clean_url(next_url)
-                print(
-                    "[PARSE {:>6}] {} {}".format(
-                        ingest_type,
-                        fulltext_url.get("technique"),
-                        next_url,
-                    ),
-                    file=sys.stderr,
-                )
+                technique = fulltext_url.get("technique")
+                print(f"parse-html {ingest_type=} {technique=} {next_url=}", file=sys.stderr)
                 if next_url in hops:
                     result["status"] = "link-loop"
                     result["error_message"] = "repeated: {}".format(next_url)
@@ -788,12 +801,7 @@ class IngestFileWorker(SandcrawlerWorker):
                 next_url = next_url_found
                 technique = "html_biblio"
                 print(
-                    "[PARSE {:>6}] {} {}".format(
-                        ingest_type,
-                        technique,
-                        next_url,
-                    ),
-                    file=sys.stderr,
+                    f"parse-html {ingest_type=} {technique=} {next_url=}", file=sys.stderr
                 )
                 if next_url in hops:
                     if ingest_type == "html":
@@ -867,24 +875,6 @@ class IngestFileWorker(SandcrawlerWorker):
 
         result["status"] = "success"
         result["hit"] = True
-        if ingest_type == "pdf":
-            print(
-                "[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
-                    ingest_type,
-                    result.get("file_meta", {}).get("sha1hex"),
-                    result.get("grobid", {}).get("status_code"),
-                    result.get("pdf_meta", {}).get("status"),
-                ),
-                file=sys.stderr,
-            )
-        else:
-            print(
-                "[SUCCESS {:>5}] sha1:{}".format(
-                    ingest_type,
-                    result.get("file_meta", {}).get("sha1hex"),
-                ),
-                file=sys.stderr,
-            )
         return result
 
 
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 3acbece..51a2d7c 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -263,6 +263,24 @@ class IngestFilesetWorker(IngestFileWorker):
         return result
 
     def process(self, request: dict, key: Any = None) -> dict:
+        start_time = time.time()
+        result = self.process_fileset(request, key=key)
+        result["duration"] = time.time() - start_time
+        print(self.canonical_log_line(result), file=sys.stderr)
+        return result
+
+    def canonical_log_line(self, result: dict) -> str:
+        request = result["request"]
+        line = f"ingest-result status={result['status']} ingest_type={request['ingest_type']} ingest_strategy={result.get('ingest_strategy')}"
+        if result.get("file_count") is not None:
+            line += f" file_count={result['file_count']}"
+        if result.get("total_size") is not None:
+            line += f" total_size={result['total_size']}"
+        if result.get("duration"):
+            line += f" duration={result['duration']:.3}"
+        return line
+
+    def process_fileset(self, request: dict, key: Any = None) -> dict:
         ingest_type = request.get("ingest_type")
 
         if ingest_type not in ("dataset",):
@@ -275,7 +293,7 @@ class IngestFilesetWorker(IngestFileWorker):
 
         force_recrawl = bool(request.get("force_recrawl", False))
 
-        print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+        print(f"ingest-request {ingest_type=} {base_url=}", file=sys.stderr)
 
         # TODO: "existing" check against file and/or fileset ingest result table
         # existing = self.check_existing_ingest(ingest_type, base_url)
@@ -366,8 +384,9 @@ class IngestFilesetWorker(IngestFileWorker):
 
         ingest_strategy = platform_helper.chose_strategy(dataset_meta)
         result["ingest_strategy"] = ingest_strategy
+
         print(
-            f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}",
+            f"platform {platform=} platform_id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} {ingest_strategy=}",
             file=sys.stderr,
         )
 
@@ -493,24 +512,4 @@ class IngestFilesetWorker(IngestFileWorker):
 
         if result["status"].startswith("success"):
             result["hit"] = True
-            print(
-                "[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format(
-                    ingest_type,
-                    result["file_count"],
-                    result["total_size"],
-                    ingest_strategy,
-                ),
-                file=sys.stderr,
-            )
-        else:
-            print(
-                "[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format(
-                    ingest_type,
-                    result["status"],
-                    result["file_count"],
-                    result["total_size"],
-                    ingest_strategy,
-                ),
-                file=sys.stderr,
-            )
         return result
diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py
index 0ff7fe0..3989f9e 100644
--- a/python/sandcrawler/ingest_html.py
+++ b/python/sandcrawler/ingest_html.py
@@ -1,6 +1,7 @@
 import argparse
 import datetime
 import json
+import logging
 import sys
 import xml.etree.ElementTree as ET
 from typing import Any, List, Optional, Tuple
@@ -156,9 +157,7 @@ def quick_fetch_html_resources(
         if not cdx_row:
             raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
         if cdx_row.url != resource["url"] and not url_fuzzy_equal(cdx_row.url, resource["url"]):
-            print(
-                f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr
-            )
+            logging.warn(f"CDX fuzzy match expected={resource['url']} found={cdx_row.url}")
         if not cdx_row.status_code:
             # TODO: fall back to a full fetch?
             print(" WARN: skipping revisit record", file=sys.stderr)
-- 
cgit v1.2.3
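
Note (not part of the patch): the refactor moves several messages from print(..., file=sys.stderr) to the stdlib logging module (logging.warn() is a deprecated alias for logging.warning()), and those records only pass through a real handler if a worker entrypoint configures one; otherwise they fall through to the root logger's last-resort stderr handler with a bare format. A minimal sketch of what that wiring might look like follows; the helper name and format string are assumptions for illustration, not sandcrawler code.

import logging
import sys

def setup_ingest_logging(level: int = logging.INFO) -> None:
    # Hypothetical helper, not from the patch: send log records to stderr,
    # roughly matching the key=value style of the refactored print() calls.
    logging.basicConfig(
        stream=sys.stderr,
        level=level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

if __name__ == "__main__":
    setup_ingest_logging()
    logging.warning("SPNv2 user slots not available: available=0")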