diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-01-08 17:03:59 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-09 16:29:37 -0800 |
commit | 51e2b302d223dc79c38dc0339e66719fd38f067c (patch) | |
tree | fcbd99c93362a183d744e2967b3954d3634481b6 | |
parent | 96e5ed0f0a8b6fc041fdc8076940cd82be891563 (diff) | |
download | sandcrawler-51e2b302d223dc79c38dc0339e66719fd38f067c.tar.gz sandcrawler-51e2b302d223dc79c38dc0339e66719fd38f067c.zip |
more wayback and SPN tests and fixes
-rw-r--r-- | python/sandcrawler/__init__.py | 2 | ||||
-rw-r--r-- | python/sandcrawler/ia.py | 190 | ||||
-rw-r--r-- | python/tests/test_savepagenow.py | 40 | ||||
-rw-r--r-- | python/tests/test_wayback.py | 40 |
4 files changed, 220 insertions, 52 deletions
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py index 236570e..699126f 100644 --- a/python/sandcrawler/__init__.py +++ b/python/sandcrawler/__init__.py @@ -2,7 +2,7 @@ from .grobid import GrobidClient, GrobidWorker, GrobidBlobWorker from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper -from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, SavePageNowRemoteError +from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow from .ingest import IngestFileWorker from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index 1522708..ba9b5b9 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -27,6 +27,12 @@ ResourceResult = namedtuple("ResourceResult", [ "cdx", ]) +WarcResource = namedtuple("WarcResource", [ + "status_code", + "location", + "body", +]) + CdxRow = namedtuple('CdxRow', [ 'surt', 'datetime', @@ -50,6 +56,18 @@ CdxPartial = namedtuple('CdxPartial', [ 'sha1hex', ]) +def cdx_partial_from_row(full): + return CdxPartial( + surt=full.surt, + datetime=full.datetime, + url=full.url, + mimetype=full.mimetype, + status_code=full.status_code, + sha1b32=full.sha1b32, + sha1hex=full.sha1hex, + ) + + class CdxApiError(Exception): pass @@ -58,16 +76,14 @@ class CdxApiClient: def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs): self.host_url = host_url self.http_session = requests_retry_session(retries=3, backoff_factor=3) + cdx_auth_token = kwargs.get('cdx_auth_token', + os.environ.get('CDX_AUTH_TOKEN')) + if not cdx_auth_token: + raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)") self.http_session.headers.update({ 'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient', + 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token), }) - self.cdx_auth_token = kwargs.get('cdx_auth_token', - os.environ.get('CDX_AUTH_TOKEN')) - if self.cdx_auth_token: - self.http_session.headers.update({ - 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token), - }) - self.wayback_endpoint = "https://web.archive.org/web/" def _query_api(self, params): """ @@ -111,6 +127,7 @@ class CdxApiClient: 'to': datetime, 'matchType': 'exact', 'limit': -1, + 'fastLatest': True, 'output': 'json', } resp = self._query_api(params) @@ -131,6 +148,7 @@ class CdxApiClient: 'url': url, 'matchType': 'exact', 'limit': -25, + 'fastLatest': True, 'output': 'json', 'collapse': 'timestamp:6', } @@ -173,6 +191,9 @@ class CdxApiClient: class WaybackError(Exception): pass +class PetaboxError(Exception): + pass + class WaybackClient: def __init__(self, cdx_client=None, **kwargs): @@ -181,13 +202,35 @@ class WaybackClient: else: self.cdx_client = CdxApiClient() # /serve/ instead of /download/ doesn't record view count - self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') + self.petabox_base_url = kwargs.get('petabox_base_url', 'https://archive.org/serve/') # gwb library will fall back to reading from /opt/.petabox/webdata.secret self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/') self.rstore = None + self.max_redirects = 25 + + def fetch_petabox(self, c_size, offset, warc_path): + """ + Fetches wayback resource directly from petabox using WARC path/offset/csize. + + If there is a problem with petabox, raises a PetaboxError. + If resource doesn't exist, would raise a KeyError (TODO). - def fetch_warc_content(self, warc_path, offset, c_size): + The full record is returned as crawled; it may be a redirect, 404 + response, etc. + + WarcResource object (namedtuple) contains fields: + - status_code: int + - location: eg, for redirects + - body: raw bytes + + Requires (and uses) a secret token. + """ + if not self.petabox_webdata_secret: + raise Exception("WaybackClient needs petabox secret to do direct WARC fetches") + # TODO: + #if not "/" in warc_path: + # raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( @@ -196,38 +239,60 @@ class WaybackClient: try: gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: - raise WaybackError("failed to load file contents from wayback/petabox (ResourceUnavailable)") + raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)") except ValueError as ve: - raise WaybackError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) except EOFError as eofe: - raise WaybackError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) except TypeError as te: - raise WaybackError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) + raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. - if gwb_record.get_status()[0] != 200: - raise WaybackError("archived HTTP response (WARC) was not 200: {}".format(gwb_record.get_status()[0])) + status_code = gwb_record.get_status()[0] + location = gwb_record.get_location()[0] - try: - raw_content = gwb_record.open_raw_content().read() - except IncompleteRead as ire: - raise WaybackError("failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) - return raw_content + body = None + if status_code == 200: + try: + body = gwb_record.open_raw_content().read() + except IncompleteRead as ire: + raise WaybackError("failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + return WarcResource( + status_code=status_code, + location=location, + body=body, + ) - def fetch_warc_by_url_dt(self, url, datetime): + def fetch_petabox_body(self, c_size, offset, warc_path): """ - Helper wrapper that first hits CDX API to get a full CDX row, then - fetches content from wayback + Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize. + + Returns bytes. Raises KeyError if resource wasn't an HTTP 200. + + Thin helper around fetch_petabox() """ - cdx_row = self.cdx_client.lookup(url, datetime) - return self.fetch_warc_content( - cdx_row['warc_path'], - cdx_row['warc_offset'], - cdx_row['warc_csize']) + resource = self.fetch_petabox(c_size, offset, warc_path) + + if resource.status_code != 200: + raise KeyError("archived HTTP response (WARC) was not 200: {}".format(gwb_record.get_status()[0])) - def fetch_resource(self, start_url, mimetype=None): + return resource.body + + def fetch_replay(self, url, datetime): + """ + Fetches an HTTP 200 record from wayback via the replay interface + (web.archive.org) instead of petabox. + + Intended for use with SPN2 requests, where request body has not ended + up in petabox yet. + + TODO: is this actually necessary? + """ + raise NotImplementedError + + def lookup_resource(self, start_url, best_mimetype=None): """ Looks in wayback for a resource starting at the URL, following any redirects. Returns a ResourceResult object. @@ -249,12 +314,21 @@ class WaybackClient: """ next_url = start_url urls_seen = [start_url] - for i in range(25): - cdx_row = self.cdx_client.lookup_best(next_url, mimetype=mimetype) + for i in range(self.max_redirects): + cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype) if not cdx_row: - return None + return ResourceResult( + start_url=start_url, + hit=False, + status="no-capture", + terminal_url=None, + terminal_dt=None, + terminal_status_code=None, + body=None, + cdx=None, + ) if cdx.status_code == 200: - body = self.fetch_warc_content(cdx.warc_path, cdx.warc_offset, cdx_row.warc_csize) + body = self.fetch_petabox_body(cdx.warc_csize, cdx.warc_offset, cdx_row.warc_path) return ResourceResult( start_url=start_url, hit=True, @@ -265,9 +339,10 @@ class WaybackClient: body=body, cdx=cdx_row, ) - elif cdx_row.status_code >= 300 and cdx_row.status_code < 400: - body = self.fetch_warc_content(cdx_row.warc_path, cdx_row.warc_offset, cdx_row.warc_csize) - next_url = body.get_redirect_url() + elif 300 <= cdx_row.status_code < 400: + resource = self.fetch_petabox(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path) + assert 300 <= resource.status_code < 400 + next_url = resource.location if next_url in urls_seen: return ResourceResult( start_url=start_url, @@ -285,7 +360,7 @@ class WaybackClient: return ResourceResult( start_url=start_url, hit=False, - status="remote-status", + status="terminal-not-success", terminal_url=cdx_row.url, terminal_dt=cdx_row.datetime, terminal_status_code=cdx_row.status_code, @@ -348,7 +423,7 @@ class SavePageNowClient: - job_id: returned by API - request_url: url we asked to fetch - terminal_url: final primary resource (after any redirects) - - terminal_timestamp: wayback timestamp of final capture + - terminal_dt: wayback timestamp of final capture - resources: list of all URLs captured TODO: parse SPN error codes and handle better. Eg, non-200 remote @@ -415,3 +490,42 @@ class SavePageNowClient: None, ) + def crawl_resource(self, start_url, wayback_client): + """ + Runs a SPN2 crawl, then fetches body from wayback. + + TODO: possible to fetch from petabox? + """ + + spn_result = self.save_url_now_v2(start_url) + + if not spn_result.success: + return ResourceResult( + start_url=start_url, + hit=False, + status=spn_result.status, + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=spn_result.terminal_status_code, + body=None, + cdx=None, + ) + + # fetch CDX and body + cdx_row = wayback_client.cdx_client.fetch(spn_result.terminal_url, spn_result.terminal_dt) + assert cdx_row.status_code == 200 + body = wayback_client.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path) + + # not a full CDX yet + cdx_partial = cdx_partial_from_row(cdx_row) + return ResourceResult( + start_url=start_url, + hit=True, + status="success", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.status_code, + terminal_status_code=cdx_row.status_code, + body=body, + cdx=cdx_partial, + ) + diff --git a/python/tests/test_savepagenow.py b/python/tests/test_savepagenow.py index cbc6aef..8681575 100644 --- a/python/tests/test_savepagenow.py +++ b/python/tests/test_savepagenow.py @@ -3,7 +3,8 @@ import json import pytest import responses -from sandcrawler import SavePageNowClient, SavePageNowError +from sandcrawler import SavePageNowClient, SavePageNowError, CdxPartial +from test_wayback import * TARGET = "http://dummy-target.dummy" @@ -72,6 +73,10 @@ ERROR_BODY = { "message": "Couldn't resolve host for http://example5123.com.", "resources": [] } +CDX_SPN_HIT = [ + ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"], + ["wiki,fatcat)/", "20180326070330", TARGET + "/redirect", "application/pdf", "200", CDX_BEST_SHA1B32, "-", "-", "8445", "108062304", "liveweb-20200108215212-wwwb-spn04.us.archive.org-kols1pud.warc.gz"], +] @pytest.fixture def spn_client(): @@ -158,3 +163,36 @@ def test_savepagenow_500(spn_client): assert len(responses.calls) == 2 +@responses.activate +def test_crawl_resource(spn_client, wayback_client): + + responses.add(responses.POST, + 'http://dummy-spnv2/save', + status=200, + body=json.dumps({"url": TARGET, "job_id": JOB_ID})) + responses.add(responses.GET, + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(PENDING_BODY)) + responses.add(responses.GET, + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(SUCCESS_BODY)) + responses.add(responses.GET, + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_SPN_HIT)) + + resp = spn_client.crawl_resource(TARGET, wayback_client) + + assert len(responses.calls) == 4 + + assert resp.hit == True + assert resp.status == "success" + assert resp.body == WARC_BODY + assert resp.cdx.sha1b32 == CDX_BEST_SHA1B32 + + assert type(resp.cdx) == CdxPartial + with pytest.raises(AttributeError): + print(resp.cdx.warc_path) + diff --git a/python/tests/test_wayback.py b/python/tests/test_wayback.py index 7e63ec7..eeb4b37 100644 --- a/python/tests/test_wayback.py +++ b/python/tests/test_wayback.py @@ -35,14 +35,7 @@ CDX_MULTI_HIT = [ def cdx_client(): client = CdxApiClient( host_url="http://dummy-cdx/cdx", - ) - return client - -@pytest.fixture -def wayback_client(cdx_client): - client = WaybackClient( - cdx_client=cdx_client, - petabox_webdata_secret="dummy-petabox-secret", + cdx_auth_token="dummy-token", ) return client @@ -102,9 +95,32 @@ def test_cdx_lookup_best(cdx_client): assert resp.sha1b32 == CDX_BEST_SHA1B32 assert resp.warc_path == CDX_SINGLE_HIT[1][-1] +WARC_TARGET = "http://fatcat.wiki/" +WARC_BODY = "<html>some stuff</html>" + +@pytest.fixture +def wayback_client(cdx_client, mocker): + client = WaybackClient( + cdx_client=cdx_client, + petabox_webdata_secret="dummy-petabox-secret", + ) + # mock out the wayback store with mock stuff + client.rstore = mocker.Mock() + resource = mocker.Mock() + client.rstore.load_resource = mocker.MagicMock(return_value=resource) + resource.get_status = mocker.MagicMock(return_value=[200]) + resource.get_location = mocker.MagicMock(return_value=[WARC_TARGET]) + body = mocker.Mock() + resource.open_raw_content = mocker.MagicMock(return_value=body) + body.read = mocker.MagicMock(return_value=WARC_BODY) + + return client + def test_wayback_fetch(wayback_client, mocker): - # mock something - #mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka') - #blah = mocker.Mock() - return + resp = wayback_client.fetch_petabox(123, 456789, "here/there.warc.gz") + assert resp.body == WARC_BODY + assert resp.location == WARC_TARGET + + resp = wayback_client.fetch_petabox_body(123, 456789, "here/there.warc.gz") + assert resp == WARC_BODY |