author    Bryan Newbold <bnewbold@archive.org>  2021-10-26 12:54:37 -0700
committer Bryan Newbold <bnewbold@archive.org>  2021-10-26 12:54:37 -0700
commit    05bd7cbcc62588e431c5efd533189e246b2a997e (patch)
tree      abcc707a451e77ea1e8c5ac9a5925b97a4bd139a /python/sandcrawler/ia.py
parent    f3f424e42f2f4f383103cf80b30a00cfa6cfc179 (diff)
make fmt
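
The diff below is purely mechanical re-formatting: long lines are wrapped, and blank lines are inserted between top-level definitions. As an illustrative before/after (taken from the fetch() hunk further down; the aligned-continuation style matches yapf's PEP 8-based defaults, though the commit does not name the formatter, so that attribution is an assumption):

    # before: one long call
    return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep)

    # after: arguments wrapped and aligned with the opening parenthesis
    return self.fetch(url,
                      datetime,
                      filter_status_code=filter_status_code,
                      retry_sleep=next_sleep)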
Diffstat (limited to 'python/sandcrawler/ia.py')
-rw-r--r--  python/sandcrawler/ia.py | 192
1 file changed, 124 insertions(+), 68 deletions(-)
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index ca1182f..a8ce193 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -1,4 +1,3 @@
-
# XXX: some broken MRO thing going on in here due to python3 object wrangling
# in `wayback` library. Means we can't run pylint.
# pylint: skip-file
@@ -38,6 +37,7 @@ class SandcrawlerBackoffError(Exception):
"""
pass
+
ResourceResult = namedtuple("ResourceResult", [
"start_url",
"hit",
@@ -80,6 +80,7 @@ CdxPartial = namedtuple('CdxPartial', [
'sha1hex',
])
+
def cdx_partial_from_row(full):
return CdxPartial(
surt=full.surt,
@@ -91,6 +92,7 @@ def cdx_partial_from_row(full):
sha1hex=full.sha1hex,
)
+
def cdx_to_dict(cdx):
d = {
"surt": cdx.surt,
@@ -107,6 +109,7 @@ def cdx_to_dict(cdx):
d['warc_path'] = cdx.warc_path
return d
+
def fuzzy_match_url(left, right):
"""
Matches URLs agnostic of http/https (and maybe other normalizations in the
@@ -123,6 +126,7 @@ def fuzzy_match_url(left, right):
return True
return False
+
def test_fuzzy_match_url():
assert fuzzy_match_url("http://thing.com", "http://thing.com") == True
assert fuzzy_match_url("http://thing.com", "https://thing.com") == True
@@ -137,18 +141,19 @@ def test_fuzzy_match_url():
assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False
assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False
+
class CdxApiError(Exception):
pass
-class CdxApiClient:
+class CdxApiClient:
def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
- cdx_auth_token = kwargs.get('cdx_auth_token',
- os.environ.get('CDX_AUTH_TOKEN'))
+ cdx_auth_token = kwargs.get('cdx_auth_token', os.environ.get('CDX_AUTH_TOKEN'))
if not cdx_auth_token:
- raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
+ raise Exception(
+ "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
self.http_session.headers.update({
'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient',
'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
@@ -208,7 +213,8 @@ class CdxApiClient:
found, because we expect to be looking up a specific full record.
"""
if len(datetime) != 14:
- raise ValueError("CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
+ raise ValueError(
+ "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
params = {
'url': url,
'from': datetime,
@@ -226,18 +232,28 @@ class CdxApiClient:
if retry_sleep > 3:
next_sleep = retry_sleep - 3
retry_sleep = 3
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr)
time.sleep(retry_sleep)
- return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep)
+ return self.fetch(url,
+ datetime,
+ filter_status_code=filter_status_code,
+ retry_sleep=next_sleep)
raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
row = resp[0]
# allow fuzzy http/https match
if not (fuzzy_match_url(row.url, url) and row.datetime == datetime):
if retry_sleep and retry_sleep > 0:
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr)
time.sleep(retry_sleep)
- return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None)
- raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row))
+ return self.fetch(url,
+ datetime,
+ filter_status_code=filter_status_code,
+ retry_sleep=None)
+ raise KeyError(
+ "Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(
+ url, datetime, row))
if filter_status_code:
assert row.status_code == filter_status_code
return row
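
(Not part of the diff: a minimal usage sketch of the CdxApiClient.fetch() method re-wrapped above. The URL and timestamp are placeholders, and CDX_AUTH_TOKEN is assumed to be set in the environment, as the constructor requires.)

    from sandcrawler.ia import CdxApiClient

    client = CdxApiClient()  # raises if no CDX auth token is available
    # fetch() requires the full 14-digit timestamp, per the ValueError above
    row = client.fetch("http://example.com/paper.pdf", "20211026125437",
                       filter_status_code=200)
    print(row.url, row.datetime, row.sha1hex)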
@@ -311,17 +327,20 @@ class CdxApiClient:
class WaybackError(Exception):
pass
+
class WaybackContentError(Exception):
pass
+
class PetaboxError(Exception):
pass
+
class NoCaptureError(Exception):
pass
-class WaybackClient:
+class WaybackClient:
def __init__(self, cdx_client=None, **kwargs):
if cdx_client:
self.cdx_client = cdx_client
@@ -367,32 +386,42 @@ class WaybackClient:
if not self.petabox_webdata_secret:
raise Exception("WaybackClient needs petabox secret to do direct WARC fetches")
if not "/" in warc_path:
- raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
+ raise ValueError(
+ "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3(
- webdata_secret=self.petabox_webdata_secret,
- ))
+ self.rstore = ResourceStore(
+ loaderfactory=CDXLoaderFactory3(webdata_secret=self.petabox_webdata_secret, ))
try:
#print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
except wayback.exception.ResourceUnavailable:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
- raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ResourceUnavailable)")
except wayback.exception.InvalidResource:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
- raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)")
+ raise WaybackContentError(
+ "failed to load file contents from wayback/petabox (InvalidResource)")
except urllib3.exceptions.ReadTimeoutError as rte:
- raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ReadTimeoutError: {})".
+ format(rte))
except ValueError as ve:
- raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
except EOFError as eofe:
- raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
except TypeError as te:
- raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)"
+ .format(te))
except Exception as e:
if "while decompressing data: invalid block type" in str(e):
- raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files")
+ raise PetaboxError(
+ "decompression error fetching WARC record; usually due to bad alexa ARC files"
+ )
else:
raise e
# Note: could consider a generic "except Exception" here, as we get so
@@ -405,7 +434,8 @@ class WaybackClient:
raise WaybackContentError("too many HTTP headers (in wayback fetch)")
location = gwb_record.get_location() or None
- if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit():
+ if status_code is None and gwb_record.target_uri.startswith(
+ b"ftp://") and not gwb_record.is_revisit():
# TODO: some additional verification here?
status_code = 226
@@ -416,8 +446,9 @@ class WaybackClient:
raise WaybackContentError("found revisit record, but won't resolve (loop?)")
revisit_uri, revisit_dt = gwb_record.refers_to
if not (revisit_uri and revisit_dt):
- raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format(
- warc_path, offset))
+ raise WaybackContentError(
+ "revisit record missing URI and/or DT: warc:{} offset:{}".format(
+ warc_path, offset))
# convert revisit_dt
# len("2018-07-24T11:56:49"), or with "Z"
assert len(revisit_dt) in (19, 20)
@@ -425,7 +456,9 @@ class WaybackClient:
revisit_uri = revisit_uri.decode('utf-8')
if type(revisit_dt) is bytes:
revisit_dt = revisit_dt.decode('utf-8')
- revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '')
+ revisit_dt = revisit_dt.replace('-', '').replace(':',
+ '').replace('T',
+ '').replace('Z', '')
assert len(revisit_dt) == 14
try:
revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt)
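
(Not part of the diff: a sketch of what the wrapped replace() chain above does — an ISO-8601 revisit timestamp, like the one in the comment, collapses to the 14-digit CDX form.)

    dt = "2018-07-24T11:56:49Z"
    dt = dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '')
    assert dt == "20180724115649" and len(dt) == 14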
@@ -443,10 +476,10 @@ class WaybackClient:
body = gwb_record.open_raw_content().read()
except IncompleteRead as ire:
raise WaybackError(
- "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})"
+ .format(ire))
elif status_code is None:
- raise WaybackContentError(
- "got a None status_code in (W)ARC record")
+ raise WaybackContentError("got a None status_code in (W)ARC record")
return WarcResource(
status_code=status_code,
location=location,
@@ -454,7 +487,12 @@ class WaybackClient:
revisit_cdx=revisit_cdx,
)
- def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None):
+ def fetch_petabox_body(self,
+ csize,
+ offset,
+ warc_path,
+ resolve_revisit=True,
+ expected_status_code=None):
"""
Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
@@ -474,12 +512,10 @@ class WaybackClient:
raise KeyError("archived HTTP response (WARC) was not {}: {}".format(
expected_status_code,
resource.status_code,
- )
- )
+ ))
elif resource.status_code not in (200, 226):
raise KeyError("archived HTTP response (WARC) was not 200: {}".format(
- resource.status_code)
- )
+ resource.status_code))
return resource.body
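
(Not part of the diff: a hedged sketch of calling the re-wrapped fetch_petabox_body() signature. The WARC path, offset, and size are placeholders, and the client is assumed to be configured with a petabox webdata secret, per the exception above.)

    from sandcrawler.ia import WaybackClient

    client = WaybackClient()
    body = client.fetch_petabox_body(
        csize=12345,                          # compressed record size (placeholder)
        offset=67890,                         # byte offset into the WARC (placeholder)
        warc_path="example/archive.warc.gz",  # must contain a '/', per the check above
        expected_status_code=200,
    )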
@@ -514,7 +550,9 @@ class WaybackClient:
except requests.exceptions.ChunkedEncodingError:
raise WaybackError("ChunkedEncodingError (wayback replay fetch)")
except UnicodeDecodeError:
- raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+ raise WaybackContentError(
+ "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
+ url))
try:
resp.raise_for_status()
@@ -526,21 +564,20 @@ class WaybackClient:
if not "X-Archive-Src" in resp.headers:
raise WaybackError("replay fetch didn't return X-Archive-Src in headers")
if not datetime in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url))
if cdx_sha1hex:
# verify that body matches CDX hash
# TODO: don't need *all* these hashes, just sha1
file_meta = gen_file_metadata(resp.content)
if cdx_sha1hex != file_meta['sha1hex']:
- print(" REPLAY MISMATCH: cdx:{} replay:{}".format(
- cdx_sha1hex,
- file_meta['sha1hex']),
- file=sys.stderr)
- raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format(
- cdx_sha1hex,
- file_meta['sha1hex']),
- )
+ print(" REPLAY MISMATCH: cdx:{} replay:{}".format(cdx_sha1hex,
+ file_meta['sha1hex']),
+ file=sys.stderr)
+ raise WaybackContentError(
+ "replay fetch body didn't match CDX hash cdx:{} body:{}".format(
+ cdx_sha1hex, file_meta['sha1hex']), )
return resp.content
def fetch_replay_redirect(self, url, datetime):
@@ -568,7 +605,9 @@ class WaybackClient:
except requests.exceptions.TooManyRedirects:
raise WaybackContentError("redirect loop (wayback replay fetch)")
except UnicodeDecodeError:
- raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+ raise WaybackContentError(
+ "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
+ url))
try:
resp.raise_for_status()
except Exception as e:
@@ -580,7 +619,8 @@ class WaybackClient:
if not "X-Archive-Src" in resp.headers:
raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers")
if not datetime in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url))
redirect_url = resp.headers.get("Location")
# eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw
@@ -622,7 +662,9 @@ class WaybackClient:
urls_seen = [start_url]
for i in range(self.max_redirects):
print(" URL: {}".format(next_url), file=sys.stderr)
- cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest)
+ cdx_row = self.cdx_client.lookup_best(next_url,
+ best_mimetype=best_mimetype,
+ closest=closest)
#print(cdx_row, file=sys.stderr)
if not cdx_row:
return ResourceResult(
@@ -776,9 +818,11 @@ class WaybackClient:
class SavePageNowError(Exception):
pass
+
class SavePageNowBackoffError(SandcrawlerBackoffError):
pass
+
SavePageNowResult = namedtuple('SavePageNowResult', [
'success',
'status',
@@ -789,13 +833,11 @@ SavePageNowResult = namedtuple('SavePageNowResult', [
'resources',
])
-class SavePageNowClient:
+class SavePageNowClient:
def __init__(self, v2endpoint="https://web.archive.org/save", **kwargs):
- self.ia_access_key = kwargs.get('ia_access_key',
- os.environ.get('IA_ACCESS_KEY'))
- self.ia_secret_key = kwargs.get('ia_secret_key',
- os.environ.get('IA_SECRET_KEY'))
+ self.ia_access_key = kwargs.get('ia_access_key', os.environ.get('IA_ACCESS_KEY'))
+ self.ia_secret_key = kwargs.get('ia_secret_key', os.environ.get('IA_SECRET_KEY'))
self.v2endpoint = v2endpoint
self.v2_session = requests_retry_session(retries=5, backoff_factor=3)
self.v2_session.headers.update({
@@ -886,12 +928,15 @@ class SavePageNowClient:
},
)
if resp.status_code == 429:
- raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ raise SavePageNowBackoffError("status_code: {}, url: {}".format(
+ resp.status_code, request_url))
elif resp.status_code != 200:
- raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
+ raise SavePageNowError("SPN2 status_code: {}, url: {}".format(
+ resp.status_code, request_url))
resp_json = resp.json()
- if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
+ if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json[
+ 'message']:
raise SavePageNowBackoffError(resp_json['message'])
elif not resp_json or 'job_id' not in resp_json or not resp_json['job_id']:
raise SavePageNowError(
@@ -915,7 +960,8 @@ class SavePageNowClient:
final_json = resp.json()
break
else:
- raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url))
+ raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(
+ status, request_url))
if not final_json:
raise SavePageNowError("SPN2 timed out (polling count exceeded)")
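
(Not part of the diff: a minimal sketch of driving the SPN2 client whose status-polling loop is re-wrapped above. Assumes IA_ACCESS_KEY and IA_SECRET_KEY are set; the URL is a placeholder, and the keyword arguments seen at the call sites below — force_simple_get, capture_outlinks — are assumed to be optional.)

    from sandcrawler.ia import SavePageNowClient

    spn_client = SavePageNowClient()  # keys read from the environment if not passed
    result = spn_client.save_url_now_v2("http://example.com/paper.pdf")
    if result.success:
        print(result.status, result.terminal_url)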
@@ -923,8 +969,10 @@ class SavePageNowClient:
# if there was a recent crawl of same URL, fetch the status of that
# crawl to get correct datetime
if final_json.get('original_job_id'):
- print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr)
- resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id']))
+ print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}",
+ file=sys.stderr)
+ resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint,
+ final_json['original_job_id']))
try:
resp.raise_for_status()
except:
@@ -935,7 +983,8 @@ class SavePageNowClient:
if final_json['status'] == "success":
if final_json.get('original_url').startswith('/'):
- print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr)
+ print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}",
+ file=sys.stderr)
return SavePageNowResult(
True,
"success",
@@ -969,15 +1018,17 @@ class SavePageNowClient:
# HACK: capture CNKI domains with outlinks (for COVID-19 crawling)
if 'gzbd.cnki.net/' in start_url:
- spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1)
+ spn_result = self.save_url_now_v2(start_url,
+ force_simple_get=force_simple_get,
+ capture_outlinks=1)
else:
spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get)
if not spn_result.success:
status = spn_result.status
if status in ("error:invalid-url", "error:not-found",
- "error:invalid-host-resolution", "error:gateway-timeout",
- "error:too-many-redirects", "error:read-timeout"):
+ "error:invalid-host-resolution", "error:gateway-timeout",
+ "error:too-many-redirects", "error:read-timeout"):
status = status.replace("error:", "")
elif status in ("error:no-access", "error:forbidden"):
status = "forbidden"
@@ -988,7 +1039,8 @@ class SavePageNowClient:
elif status.startswith("error:"):
status = "spn2-" + status
# despite other errors, call these a failure (so we don't retry)
- if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")):
+ if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent')
+ or spn_result.terminal_url.endswith("cookieSet=1")):
status = "blocked-cookie"
return ResourceResult(
start_url=start_url,
@@ -1018,7 +1070,8 @@ class SavePageNowClient:
)
# don't try to CDX fetch for this common cookie block terminal
- if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
+ if spn_result.terminal_url.endswith(
+ '/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
return ResourceResult(
start_url=start_url,
hit=False,
@@ -1143,9 +1196,12 @@ class SavePageNowClient:
)
-def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]:
- if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
- print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
+def fix_transfer_encoding(file_meta: dict,
+ resource: ResourceResult) -> Tuple[dict, ResourceResult]:
+ if resource.body and file_meta[
+ 'mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
+ print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype),
+ file=sys.stderr)
inner_body = gzip.decompress(resource.body)
if not inner_body:
raise Exception("null body inside transfer encoding")
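
(The diff excerpt ends mid-function here. Not part of the diff: a hedged usage sketch of the re-wrapped fix_transfer_encoding() signature; pairing it with gen_file_metadata(), which appears earlier in this file, is an assumption about typical call order.)

    from sandcrawler.ia import fix_transfer_encoding
    from sandcrawler.misc import gen_file_metadata  # import path is an assumption

    # recompute file metadata, then strip an unhandled gzip transfer encoding
    file_meta = gen_file_metadata(resource.body)
    file_meta, resource = fix_transfer_encoding(file_meta, resource)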