From 38e635105a658850399847aa23a5bd5325b0d616 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 9 Jan 2020 15:37:42 -0800
Subject: lots of progress on wayback refactoring

- too much to list
- canonical flags to control crawling
- cdx_to_dict helper
---
 python/sandcrawler/ia.py         | 162 +++++++++++++++++++++++++++++----------
 python/sandcrawler/ingest.py     |  26 ++++---
 python/tests/test_savepagenow.py |   8 +-
 3 files changed, 145 insertions(+), 51 deletions(-)

diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index 6544e58..6468743 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -67,6 +67,21 @@ def cdx_partial_from_row(full):
         sha1hex=full.sha1hex,
     )
 
+def cdx_to_dict(cdx):
+    d = {
+        "surt": cdx.surt,
+        "datetime": cdx.datetime,
+        "url": cdx.url,
+        "mimetype": cdx.mimetype,
+        "status_code": cdx.status_code,
+        "sha1b32": cdx.sha1b32,
+        "sha1hex": cdx.sha1hex,
+    }
+    if type(cdx) == CdxRow and '/' in cdx.warc_path:
+        d['warc_csize'] = cdx.warc_csize
+        d['warc_offset'] = cdx.warc_offset
+        d['warc_path'] = cdx.warc_path
+    return d
 
 class CdxApiError(Exception):
     pass
@@ -98,6 +113,7 @@ class CdxApiClient:
         rows = []
         for raw in rj[1:]:
             assert len(raw) == 11 # JSON is short
+            #print(raw, file=sys.stderr)
             row = CdxRow(
                 surt=raw[0],
                 datetime=raw[1],
@@ -136,7 +152,9 @@ class CdxApiClient:
             raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
         row = resp[0]
         if not (row.url == url and row.datetime == datetime):
-            raise KeyError("CDX url/datetime not found: {} {} (closest: {})".format(url, datetime, row))
+            raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row))
+        if filter_status_code:
+            assert row.status_code == filter_status_code
         return row
 
     def lookup_best(self, url, max_age_days=None, best_mimetype=None):
@@ -209,13 +227,16 @@ class WaybackClient:
         # this *does* want to be http://, not https://
         self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
         # gwb library will fall back to reading from /opt/.petabox/webdata.secret
-        self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+        self.petabox_webdata_secret = kwargs.get(
+            'petabox_webdata_secret',
+            os.environ.get('PETABOX_WEBDATA_SECRET'),
+        )
         self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/')
         self.rstore = None
         self.max_redirects = 25
         self.wayback_endpoint = "https://web.archive.org/web/"
 
-    def fetch_petabox(self, c_size, offset, warc_path):
+    def fetch_petabox(self, csize, offset, warc_path):
         """
         Fetches wayback resource directly from petabox using WARC path/offset/csize.
@@ -243,7 +264,8 @@ class WaybackClient:
                 webdata_secret=self.petabox_webdata_secret,
                 download_base_url=self.petabox_base_url))
         try:
-            gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
+            #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
+            gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
         except wayback.exception.ResourceUnavailable:
             raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
         except ValueError as ve:
@@ -264,14 +286,15 @@ class WaybackClient:
         try:
             body = gwb_record.open_raw_content().read()
         except IncompleteRead as ire:
-            raise WaybackError("failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+            raise WaybackError(
+                "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
         return WarcResource(
             status_code=status_code,
             location=location,
             body=body,
         )
 
-    def fetch_petabox_body(self, c_size, offset, warc_path):
+    def fetch_petabox_body(self, csize, offset, warc_path):
         """
         Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
 
@@ -279,14 +302,20 @@ class WaybackClient:
         Thin helper around fetch_petabox()
         """
-        resource = self.fetch_petabox(c_size, offset, warc_path)
+        resource = self.fetch_petabox(
+            csize=csize,
+            offset=offset,
+            warc_path=warc_path,
+        )
         if resource.status_code != 200:
-            raise KeyError("archived HTTP response (WARC) was not 200: {}".format(gwb_record.get_status()[0]))
+            raise KeyError("archived HTTP response (WARC) was not 200: {}".format(
+                resource.status_code),
+            )
         return resource.body
 
-    def fetch_replay_body(self, url, datetime):
+    def fetch_replay_body(self, url, datetime, cdx_sha1hex=None):
         """
         Fetches an HTTP 200 record from wayback via the replay interface
         (web.archive.org) instead of petabox.
 
@@ -294,9 +323,20 @@ class WaybackClient:
         Intended for use with SPN2 requests, where request body has not ended
         up in petabox yet.
 
