aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-01-09 15:37:42 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-09 16:33:21 -0800
commit38e635105a658850399847aa23a5bd5325b0d616 (patch)
tree3a887091df76b6128bb120e24942b27cf14e837d
parent101e3c818fa5852c11003e616526726e189ab2c3 (diff)
downloadsandcrawler-38e635105a658850399847aa23a5bd5325b0d616.tar.gz
sandcrawler-38e635105a658850399847aa23a5bd5325b0d616.zip
lots of progress on wayback refactoring
- too much to list - canonical flags to control crawling - cdx_to_dict helper
-rw-r--r--python/sandcrawler/ia.py162
-rw-r--r--python/sandcrawler/ingest.py26
-rw-r--r--python/tests/test_savepagenow.py8
3 files changed, 145 insertions, 51 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index 6544e58..6468743 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -67,6 +67,21 @@ def cdx_partial_from_row(full):
sha1hex=full.sha1hex,
)
+def cdx_to_dict(cdx):
+ d = {
+ "surt": cdx.surt,
+ "datetime": cdx.datetime,
+ "url": cdx.url,
+ "mimetype": cdx.mimetype,
+ "status_code": cdx.status_code,
+ "sha1b32": cdx.sha1b32,
+ "sha1hex": cdx.sha1hex,
+ }
+ if type(cdx) == CdxRow and '/' in cdx.warc_path:
+ d['warc_csize'] = cdx.warc_csize
+ d['warc_offset'] = cdx.warc_offset
+ d['warc_path'] = cdx.warc_path
+ return d
class CdxApiError(Exception):
pass
@@ -98,6 +113,7 @@ class CdxApiClient:
rows = []
for raw in rj[1:]:
assert len(raw) == 11 # JSON is short
+ #print(raw, file=sys.stderr)
row = CdxRow(
surt=raw[0],
datetime=raw[1],
@@ -136,7 +152,9 @@ class CdxApiClient:
raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
row = resp[0]
if not (row.url == url and row.datetime == datetime):
- raise KeyError("CDX url/datetime not found: {} {} (closest: {})".format(url, datetime, row))
+ raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row))
+ if filter_status_code:
+ assert row.status_code == filter_status_code
return row
def lookup_best(self, url, max_age_days=None, best_mimetype=None):
@@ -209,13 +227,16 @@ class WaybackClient:
# this *does* want to be http://, not https://
self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
# gwb library will fall back to reading from /opt/.petabox/webdata.secret
- self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.petabox_webdata_secret = kwargs.get(
+ 'petabox_webdata_secret',
+ os.environ.get('PETABOX_WEBDATA_SECRET'),
+ )
self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/')
self.rstore = None
self.max_redirects = 25
self.wayback_endpoint = "https://web.archive.org/web/"
- def fetch_petabox(self, c_size, offset, warc_path):
+ def fetch_petabox(self, csize, offset, warc_path):
"""
Fetches wayback resource directly from petabox using WARC path/offset/csize.
@@ -243,7 +264,8 @@ class WaybackClient:
webdata_secret=self.petabox_webdata_secret,
download_base_url=self.petabox_base_url))
try:
- gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
+ #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
+ gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
except wayback.exception.ResourceUnavailable:
raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
except ValueError as ve:
@@ -264,14 +286,15 @@ class WaybackClient:
try:
body = gwb_record.open_raw_content().read()
except IncompleteRead as ire:
- raise WaybackError("failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ raise WaybackError(
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
return WarcResource(
status_code=status_code,
location=location,
body=body,
)
- def fetch_petabox_body(self, c_size, offset, warc_path):
+ def fetch_petabox_body(self, csize, offset, warc_path):
"""
Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
@@ -279,14 +302,20 @@ class WaybackClient:
Thin helper around fetch_petabox()
"""
- resource = self.fetch_petabox(c_size, offset, warc_path)
+ resource = self.fetch_petabox(
+ csize=csize,
+ offset=offset,
+ warc_path=warc_path,
+ )
if resource.status_code != 200:
- raise KeyError("archived HTTP response (WARC) was not 200: {}".format(gwb_record.get_status()[0]))
+ raise KeyError("archived HTTP response (WARC) was not 200: {}".format(
+ gwb_record.get_status()[0]),
+ )
return resource.body
- def fetch_replay_body(self, url, datetime):
+ def fetch_replay_body(self, url, datetime, cdx_sha1hex=None):
"""
Fetches an HTTP 200 record from wayback via the replay interface
(web.archive.org) instead of petabox.
@@ -294,9 +323,20 @@ class WaybackClient:
Intended for use with SPN2 requests, where request body has not ended
up in petabox yet.
- TODO: is this really necessary?
+ If cdx_sha1hex is passed, will try to verify fetched body. Note that
+ this check *won't work* in many cases, due to CDX hash being of
+ compressed transfer data, not the uncompressed final content bytes.
+
+ TODO: could instead try to verify that we got the expected replay body
+ using... new X-Archive headers?
"""
+
+ # defensively check datetime format
+ assert len(datetime) == 14
+ assert datetime.isdigit()
+
try:
+ # TODO: don't follow redirects?
resp = requests.get(self.wayback_endpoint + datetime + "id_/" + url)
except requests.exceptions.TooManyRedirects:
raise WaybackError("redirect loop (wayback replay fetch)")
@@ -304,26 +344,46 @@ class WaybackClient:
resp.raise_for_status()
except Exception as e:
raise WaybackError(str(e))
- # TODO: some verification here?
#print(resp.url, file=sys.stderr)
- #print(resp.content)
+
+ # defensively check that this is actually correct replay based on headers
+ assert "X-Archive-Src" in resp.headers
+ assert datetime in resp.url
+
+ if cdx_sha1hex:
+ # verify that body matches CDX hash
+ # TODO: don't need *all* these hashes, just sha1
+ file_meta = gen_file_metadata(resp.content)
+ if cdx_sha1hex != file_meta['sha1hex']:
+ print("REPLAY MISMATCH: cdx:{} replay:{}".format(
+ cdx_sha1hex,
+ file_meta['sha1hex']),
+ file=sys.stderr)
+ raise WaybackError("replay fetch body didn't match CDX hash cdx:{} body:{}".format(
+ cdx_sha1hex,
+ file_meta['sha1hex']),
+ )
return resp.content
def lookup_resource(self, start_url, best_mimetype=None):
"""
Looks in wayback for a resource starting at the URL, following any
- redirects. Returns a ResourceResult object.
+ redirects. Returns a ResourceResult object, which may indicate a
+ failure to fetch the resource.
+
+ Only raises exceptions on remote service failure or unexpected
+ problems.
In a for loop:
- lookup best CDX
- redirect?
+ lookup "best" CDX
+ redirect status code?
fetch wayback
continue
- good?
+ success (200)?
fetch wayback
return success
- bad?
+ bad (other status)?
return failure
got to end?
@@ -333,6 +393,7 @@ class WaybackClient:
urls_seen = [start_url]
for i in range(self.max_redirects):
cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype)
+ #print(cdx_row, file=sys.stderr)
if not cdx_row:
return ResourceResult(
start_url=start_url,
@@ -345,7 +406,18 @@ class WaybackClient:
cdx=None,
)
if cdx_row.status_code == 200:
- body = self.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
+ if '/' in cdx_row.warc_path:
+ body = self.fetch_petabox_body(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ )
+ else:
+ body = self.fetch_replay_body(
+ url=cdx_row.url,
+ datetime=cdx_row.datetime,
+ )
+ cdx_row = cdx_partial_from_row(cdx_row)
return ResourceResult(
start_url=start_url,
hit=True,
@@ -357,8 +429,14 @@ class WaybackClient:
cdx=cdx_row,
)
elif 300 <= cdx_row.status_code < 400:
- resource = self.fetch_petabox(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
+ resource = self.fetch_petabox(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ )
assert 300 <= resource.status_code < 400
+ assert resource.location
+ #print(resource, file=sys.stderr)
next_url = resource.location
if next_url in urls_seen:
return ResourceResult(
@@ -443,8 +521,8 @@ class SavePageNowClient:
- terminal_dt: wayback timestamp of final capture
- resources: list of all URLs captured
- TODO: parse SPN error codes and handle better. Eg, non-200 remote
- statuses, invalid hosts/URLs, timeouts, backoff, etc.
+ TODO: parse SPN error codes (status string) and handle better. Eg,
+ non-200 remote statuses, invalid hosts/URLs, timeouts, backoff, etc.
"""
if not (self.ia_access_key and self.ia_secret_key):
raise Exception("SPN2 requires authentication (IA_ACCESS_KEY/IA_SECRET_KEY)")
@@ -486,6 +564,8 @@ class SavePageNowClient:
if not final_json:
raise SavePageNowError("SPN2 timed out (polling count exceeded)")
+ # if there was a recent crawl of same URL, fetch the status of that
+ # crawl to get correct datetime
if final_json.get('original_job_id'):
resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id']))
try:
@@ -538,26 +618,30 @@ class SavePageNowClient:
cdx=None,
)
- # sleep one second to let CDX API update (XXX)
- time.sleep(1.0)
-
- # fetch CDX, then body
+ # fetch exact CDX row
cdx_row = wayback_client.cdx_client.fetch(
- spn_result.terminal_url,
- spn_result.terminal_dt,
+ url=spn_result.terminal_url,
+ datetime=spn_result.terminal_dt,
filter_status_code=200,
)
- print(spn_result, file=sys.stderr)
- print(cdx_row, file=sys.stderr)
- assert cdx_row.status_code == 200
- body = wayback_client.fetch_replay_body(cdx_row.url, cdx_row.datetime)
- file_meta = gen_file_metadata(body)
- if cdx_row.sha1hex != file_meta['sha1hex']:
- print(file_meta, file=sys.stderr)
- raise WaybackError("replay fetch resulted in non-matching SHA1 (vs. CDX API)")
-
- # not a full CDX yet
- cdx_partial = cdx_partial_from_row(cdx_row)
+
+ if '/' in cdx_row.warc_path:
+ # Usually can't do this kind of direct fetch because CDX result is recent/live
+ resource = wayback_client.fetch_petabox(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ )
+ body = resource.body
+ else:
+ # note: currently not trying to verify cdx_row.sha1hex
+ body = wayback_client.fetch_replay_body(
+ url=cdx_row.url,
+ datetime=cdx_row.datetime,
+ )
+ # warc_path etc will change, so strip them out
+ cdx_row = cdx_partial_from_row(cdx_row)
+
return ResourceResult(
start_url=start_url,
hit=True,
@@ -566,6 +650,6 @@ class SavePageNowClient:
terminal_dt=cdx_row.datetime,
terminal_status_code=cdx_row.status_code,
body=body,
- cdx=cdx_partial,
+ cdx=cdx_row,
)
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index f085fc5..4b6c587 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -6,7 +6,7 @@ import requests
from http.server import BaseHTTPRequestHandler, HTTPServer
from collections import namedtuple
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict
from sandcrawler.grobid import GrobidClient
from sandcrawler.misc import gen_file_metadata
from sandcrawler.html import extract_fulltext_url
@@ -54,7 +54,9 @@ class IngestFileWorker(SandcrawlerWorker):
if not self.grobid_client:
self.grobid_client = GrobidClient()
- self.attempt_existing = False
+ self.try_existing_ingest = False
+ self.try_wayback = True
+ self.try_spn2 = True
def check_existing_ingest(self, base_url):
"""
@@ -66,7 +68,7 @@ class IngestFileWorker(SandcrawlerWorker):
Looks at existing ingest results and makes a decision based on, eg,
status and timestamp.
"""
- if not self.attempt_existing:
+ if not self.try_existing_ingest:
return None
raise NotImplementedError
@@ -78,10 +80,16 @@ class IngestFileWorker(SandcrawlerWorker):
Looks in wayback for a resource starting at the URL, following any
redirects. If a hit isn't found, try crawling with SPN.
"""
- resource = self.wayback_client.lookup_resource(url, best_mimetype)
- if not resource or not resource.hit:
+ via = "none"
+ resource = None
+ if self.try_wayback:
+ via = "wayback"
+ resource = self.wayback_client.lookup_resource(url, best_mimetype)
+ if self.try_spn2 and (not resource or not resource.hit):
+ via = "spn2"
resource = self.spn_client.crawl_resource(url, self.wayback_client)
- print("[FETCH] {} url:{}".format(
+ print("[FETCH {}\t] {}\turl:{}".format(
+ via,
resource.status,
url),
file=sys.stderr)
@@ -146,11 +154,6 @@ class IngestFileWorker(SandcrawlerWorker):
try:
# first hop
resource = self.find_resource(base_url, best_mimetype)
- print("[{}] fetch status: {} url: {}".format(
- resource.status,
- ingest_type,
- base_url),
- file=sys.stderr)
if not resource.hit:
result['status'] = resource.status
return result
@@ -218,6 +221,7 @@ class IngestFileWorker(SandcrawlerWorker):
result['status'] = "success"
result['hit'] = True
+ result['cdx'] = cdx_to_dict(resource.cdx)
return result
diff --git a/python/tests/test_savepagenow.py b/python/tests/test_savepagenow.py
index 8681575..63dd887 100644
--- a/python/tests/test_savepagenow.py
+++ b/python/tests/test_savepagenow.py
@@ -182,10 +182,16 @@ def test_crawl_resource(spn_client, wayback_client):
'http://dummy-cdx/cdx',
status=200,
body=json.dumps(CDX_SPN_HIT))
+ responses.add(responses.GET,
+ 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"),
+ status=200,
+ headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
+ body=WARC_BODY)
+ print('https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"))
resp = spn_client.crawl_resource(TARGET, wayback_client)
- assert len(responses.calls) == 4
+ assert len(responses.calls) == 5
assert resp.hit == True
assert resp.status == "success"