Diffstat (limited to 'python/sandcrawler/ia.py')
-rw-r--r--    python/sandcrawler/ia.py    1138
1 files changed, 1138 insertions, 0 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
new file mode 100644
index 0000000..c586972
--- /dev/null
+++ b/python/sandcrawler/ia.py
@@ -0,0 +1,1138 @@
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import os
+import sys
+import time
+import gzip
+import json
+import requests
+import datetime
+import urllib.parse
+import urllib3.exceptions
+from typing import Tuple
+from collections import namedtuple
+
+import http.client
+
+# not sure this will really work. Should go before wayback imports.
+http.client._MAXHEADERS = 1000  # type: ignore
+
+import wayback.exception
+from http.client import IncompleteRead
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory3
+
+from .misc import b32_hex, requests_retry_session, gen_file_metadata, clean_url
+
+class SandcrawlerBackoffError(Exception):
+    """
+    A set of Exceptions which are raised through multiple abstraction layers to
+    indicate backpressure. For example, SPNv2 back-pressure sometimes needs to
+    be passed up through any timeout/retry code and become an actual long pause
+    or crash.
+    """
+    pass
+
+ResourceResult = namedtuple("ResourceResult", [
+    "start_url",
+    "hit",
+    "status",
+    "terminal_url",
+    "terminal_dt",
+    "terminal_status_code",
+    "body",
+    "cdx",
+    "revisit_cdx",
+])
+
+WarcResource = namedtuple("WarcResource", [
+    "status_code",
+    "location",
+    "body",
+    "revisit_cdx",
+])
+
+CdxRow = namedtuple('CdxRow', [
+    'surt',
+    'datetime',
+    'url',
+    'mimetype',
+    'status_code',
+    'sha1b32',
+    'sha1hex',
+    'warc_csize',
+    'warc_offset',
+    'warc_path',
+])
+
+CdxPartial = namedtuple('CdxPartial', [
+    'surt',
+    'datetime',
+    'url',
+    'mimetype',
+    'status_code',
+    'sha1b32',
+    'sha1hex',
+])
+
+def cdx_partial_from_row(full):
+    return CdxPartial(
+        surt=full.surt,
+        datetime=full.datetime,
+        url=full.url,
+        mimetype=full.mimetype,
+        status_code=full.status_code,
+        sha1b32=full.sha1b32,
+        sha1hex=full.sha1hex,
+    )
+
+def cdx_to_dict(cdx):
+    d = {
+        "surt": cdx.surt,
+        "datetime": cdx.datetime,
+        "url": cdx.url,
+        "mimetype": cdx.mimetype,
+        "status_code": cdx.status_code,
+        "sha1b32": cdx.sha1b32,
+        "sha1hex": cdx.sha1hex,
+    }
+    if type(cdx) == CdxRow and '/' in cdx.warc_path:
+        d['warc_csize'] = cdx.warc_csize
+        d['warc_offset'] = cdx.warc_offset
+        d['warc_path'] = cdx.warc_path
+    return d
+
+def fuzzy_match_url(left, right):
+    """
+    Matches URLs agnostic of http/https (and maybe other normalizations in the
+    future)
+    """
+    if left == right:
+        return True
+    if '://' in left and '://' in right:
+        left = '://'.join(left.split('://')[1:])
+        right = '://'.join(right.split('://')[1:])
+        if left == right:
+            return True
+    if left == right + "/" or right == left + "/":
+        return True
+    return False
+
+def test_fuzzy_match_url():
+    assert fuzzy_match_url("http://thing.com", "http://thing.com") == True
+    assert fuzzy_match_url("http://thing.com", "https://thing.com") == True
+    assert fuzzy_match_url("http://thing.com", "ftp://thing.com") == True
+    assert fuzzy_match_url("http://thing.com", "http://thing.com/") == True
+    assert fuzzy_match_url("https://thing.com", "http://thing.com/") == True
+    assert fuzzy_match_url("https://thing.com/", "http://thing.com") == True
+    assert fuzzy_match_url("http://thing.com", "http://thing.com/blue") == False
+
+    # should probably handle these?
+    assert fuzzy_match_url("http://thing.com", "http://www.thing.com") == False
+    assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False
+    assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False
+
+class CdxApiError(Exception):
+    pass
+
+class CdxApiClient:
+
+    def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs):
+        self.host_url = host_url
+        self.http_session = requests_retry_session(retries=3, backoff_factor=3)
+        cdx_auth_token = kwargs.get('cdx_auth_token',
+            os.environ.get('CDX_AUTH_TOKEN'))
+        if not cdx_auth_token:
+            raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
+        self.http_session.headers.update({
+            'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient',
+            'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
+        })
+
+    def _query_api(self, params):
+        """
+        Hits CDX API with a query, parses result into a list of CdxRow
+        """
+        resp = self.http_session.get(self.host_url, params=params)
+        if resp.status_code != 200:
+            raise CdxApiError(resp.text)
+        #print(resp.url, file=sys.stderr)
+        if not resp.text:
+            return None
+        rj = resp.json()
+        if len(rj) <= 1:
+            return None
+        rows = []
+        for raw in rj[1:]:
+            # check number of CDX fields; there is a bug with some rows having
+            # spaces in WARC filename resulting in extra bogus fields
+            if len(raw) != 11:
+                raise CdxApiError(f"CDX response had {len(raw)} fields, not 11 expected")
+
+            # transform "-" ftp status code to a 226
+            status_code = None
+            if raw[4] == "-":
+                if raw[3] != "warc/revisit" and raw[2].startswith("ftp://"):
+                    status_code = 226
+            else:
+                status_code = int(raw[4])
+
+            # skip CDX rows with no WARC records
+            if raw[8] == '-' or raw[9] == '-' or raw[10] == '-':
+                continue
+
+            row = CdxRow(
+                surt=raw[0],
+                datetime=raw[1],
+                url=raw[2],
+                mimetype=raw[3],
+                status_code=status_code,
+                sha1b32=raw[5],
+                sha1hex=b32_hex(raw[5]),
+                warc_csize=int(raw[8]),
+                warc_offset=int(raw[9]),
+                warc_path=raw[10],
+            )
+            assert (row.mimetype == "-") or ("-" not in row)
+            rows.append(row)
+        return rows
+
+    def fetch(self, url, datetime, filter_status_code=None, retry_sleep=None):
+        """
+        Fetches a single CDX row by url/datetime. Raises a KeyError if not
+        found, because we expect to be looking up a specific full record.
+        """
+        if len(datetime) != 14:
+            raise ValueError("CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
+        params = {
+            'url': url,
+            'from': datetime,
+            'to': datetime,
+            'matchType': 'exact',
+            'limit': 1,
+            'output': 'json',
+        }
+        if filter_status_code:
+            params['filter'] = "statuscode:{}".format(filter_status_code)
+        resp = self._query_api(params)
+        if not resp:
+            if retry_sleep and retry_sleep > 0:
+                next_sleep = None
+                if retry_sleep > 3:
+                    next_sleep = retry_sleep - 3
+                    retry_sleep = 3
+                print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+                time.sleep(retry_sleep)
+                return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep)
+            raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
+        row = resp[0]
+        # allow fuzzy http/https match
+        if not (fuzzy_match_url(row.url, url) and row.datetime == datetime):
+            if retry_sleep and retry_sleep > 0:
+                print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+                time.sleep(retry_sleep)
+                return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None)
+            raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row))
+        if filter_status_code:
+            assert row.status_code == filter_status_code
+        return row
+
+    def lookup_best(self, url, max_age_days=None, best_mimetype=None, closest=None):
+        """
+        Fetches multiple CDX rows for the given URL, tries to find the most recent.
+
+        If no matching row is found, return None. Note this is different from fetch.
+
+        Preference order by status code looks like:
+
+            200 or 226
+                mimetype match
+                    not-liveweb
+                        most-recent
+                no match
+                    not-liveweb
+                        most-recent
+            3xx
+                most-recent
+            4xx
+                most-recent
+            5xx
+                most-recent
+
+        """
+        params = {
+            'url': url,
+            'matchType': 'exact',
+            'limit': -25,
+            'output': 'json',
+            # Collapsing seems efficient, but is complex; would need to include
+            # other filters and status code in filter
+            #'collapse': 'timestamp:6',
+
+            # Revisits now allowed and resolved!
+            #'filter': '!mimetype:warc/revisit',
+        }
+        if max_age_days:
+            since = datetime.date.today() - datetime.timedelta(days=max_age_days)
+            params['from'] = '%04d%02d%02d' % (since.year, since.month, since.day)
+        if closest:
+            params['closest'] = closest
+            params['sort'] = "closest"
+        #print(params, file=sys.stderr)
+        rows = self._query_api(params)
+        if not rows:
+            return None
+
+        def _cdx_sort_key(r):
+            """
+            This is a function, not a lambda, because it captures
+            best_mimetype. Will create a tuple that can be used to sort in
+            *reverse* order.
+            """
+            return (
+                int(r.status_code in (200, 226)),
+                int(0 - (r.status_code or 999)),
+                int(r.mimetype == best_mimetype),
+                int(r.mimetype != "warc/revisit"),
+                int(r.datetime[:6]),
+                int('/' in r.warc_path),
+                int(r.datetime),
+            )
+
+        rows = sorted(rows, key=_cdx_sort_key)
+        return rows[-1]
+ """ + return ( + int(r.status_code in (200, 226)), + int(0 - (r.status_code or 999)), + int(r.mimetype == best_mimetype), + int(r.mimetype != "warc/revisit"), + int(r.datetime[:6]), + int('/' in r.warc_path), + int(r.datetime), + ) + + rows = sorted(rows, key=_cdx_sort_key) + return rows[-1] + + +class WaybackError(Exception): + pass + +class WaybackContentError(Exception): + pass + +class PetaboxError(Exception): + pass + +class NoCaptureError(Exception): + pass + +class WaybackClient: + + def __init__(self, cdx_client=None, **kwargs): + if cdx_client: + self.cdx_client = cdx_client + else: + self.cdx_client = CdxApiClient() + # /serve/ instead of /download/ doesn't record view count + # this *does* want to be http://, not https:// + self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') + # gwb library will fall back to reading from /opt/.petabox/webdata.secret + self.petabox_webdata_secret = kwargs.get( + 'petabox_webdata_secret', + os.environ.get('PETABOX_WEBDATA_SECRET'), + ) + self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/') + self.rstore = None + self.max_redirects = 25 + self.wayback_endpoint = "https://web.archive.org/web/" + self.replay_headers = { + 'User-Agent': 'Mozilla/5.0 sandcrawler.WaybackClient', + } + + def fetch_petabox(self, csize, offset, warc_path, resolve_revisit=True): + """ + Fetches wayback resource directly from petabox using WARC path/offset/csize. + + If there is a problem with petabox, raises a PetaboxError. + If resource doesn't exist, would raise a KeyError (TODO). + + The body is only returned if the record is success (HTTP 200 or + equivalent). Otherwise only the status and header info is returned. + + WarcResource object (namedtuple) contains fields: + - status_code: int + - location: eg, for redirects + - body: raw bytes + + resolve_revist does what it sounds like: tries following a revisit + record by looking up CDX API and then another fetch. Refuses to recurse + more than one hop (eg, won't follow a chain of revisits). + + Requires (and uses) a secret token. 
+ """ + if not self.petabox_webdata_secret: + raise Exception("WaybackClient needs petabox secret to do direct WARC fetches") + if not "/" in warc_path: + raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) + warc_uri = self.warc_uri_prefix + warc_path + if not self.rstore: + self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3( + webdata_secret=self.petabox_webdata_secret, + )) + try: + #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr) + gwb_record = self.rstore.load_resource(warc_uri, offset, csize) + except wayback.exception.ResourceUnavailable: + print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) + raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)") + except wayback.exception.InvalidResource: + print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) + raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)") + except urllib3.exceptions.ReadTimeoutError as rte: + raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte)) + except ValueError as ve: + raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + except EOFError as eofe: + raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + except TypeError as te: + raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) + except Exception as e: + if "while decompressing data: invalid block type" in str(e): + raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files") + else: + raise e + # Note: could consider a generic "except Exception" here, as we get so + # many petabox errors. Do want jobs to fail loud and clear when the + # whole cluster is down though. + + try: + status_code = gwb_record.get_status()[0] + except http.client.HTTPException: + raise WaybackContentError("too many HTTP headers (in wayback fetch)") + location = gwb_record.get_location() or None + + if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit(): + # TODO: some additional verification here? 
+ status_code = 226 + + body = None + revisit_cdx = None + if gwb_record.is_revisit(): + if not resolve_revisit: + raise WaybackContentError("found revisit record, but won't resolve (loop?)") + revisit_uri, revisit_dt = gwb_record.refers_to + if not (revisit_uri and revisit_dt): + raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format( + warc_path, offset)) + # convert revisit_dt + # len("2018-07-24T11:56:49"), or with "Z" + assert len(revisit_dt) in (19, 20) + if type(revisit_uri) is bytes: + revisit_uri = revisit_uri.decode('utf-8') + if type(revisit_dt) is bytes: + revisit_dt = revisit_dt.decode('utf-8') + revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '') + assert len(revisit_dt) == 14 + try: + revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt) + body = self.fetch_petabox_body( + csize=revisit_cdx.warc_csize, + offset=revisit_cdx.warc_offset, + warc_path=revisit_cdx.warc_path, + resolve_revisit=False, + expected_status_code=revisit_cdx.status_code, + ) + except KeyError as ke: + raise WaybackError("Revist resolution failed: {}".format(ke)) + elif status_code in (200, 226): + try: + body = gwb_record.open_raw_content().read() + except IncompleteRead as ire: + raise WaybackError( + "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + elif status_code is None: + raise WaybackContentError( + "got a None status_code in (W)ARC record") + return WarcResource( + status_code=status_code, + location=location, + body=body, + revisit_cdx=revisit_cdx, + ) + + def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None): + """ + Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize. + + Returns bytes. Raises KeyError if resource wasn't an HTTP 200. + + Thin helper around fetch_petabox() + """ + resource = self.fetch_petabox( + csize=csize, + offset=offset, + warc_path=warc_path, + resolve_revisit=resolve_revisit, + ) + + if expected_status_code: + if expected_status_code != resource.status_code: + raise KeyError("archived HTTP response (WARC) was not {}: {}".format( + expected_status_code, + resource.status_code, + ) + ) + elif resource.status_code not in (200, 226): + raise KeyError("archived HTTP response (WARC) was not 200: {}".format( + resource.status_code) + ) + + return resource.body + + def fetch_replay_body(self, url, datetime, cdx_sha1hex=None): + """ + Fetches an HTTP 200 record from wayback via the replay interface + (web.archive.org) instead of petabox. + + Intended for use with SPN2 requests, where request body has not ended + up in petabox yet. + + If cdx_sha1hex is passed, will try to verify fetched body. Note that + this check *won't work* in many cases, due to CDX hash being of + compressed transfer data, not the uncompressed final content bytes. + + TODO: could instead try to verify that we got the expected replay body + using... new X-Archive headers? 
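
A rough sketch of the direct petabox path, combining a CDX lookup with fetch_petabox_body(); assumes PETABOX_WEBDATA_SECRET (and CDX_AUTH_TOKEN) are configured, and the URL is hypothetical:

    wayback = WaybackClient()
    row = wayback.cdx_client.lookup_best("https://example.com/paper.pdf")
    if row and '/' in row.warc_path:
        # fetch raw record bytes by WARC path/offset/csize; KeyError if not HTTP 200/226
        body = wayback.fetch_petabox_body(
            csize=row.warc_csize,
            offset=row.warc_offset,
            warc_path=row.warc_path,
        )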
+ """ + + # defensively check datetime format + assert len(datetime) == 14 + assert datetime.isdigit() + + try: + resp = requests.get( + self.wayback_endpoint + datetime + "id_/" + url, + allow_redirects=False, + headers=self.replay_headers, + ) + except requests.exceptions.TooManyRedirects: + raise WaybackContentError("redirect loop (wayback replay fetch)") + except requests.exceptions.ChunkedEncodingError: + raise WaybackError("ChunkedEncodingError (wayback replay fetch)") + except UnicodeDecodeError: + raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + + try: + resp.raise_for_status() + except Exception as e: + raise WaybackError(str(e)) + #print(resp.url, file=sys.stderr) + + # defensively check that this is actually correct replay based on headers + if not "X-Archive-Src" in resp.headers: + raise WaybackError("replay fetch didn't return X-Archive-Src in headers") + if not datetime in resp.url: + raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url)) + + if cdx_sha1hex: + # verify that body matches CDX hash + # TODO: don't need *all* these hashes, just sha1 + file_meta = gen_file_metadata(resp.content) + if cdx_sha1hex != file_meta['sha1hex']: + print(" REPLAY MISMATCH: cdx:{} replay:{}".format( + cdx_sha1hex, + file_meta['sha1hex']), + file=sys.stderr) + raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format( + cdx_sha1hex, + file_meta['sha1hex']), + ) + return resp.content + + def fetch_replay_redirect(self, url, datetime): + """ + Fetches an HTTP 3xx redirect Location from wayback via the replay interface + (web.archive.org) instead of petabox. + + Intended for use with SPN2 requests, where request body has not ended + up in petabox yet. For example, re-ingesting a base_url which was + recently crawler by SPNv2, where we are doing ingest via wayback path. + + Returns None if response is found, but couldn't find redirect. + """ + + # defensively check datetime format + assert len(datetime) == 14 + assert datetime.isdigit() + + try: + resp = requests.get( + self.wayback_endpoint + datetime + "id_/" + url, + allow_redirects=False, + headers=self.replay_headers, + ) + except requests.exceptions.TooManyRedirects: + raise WaybackContentError("redirect loop (wayback replay fetch)") + except UnicodeDecodeError: + raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + try: + resp.raise_for_status() + except Exception as e: + raise WaybackError(str(e)) + #print(resp.url, file=sys.stderr) + + # defensively check that this is actually correct replay based on headers + # previously check for "X-Archive-Redirect-Reason" here + if not "X-Archive-Src" in resp.headers: + raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers") + if not datetime in resp.url: + raise WaybackError("didn't get exact reply (redirect?) 
datetime:{} got:{}".format(datetime, resp.url)) + + redirect_url = resp.headers.get("Location") + # eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw + #print(redirect_url, file=sys.stderr) + if redirect_url and redirect_url.startswith("https://web.archive.org/web/"): + redirect_url = "/".join(redirect_url.split("/")[5:]) + #print(redirect_url, file=sys.stderr) + if redirect_url and redirect_url.startswith("http"): + redirect_url = clean_url(redirect_url) + return redirect_url + else: + return None + + def lookup_resource(self, start_url, best_mimetype=None, closest=None): + """ + Looks in wayback for a resource starting at the URL, following any + redirects. Returns a ResourceResult object, which may indicate a + failure to fetch the resource. + + Only raises exceptions on remote service failure or unexpected + problems. + + In a for loop: + + lookup "best" CDX + redirect status code? + fetch wayback + continue + success (200)? + fetch wayback + return success + bad (other status)? + return failure + + got to end? + return failure; too many redirects + """ + next_url = start_url + urls_seen = [start_url] + for i in range(self.max_redirects): + print(" URL: {}".format(next_url), file=sys.stderr) + cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest) + #print(cdx_row, file=sys.stderr) + if not cdx_row: + return ResourceResult( + start_url=start_url, + hit=False, + status="no-capture", + terminal_url=next_url, + terminal_dt=None, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + # first try straight-forward redirect situation + if cdx_row.mimetype == "warc/revisit" and '/' in cdx_row.warc_path: + resource = self.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + ) + if resource.revisit_cdx and resource.revisit_cdx.status_code in (200, 226): + return ResourceResult( + start_url=start_url, + hit=True, + status="success", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=resource.revisit_cdx.status_code, + body=resource.body, + cdx=cdx_row, + revisit_cdx=resource.revisit_cdx, + ) + # else, continue processing with revisit record + + if cdx_row.status_code in (200, 226): + revisit_cdx = None + if '/' in cdx_row.warc_path: + resource = self.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + ) + body = resource.body + revisit_cdx = resource.revisit_cdx + else: + body = self.fetch_replay_body( + url=cdx_row.url, + datetime=cdx_row.datetime, + ) + cdx_row = cdx_partial_from_row(cdx_row) + return ResourceResult( + start_url=start_url, + hit=True, + status="success", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=body, + cdx=cdx_row, + revisit_cdx=revisit_cdx, + ) + elif 300 <= (cdx_row.status_code or 0) < 400: + if '/' in cdx_row.warc_path: + resource = self.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + resolve_revisit=False, + ) + assert 300 <= resource.status_code < 400 + if not resource.location: + print(" bad redirect record: {}".format(cdx_row), file=sys.stderr) + return ResourceResult( + start_url=start_url, + hit=False, + status="bad-redirect", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=None, + cdx=cdx_row, + revisit_cdx=None, + ) + if not "://" 
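
A sketch of the replay path for captures not yet in petabox (eg, fresh SPNv2 results); the URL and timestamp are hypothetical:

    wayback = WaybackClient()
    # body bytes via the web.archive.org "id_" replay endpoint
    body = wayback.fetch_replay_body("https://example.com/paper.pdf", "20200102030405")
    # for a 3xx capture, just the redirect Location (None if no redirect found)
    target = wayback.fetch_replay_redirect("https://example.com/old-path", "20200102030405")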
+
+    def lookup_resource(self, start_url, best_mimetype=None, closest=None):
+        """
+        Looks in wayback for a resource starting at the URL, following any
+        redirects. Returns a ResourceResult object, which may indicate a
+        failure to fetch the resource.
+
+        Only raises exceptions on remote service failure or unexpected
+        problems.
+
+        In a for loop:
+
+            lookup "best" CDX
+            redirect status code?
+                fetch wayback
+                continue
+            success (200)?
+                fetch wayback
+                return success
+            bad (other status)?
+                return failure
+
+        got to end?
+            return failure; too many redirects
+        """
+        next_url = start_url
+        urls_seen = [start_url]
+        for i in range(self.max_redirects):
+            print(" URL: {}".format(next_url), file=sys.stderr)
+            cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest)
+            #print(cdx_row, file=sys.stderr)
+            if not cdx_row:
+                return ResourceResult(
+                    start_url=start_url,
+                    hit=False,
+                    status="no-capture",
+                    terminal_url=next_url,
+                    terminal_dt=None,
+                    terminal_status_code=None,
+                    body=None,
+                    cdx=None,
+                    revisit_cdx=None,
+                )
+
+            # first try straight-forward redirect situation
+            if cdx_row.mimetype == "warc/revisit" and '/' in cdx_row.warc_path:
+                resource = self.fetch_petabox(
+                    csize=cdx_row.warc_csize,
+                    offset=cdx_row.warc_offset,
+                    warc_path=cdx_row.warc_path,
+                )
+                if resource.revisit_cdx and resource.revisit_cdx.status_code in (200, 226):
+                    return ResourceResult(
+                        start_url=start_url,
+                        hit=True,
+                        status="success",
+                        terminal_url=cdx_row.url,
+                        terminal_dt=cdx_row.datetime,
+                        terminal_status_code=resource.revisit_cdx.status_code,
+                        body=resource.body,
+                        cdx=cdx_row,
+                        revisit_cdx=resource.revisit_cdx,
+                    )
+                # else, continue processing with revisit record
+
+            if cdx_row.status_code in (200, 226):
+                revisit_cdx = None
+                if '/' in cdx_row.warc_path:
+                    resource = self.fetch_petabox(
+                        csize=cdx_row.warc_csize,
+                        offset=cdx_row.warc_offset,
+                        warc_path=cdx_row.warc_path,
+                    )
+                    body = resource.body
+                    revisit_cdx = resource.revisit_cdx
+                else:
+                    body = self.fetch_replay_body(
+                        url=cdx_row.url,
+                        datetime=cdx_row.datetime,
+                    )
+                    cdx_row = cdx_partial_from_row(cdx_row)
+                return ResourceResult(
+                    start_url=start_url,
+                    hit=True,
+                    status="success",
+                    terminal_url=cdx_row.url,
+                    terminal_dt=cdx_row.datetime,
+                    terminal_status_code=cdx_row.status_code,
+                    body=body,
+                    cdx=cdx_row,
+                    revisit_cdx=revisit_cdx,
+                )
+            elif 300 <= (cdx_row.status_code or 0) < 400:
+                if '/' in cdx_row.warc_path:
+                    resource = self.fetch_petabox(
+                        csize=cdx_row.warc_csize,
+                        offset=cdx_row.warc_offset,
+                        warc_path=cdx_row.warc_path,
+                        resolve_revisit=False,
+                    )
+                    assert 300 <= resource.status_code < 400
+                    if not resource.location:
+                        print(" bad redirect record: {}".format(cdx_row), file=sys.stderr)
+                        return ResourceResult(
+                            start_url=start_url,
+                            hit=False,
+                            status="bad-redirect",
+                            terminal_url=cdx_row.url,
+                            terminal_dt=cdx_row.datetime,
+                            terminal_status_code=cdx_row.status_code,
+                            body=None,
+                            cdx=cdx_row,
+                            revisit_cdx=None,
+                        )
+                    if not "://" in resource.location:
+                        next_url = urllib.parse.urljoin(next_url, resource.location)
+                    else:
+                        next_url = resource.location
+                    if next_url:
+                        next_url = clean_url(next_url)
+                else:
+                    next_url = self.fetch_replay_redirect(
+                        url=cdx_row.url,
+                        datetime=cdx_row.datetime,
+                    )
+                    if next_url:
+                        next_url = clean_url(next_url)
+                    cdx_row = cdx_partial_from_row(cdx_row)
+                if not next_url:
+                    print(" bad redirect record: {}".format(cdx_row), file=sys.stderr)
+                    return ResourceResult(
+                        start_url=start_url,
+                        hit=False,
+                        status="bad-redirect",
+                        terminal_url=cdx_row.url,
+                        terminal_dt=cdx_row.datetime,
+                        terminal_status_code=cdx_row.status_code,
+                        body=None,
+                        cdx=cdx_row,
+                        revisit_cdx=None,
+                    )
+                if next_url in urls_seen:
+                    return ResourceResult(
+                        start_url=start_url,
+                        hit=False,
+                        status="redirect-loop",
+                        terminal_url=cdx_row.url,
+                        terminal_dt=cdx_row.datetime,
+                        terminal_status_code=cdx_row.status_code,
+                        body=None,
+                        cdx=cdx_row,
+                        revisit_cdx=None,
+                    )
+                urls_seen.append(next_url)
+                continue
+            else:
+                return ResourceResult(
+                    start_url=start_url,
+                    hit=False,
+                    status="terminal-bad-status",
+                    terminal_url=cdx_row.url,
+                    terminal_dt=cdx_row.datetime,
+                    terminal_status_code=cdx_row.status_code,
+                    body=None,
+                    cdx=cdx_row,
+                    revisit_cdx=None,
+                )
+        return ResourceResult(
+            start_url=start_url,
+            hit=False,
+            status="redirects-exceeded",
+            terminal_url=cdx_row.url,
+            terminal_dt=cdx_row.datetime,
+            terminal_status_code=cdx_row.status_code,
+            body=None,
+            cdx=cdx_row,
+            revisit_cdx=None,
+        )
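
An end-to-end sketch of the wayback-only lookup above; the URL is hypothetical:

    wayback = WaybackClient()
    result = wayback.lookup_resource("https://example.com/article", best_mimetype="application/pdf")
    if result.hit:
        # result.body holds the content; result.cdx describes the terminal capture
        print(result.terminal_url, result.terminal_dt, len(result.body))
    else:
        # eg "no-capture", "redirect-loop", "terminal-bad-status", "redirects-exceeded"
        print(result.status)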
+ """ + if capture_outlinks: + print(" capturing outlinks!", file=sys.stderr) + if not (self.ia_access_key and self.ia_secret_key): + raise Exception("SPN2 requires authentication (IA_ACCESS_KEY/IA_SECRET_KEY)") + if request_url.startswith("ftp://"): + return SavePageNowResult( + False, + "spn2-no-ftp", + None, + request_url, + None, + None, + None, + ) + resp = self.v2_session.post( + self.v2endpoint, + data={ + 'url': request_url, + 'capture_all': 1, + 'capture_outlinks': capture_outlinks, + 'capture_screenshot': 0, + 'if_not_archived_within': '1d', + 'force_get': force_simple_get, + 'skip_first_archive': 1, + 'outlinks_availability': 0, + 'js_behavior_timeout': 0, + }, + ) + if resp.status_code == 429: + raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url)) + elif resp.status_code != 200: + raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url)) + resp_json = resp.json() + + if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']: + raise SavePageNowBackoffError(resp_json['message']) + elif not resp_json or 'job_id' not in resp_json: + raise SavePageNowError( + "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json)) + + job_id = resp_json['job_id'] + print(f" SPNv2 running: job_id={job_id} url={request_url}", file=sys.stderr) + + # poll until complete + final_json = None + for i in range(self.poll_count): + resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, resp_json['job_id'])) + try: + resp.raise_for_status() + except: + raise SavePageNowError(resp.content) + status = resp.json()['status'] + if status == 'pending': + time.sleep(self.poll_seconds) + elif status in ('success', 'error'): + final_json = resp.json() + break + else: + raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url)) + + if not final_json: + raise SavePageNowError("SPN2 timed out (polling count exceeded)") + + # if there was a recent crawl of same URL, fetch the status of that + # crawl to get correct datetime + if final_json.get('original_job_id'): + print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr) + resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id'])) + try: + resp.raise_for_status() + except: + raise SavePageNowError(resp.content) + final_json = resp.json() + + #print(final_json, file=sys.stderr) + + if final_json['status'] == "success": + if final_json.get('original_url').startswith('/'): + print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr) + return SavePageNowResult( + True, + "success", + job_id, + request_url, + final_json['original_url'], + final_json['timestamp'], + final_json['resources'], + ) + else: + if final_json['status'] == 'pending': + final_json['status'] = 'error:pending' + return SavePageNowResult( + False, + final_json.get('status_ext') or final_json['status'], + job_id, + request_url, + None, + None, + None, + ) + + def crawl_resource(self, start_url, wayback_client, force_simple_get=0): + """ + Runs a SPN2 crawl, then fetches body. + + There is a delay between SPN2 crawls and WARC upload to petabox, so we + need to fetch the body via wayback replay instead of petabox + range-request. 
+ """ + + # HACK: capture CNKI domains with outlinks (for COVID-19 crawling) + if 'gzbd.cnki.net/' in start_url: + spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1) + else: + spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get) + + if not spn_result.success: + status = spn_result.status + if status in ("error:invalid-url", "error:not-found", + "error:invalid-host-resolution", "error:gateway-timeout", + "error:too-many-redirects", "error:read-timeout"): + status = status.replace("error:", "") + elif status in ("error:no-access", "error:forbidden"): + status = "forbidden" + elif status == "error:user-session-limit": + raise SavePageNowBackoffError("SPNv2 user-session-limit") + elif status == "error:internal-server-error": + status = "remote-server-error" + elif status.startswith("error:"): + status = "spn2-" + status + # despite other errors, call these a failure (so we don't retry) + if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")): + status = "blocked-cookie" + return ResourceResult( + start_url=start_url, + hit=False, + status=status, + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + #print(spn_result, file=sys.stderr) + + # detect partial URL response (aka, success, but missing full URL) + if not "://" in spn_result.terminal_url or spn_result.terminal_url.startswith('/'): + return ResourceResult( + start_url=start_url, + hit=False, + status="spn2-success-partial-url", + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + # don't try to CDX fetch for this common cookie block terminal + if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"): + return ResourceResult( + start_url=start_url, + hit=False, + status="blocked-cookie", + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + cdx_row = None + # hack to work around elsevier weirdness + if "://pdf.sciencedirectassets.com/" in spn_result.request_url: + elsevier_pdf_cdx = wayback_client.cdx_client.lookup_best( + spn_result.request_url, + best_mimetype="application/pdf", + ) + if elsevier_pdf_cdx and elsevier_pdf_cdx.mimetype == "application/pdf": + print(" Trying pdf.sciencedirectassets.com hack!", file=sys.stderr) + cdx_row = elsevier_pdf_cdx + else: + print(" Failed pdf.sciencedirectassets.com hack!", file=sys.stderr) + #print(elsevier_pdf_cdx, file=sys.stderr) + + if not cdx_row: + # lookup exact + try: + filter_status_code = None + if spn_result.terminal_url.startswith("ftp://"): + filter_status_code = 226 + cdx_row = wayback_client.cdx_client.fetch( + url=spn_result.terminal_url, + datetime=spn_result.terminal_dt, + filter_status_code=filter_status_code, + retry_sleep=self.spn_cdx_retry_sec, + ) + # sometimes there are fuzzy http/https self-redirects with the + # same SURT; try to work around that + if cdx_row.status_code >= 300 and cdx_row.status_code < 400: + cdx_row = wayback_client.cdx_client.fetch( + url=spn_result.terminal_url, + datetime=spn_result.terminal_dt, + filter_status_code=200, + retry_sleep=self.spn_cdx_retry_sec, + ) + except KeyError as ke: + print(" CDX KeyError: {}".format(ke), 
file=sys.stderr) + return ResourceResult( + start_url=start_url, + hit=False, + status="spn2-cdx-lookup-failure", + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + #print(cdx_row, file=sys.stderr) + + revisit_cdx = None + if '/' in cdx_row.warc_path: + # Usually can't do this kind of direct fetch because CDX result is recent/live + resource = wayback_client.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + ) + body = resource.body + if resource.revisit_cdx: + assert resource.revisit_cdx.sha1hex == cdx_row.sha1hex + revisit_cdx = resource.revisit_cdx + else: + # note: currently not trying to verify cdx_row.sha1hex + try: + body = wayback_client.fetch_replay_body( + url=cdx_row.url, + datetime=cdx_row.datetime, + ) + except (WaybackError, WaybackContentError) as we: + return ResourceResult( + start_url=start_url, + hit=False, + status="spn2-wayback-error", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + # warc_path etc will change, so strip them out + cdx_row = cdx_partial_from_row(cdx_row) + + assert cdx_row.status_code + if cdx_row.status_code in (200, 226): + return ResourceResult( + start_url=start_url, + hit=True, + status="success", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=body, + cdx=cdx_row, + revisit_cdx=revisit_cdx, + ) + else: + return ResourceResult( + start_url=start_url, + hit=False, + status="terminal-bad-status", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=body, + cdx=cdx_row, + revisit_cdx=revisit_cdx, + ) + + +def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]: + if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': + print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr) + inner_body = gzip.decompress(resource.body) + if not inner_body: + raise Exception("null body inside transfer encoding") + inner_resource = ResourceResult( + body=inner_body, + # copy all other fields + start_url=resource.start_url, + hit=resource.hit, + status=resource.status, + terminal_url=resource.terminal_url, + terminal_dt=resource.terminal_dt, + terminal_status_code=resource.terminal_status_code, + cdx=resource.cdx, + revisit_cdx=resource.revisit_cdx, + ) + inner_file_meta = gen_file_metadata(inner_resource.body) + return (inner_file_meta, inner_resource) + else: + return (file_meta, resource) |
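
A short sketch of where fix_transfer_encoding() fits after a fetch; gen_file_metadata comes from .misc as imported above, and the URL is hypothetical:

    wayback = WaybackClient()
    resource = wayback.lookup_resource("https://example.com/paper.pdf")
    if resource.hit:
        file_meta = gen_file_metadata(resource.body)
        # unwrap gzip transfer-encoding that sometimes leaks into WARC records
        file_meta, resource = fix_transfer_encoding(file_meta, resource)
        assert file_meta['mimetype'] != 'application/gzip'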