-        TODO: is this really necessary?
+        If cdx_sha1hex is passed, will try to verify fetched body. Note that
+        this check *won't work* in many cases, due to CDX hash being of
+        compressed transfer data, not the uncompressed final content bytes.
+
+        TODO: could instead try to verify that we got the expected replay body
+        using... new X-Archive headers?
         """
+
+        # defensively check datetime format
+        assert len(datetime) == 14
+        assert datetime.isdigit()
+
         try:
+            # TODO: don't follow redirects?
             resp = requests.get(self.wayback_endpoint + datetime + "id_/" + url)
         except requests.exceptions.TooManyRedirects:
             raise WaybackError("redirect loop (wayback replay fetch)")
@@ -304,26 +344,46 @@ class WaybackClient:
             resp.raise_for_status()
         except Exception as e:
             raise WaybackError(str(e))
-        # TODO: some verification here?
-        #print(resp.url, file=sys.stderr)
-        #print(resp.content)
+
+        # defensively check that this is actually correct replay based on headers
+        assert "X-Archive-Src" in resp.headers
+        assert datetime in resp.url
+
+        if cdx_sha1hex:
+            # verify that body matches CDX hash
+            # TODO: don't need *all* these hashes, just sha1
+            file_meta = gen_file_metadata(resp.content)
+            if cdx_sha1hex != file_meta['sha1hex']:
+                print("REPLAY MISMATCH: cdx:{} replay:{}".format(
+                    cdx_sha1hex,
+                    file_meta['sha1hex']),
+                    file=sys.stderr)
+                raise WaybackError("replay fetch body didn't match CDX hash cdx:{} body:{}".format(
+                    cdx_sha1hex,
+                    file_meta['sha1hex']),
+                )
         return resp.content
 
     def lookup_resource(self, start_url, best_mimetype=None):
         """
         Looks in wayback for a resource starting at the URL, following any
-        redirects. Returns a ResourceResult object.
+        redirects. Returns a ResourceResult object, which may indicate a
+        failure to fetch the resource.
+
+        Only raises exceptions on remote service failure or unexpected
+        problems.
 
         In a for loop:
 
-            lookup best CDX
-            redirect?
+            lookup "best" CDX
+            redirect status code?
                 fetch wayback
                 continue
-            good?
+            success (200)?
                 fetch wayback
                 return success
-            bad?
+            bad (other status)?
                 return failure
 
             got to end?
@@ -333,6 +393,7 @@ class WaybackClient:
         urls_seen = [start_url]
         for i in range(self.max_redirects):
             cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype)
+            #print(cdx_row, file=sys.stderr)
             if not cdx_row:
                 return ResourceResult(
                     start_url=start_url,
@@ -345,7 +406,18 @@ class WaybackClient:
                     cdx=None,
                 )
             if cdx_row.status_code == 200:
-                body = self.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
+                if '/' in cdx_row.warc_path:
+                    body = self.fetch_petabox_body(
+                        csize=cdx_row.warc_csize,
+                        offset=cdx_row.warc_offset,
+                        warc_path=cdx_row.warc_path,
+                    )
+                else:
+                    body = self.fetch_replay_body(
+                        url=cdx_row.url,
+                        datetime=cdx_row.datetime,
+                    )
+                    cdx_row = cdx_partial_from_row(cdx_row)
                 return ResourceResult(
                     start_url=start_url,
                     hit=True,
@@ -357,8 +429,14 @@ class WaybackClient:
                     cdx=cdx_row,
                 )
             elif 300 <= cdx_row.status_code < 400:
-                resource = self.fetch_petabox(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
+                resource = self.fetch_petabox(
+                    csize=cdx_row.warc_csize,
+                    offset=cdx_row.warc_offset,
+                    warc_path=cdx_row.warc_path,
+                )
                 assert 300 <= resource.status_code < 400
+                assert resource.location
+                #print(resource, file=sys.stderr)
                 next_url = resource.location
                 if next_url in urls_seen:
                     return ResourceResult(
@@ -443,8 +521,8 @@ class SavePageNowClient:
         - terminal_dt: wayback timestamp of final capture
         - resources: list of all URLs captured
 
-        TODO: parse SPN error codes and handle better. Eg, non-200 remote
-        statuses, invalid hosts/URLs, timeouts, backoff, etc.
+        TODO: parse SPN error codes (status string) and handle better. Eg,
+        non-200 remote statuses, invalid hosts/URLs, timeouts, backoff, etc.
""" if not (self.ia_access_key and self.ia_secret_key): raise Exception("SPN2 requires authentication (IA_ACCESS_KEY/IA_SECRET_KEY)") @@ -486,6 +564,8 @@ class SavePageNowClient: if not final_json: raise SavePageNowError("SPN2 timed out (polling count exceeded)") + # if there was a recent crawl of same URL, fetch the status of that + # crawl to get correct datetime if final_json.get('original_job_id'): resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id'])) try: @@ -538,26 +618,30 @@ class SavePageNowClient: cdx=None, ) - # sleep one second to let CDX API update (XXX) - time.sleep(1.0) - - # fetch CDX, then body + # fetch exact CDX row cdx_row = wayback_client.cdx_client.fetch( - spn_result.terminal_url, - spn_result.terminal_dt, + url=spn_result.terminal_url, + datetime=spn_result.terminal_dt, filter_status_code=200, ) - print(spn_result, file=sys.stderr) - print(cdx_row, file=sys.stderr) - assert cdx_row.status_code == 200 - body = wayback_client.fetch_replay_body(cdx_row.url, cdx_row.datetime) - file_meta = gen_file_metadata(body) - if cdx_row.sha1hex != file_meta['sha1hex']: - print(file_meta, file=sys.stderr) - raise WaybackError("replay fetch resulted in non-matching SHA1 (vs. CDX API)") - - # not a full CDX yet - cdx_partial = cdx_partial_from_row(cdx_row) + + if '/' in cdx_row.warc_path: + # Usually can't do this kind of direct fetch because CDX result is recent/live + resource = wayback_client.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + ) + body = resource.body + else: + # note: currently not trying to verify cdx_row.sha1hex + body = wayback_client.fetch_replay_body( + url=cdx_row.url, + datetime=cdx_row.datetime, + ) + # warc_path etc will change, so strip them out + cdx_row = cdx_partial_from_row(cdx_row) + return ResourceResult( start_url=start_url, hit=True, @@ -566,6 +650,6 @@ class SavePageNowClient: terminal_dt=cdx_row.datetime, terminal_status_code=cdx_row.status_code, body=body, - cdx=cdx_partial, + cdx=cdx_row, ) diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index f085fc5..4b6c587 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -6,7 +6,7 @@ import requests from http.server import BaseHTTPRequestHandler, HTTPServer from collections import namedtuple -from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict from sandcrawler.grobid import GrobidClient from sandcrawler.misc import gen_file_metadata from sandcrawler.html import extract_fulltext_url @@ -54,7 +54,9 @@ class IngestFileWorker(SandcrawlerWorker): if not self.grobid_client: self.grobid_client = GrobidClient() - self.attempt_existing = False + self.try_existing_ingest = False + self.try_wayback = True + self.try_spn2 = True def check_existing_ingest(self, base_url): """ @@ -66,7 +68,7 @@ class IngestFileWorker(SandcrawlerWorker): Looks at existing ingest results and makes a decision based on, eg, status and timestamp. """ - if not self.attempt_existing: + if not self.try_existing_ingest: return None raise NotImplementedError @@ -78,10 +80,16 @@ class IngestFileWorker(SandcrawlerWorker): Looks in wayback for a resource starting at the URL, following any redirects. If a hit isn't found, try crawling with SPN. 
""" - resource = self.wayback_client.lookup_resource(url, best_mimetype) - if not resource or not resource.hit: + via = "none" + resource = None + if self.try_wayback: + via = "wayback" + resource = self.wayback_client.lookup_resource(url, best_mimetype) + if self.try_spn2 and (not resource or not resource.hit): + via = "spn2" resource = self.spn_client.crawl_resource(url, self.wayback_client) - print("[FETCH] {} url:{}".format( + print("[FETCH {}\t] {}\turl:{}".format( + via, resource.status, url), file=sys.stderr) @@ -146,11 +154,6 @@ class IngestFileWorker(SandcrawlerWorker): try: # first hop resource = self.find_resource(base_url, best_mimetype) - print("[{}] fetch status: {} url: {}".format( - resource.status, - ingest_type, - base_url), - file=sys.stderr) if not resource.hit: result['status'] = resource.status return result @@ -218,6 +221,7 @@ class IngestFileWorker(SandcrawlerWorker): result['status'] = "success" result['hit'] = True + result['cdx'] = cdx_to_dict(resource.cdx) return result diff --git a/python/tests/test_savepagenow.py b/python/tests/test_savepagenow.py index 8681575..63dd887 100644 --- a/python/tests/test_savepagenow.py +++ b/python/tests/test_savepagenow.py @@ -182,10 +182,16 @@ def test_crawl_resource(spn_client, wayback_client): 'http://dummy-cdx/cdx', status=200, body=json.dumps(CDX_SPN_HIT)) + responses.add(responses.GET, + 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"), + status=200, + headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, + body=WARC_BODY) + print('https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect")) resp = spn_client.crawl_resource(TARGET, wayback_client) - assert len(responses.calls) == 4 + assert len(responses.calls) == 5 assert resp.hit == True assert resp.status == "success" -- cgit v1.2.3