aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/__init__.py2
-rw-r--r--python/sandcrawler/ia.py190
-rw-r--r--python/tests/test_savepagenow.py40
-rw-r--r--python/tests/test_wayback.py40
4 files changed, 220 insertions, 52 deletions
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 236570e..699126f 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -2,7 +2,7 @@
from .grobid import GrobidClient, GrobidWorker, GrobidBlobWorker
from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
-from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, SavePageNowRemoteError
+from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
from .ingest import IngestFileWorker
from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index 1522708..ba9b5b9 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -27,6 +27,12 @@ ResourceResult = namedtuple("ResourceResult", [
"cdx",
])
+WarcResource = namedtuple("WarcResource", [
+ "status_code",
+ "location",
+ "body",
+])
+
CdxRow = namedtuple('CdxRow', [
'surt',
'datetime',
@@ -50,6 +56,18 @@ CdxPartial = namedtuple('CdxPartial', [
'sha1hex',
])
+def cdx_partial_from_row(full):
+ return CdxPartial(
+ surt=full.surt,
+ datetime=full.datetime,
+ url=full.url,
+ mimetype=full.mimetype,
+ status_code=full.status_code,
+ sha1b32=full.sha1b32,
+ sha1hex=full.sha1hex,
+ )
+
+
class CdxApiError(Exception):
pass
@@ -58,16 +76,14 @@ class CdxApiClient:
def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
+ cdx_auth_token = kwargs.get('cdx_auth_token',
+ os.environ.get('CDX_AUTH_TOKEN'))
+ if not cdx_auth_token:
+ raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
self.http_session.headers.update({
'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient',
+ 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
})
- self.cdx_auth_token = kwargs.get('cdx_auth_token',
- os.environ.get('CDX_AUTH_TOKEN'))
- if self.cdx_auth_token:
- self.http_session.headers.update({
- 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
- })
- self.wayback_endpoint = "https://web.archive.org/web/"
def _query_api(self, params):
"""
@@ -111,6 +127,7 @@ class CdxApiClient:
'to': datetime,
'matchType': 'exact',
'limit': -1,
+ 'fastLatest': True,
'output': 'json',
}
resp = self._query_api(params)
@@ -131,6 +148,7 @@ class CdxApiClient:
'url': url,
'matchType': 'exact',
'limit': -25,
+ 'fastLatest': True,
'output': 'json',
'collapse': 'timestamp:6',
}
@@ -173,6 +191,9 @@ class CdxApiClient:
class WaybackError(Exception):
pass
+class PetaboxError(Exception):
+ pass
+
class WaybackClient:
def __init__(self, cdx_client=None, **kwargs):
@@ -181,13 +202,35 @@ class WaybackClient:
else:
self.cdx_client = CdxApiClient()
# /serve/ instead of /download/ doesn't record view count
- self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
+ self.petabox_base_url = kwargs.get('petabox_base_url', 'https://archive.org/serve/')
# gwb library will fall back to reading from /opt/.petabox/webdata.secret
self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/')
self.rstore = None
+ self.max_redirects = 25
+
+ def fetch_petabox(self, c_size, offset, warc_path):
+ """
+ Fetches wayback resource directly from petabox using WARC path/offset/csize.
+
+ If there is a problem with petabox, raises a PetaboxError.
+ If resource doesn't exist, would raise a KeyError (TODO).
- def fetch_warc_content(self, warc_path, offset, c_size):
+ The full record is returned as crawled; it may be a redirect, 404
+ response, etc.
+
+ WarcResource object (namedtuple) contains fields:
+ - status_code: int
+ - location: eg, for redirects
+ - body: raw bytes
+
+ Requires (and uses) a secret token.
+ """
+ if not self.petabox_webdata_secret:
+ raise Exception("WaybackClient needs petabox secret to do direct WARC fetches")
+ # TODO:
+ #if not "/" in warc_path:
+ # raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
@@ -196,38 +239,60 @@ class WaybackClient:
try:
gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable:
- raise WaybackError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
except ValueError as ve:
- raise WaybackError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
except EOFError as eofe:
- raise WaybackError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
except TypeError as te:
- raise WaybackError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
# Note: could consider a generic "except Exception" here, as we get so
# many petabox errors. Do want jobs to fail loud and clear when the
# whole cluster is down though.
- if gwb_record.get_status()[0] != 200:
- raise WaybackError("archived HTTP response (WARC) was not 200: {}".format(gwb_record.get_status()[0]))
+ status_code = gwb_record.get_status()[0]
+ location = gwb_record.get_location()[0]
- try:
- raw_content = gwb_record.open_raw_content().read()
- except IncompleteRead as ire:
- raise WaybackError("failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
- return raw_content
+ body = None
+ if status_code == 200:
+ try:
+ body = gwb_record.open_raw_content().read()
+ except IncompleteRead as ire:
+ raise WaybackError("failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return WarcResource(
+ status_code=status_code,
+ location=location,
+ body=body,
+ )
- def fetch_warc_by_url_dt(self, url, datetime):
+ def fetch_petabox_body(self, c_size, offset, warc_path):
"""
- Helper wrapper that first hits CDX API to get a full CDX row, then
- fetches content from wayback
+ Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
+
+ Returns bytes. Raises KeyError if resource wasn't an HTTP 200.
+
+ Thin helper around fetch_petabox()
"""
- cdx_row = self.cdx_client.lookup(url, datetime)
- return self.fetch_warc_content(
- cdx_row['warc_path'],
- cdx_row['warc_offset'],
- cdx_row['warc_csize'])
+ resource = self.fetch_petabox(c_size, offset, warc_path)
+
+ if resource.status_code != 200:
+            raise KeyError("archived HTTP response (WARC) was not 200: {}".format(resource.status_code))
- def fetch_resource(self, start_url, mimetype=None):
+ return resource.body
+
+ def fetch_replay(self, url, datetime):
+ """
+ Fetches an HTTP 200 record from wayback via the replay interface
+ (web.archive.org) instead of petabox.
+
+ Intended for use with SPN2 requests, where request body has not ended
+ up in petabox yet.
+
+ TODO: is this actually necessary?
+ """
+ raise NotImplementedError
+
+ def lookup_resource(self, start_url, best_mimetype=None):
"""
Looks in wayback for a resource starting at the URL, following any
redirects. Returns a ResourceResult object.
@@ -249,12 +314,21 @@ class WaybackClient:
"""
next_url = start_url
urls_seen = [start_url]
- for i in range(25):
- cdx_row = self.cdx_client.lookup_best(next_url, mimetype=mimetype)
+ for i in range(self.max_redirects):
+ cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype)
if not cdx_row:
- return None
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="no-capture",
+ terminal_url=None,
+ terminal_dt=None,
+ terminal_status_code=None,
+ body=None,
+ cdx=None,
+ )
if cdx.status_code == 200:
- body = self.fetch_warc_content(cdx.warc_path, cdx.warc_offset, cdx_row.warc_csize)
+                body = self.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
return ResourceResult(
start_url=start_url,
hit=True,
@@ -265,9 +339,10 @@ class WaybackClient:
body=body,
cdx=cdx_row,
)
- elif cdx_row.status_code >= 300 and cdx_row.status_code < 400:
- body = self.fetch_warc_content(cdx_row.warc_path, cdx_row.warc_offset, cdx_row.warc_csize)
- next_url = body.get_redirect_url()
+ elif 300 <= cdx_row.status_code < 400:
+ resource = self.fetch_petabox(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
+ assert 300 <= resource.status_code < 400
+ next_url = resource.location
if next_url in urls_seen:
return ResourceResult(
start_url=start_url,
@@ -285,7 +360,7 @@ class WaybackClient:
return ResourceResult(
start_url=start_url,
hit=False,
- status="remote-status",
+ status="terminal-not-success",
terminal_url=cdx_row.url,
terminal_dt=cdx_row.datetime,
terminal_status_code=cdx_row.status_code,
@@ -348,7 +423,7 @@ class SavePageNowClient:
- job_id: returned by API
- request_url: url we asked to fetch
- terminal_url: final primary resource (after any redirects)
- - terminal_timestamp: wayback timestamp of final capture
+ - terminal_dt: wayback timestamp of final capture
- resources: list of all URLs captured
TODO: parse SPN error codes and handle better. Eg, non-200 remote
@@ -415,3 +490,42 @@ class SavePageNowClient:
None,
)
+ def crawl_resource(self, start_url, wayback_client):
+ """
+ Runs a SPN2 crawl, then fetches body from wayback.
+
+ TODO: possible to fetch from petabox?
+ """
+
+ spn_result = self.save_url_now_v2(start_url)
+
+ if not spn_result.success:
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status=spn_result.status,
+ terminal_url=spn_result.terminal_url,
+ terminal_dt=spn_result.terminal_dt,
+ terminal_status_code=spn_result.terminal_status_code,
+ body=None,
+ cdx=None,
+ )
+
+ # fetch CDX and body
+ cdx_row = wayback_client.cdx_client.fetch(spn_result.terminal_url, spn_result.terminal_dt)
+ assert cdx_row.status_code == 200
+ body = wayback_client.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
+
+ # not a full CDX yet
+ cdx_partial = cdx_partial_from_row(cdx_row)
+ return ResourceResult(
+ start_url=start_url,
+ hit=True,
+ status="success",
+ terminal_url=cdx_row.url,
+        terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=body,
+ cdx=cdx_partial,
+ )
+
diff --git a/python/tests/test_savepagenow.py b/python/tests/test_savepagenow.py
index cbc6aef..8681575 100644
--- a/python/tests/test_savepagenow.py
+++ b/python/tests/test_savepagenow.py
@@ -3,7 +3,8 @@ import json
import pytest
import responses
-from sandcrawler import SavePageNowClient, SavePageNowError
+from sandcrawler import SavePageNowClient, SavePageNowError, CdxPartial
+from test_wayback import *
TARGET = "http://dummy-target.dummy"
@@ -72,6 +73,10 @@ ERROR_BODY = {
"message": "Couldn't resolve host for http://example5123.com.",
"resources": []
}
+CDX_SPN_HIT = [
+ ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"],
+ ["wiki,fatcat)/", "20180326070330", TARGET + "/redirect", "application/pdf", "200", CDX_BEST_SHA1B32, "-", "-", "8445", "108062304", "liveweb-20200108215212-wwwb-spn04.us.archive.org-kols1pud.warc.gz"],
+]
@pytest.fixture
def spn_client():
@@ -158,3 +163,36 @@ def test_savepagenow_500(spn_client):
assert len(responses.calls) == 2
+@responses.activate
+def test_crawl_resource(spn_client, wayback_client):
+
+ responses.add(responses.POST,
+ 'http://dummy-spnv2/save',
+ status=200,
+ body=json.dumps({"url": TARGET, "job_id": JOB_ID}))
+ responses.add(responses.GET,
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(PENDING_BODY))
+ responses.add(responses.GET,
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(SUCCESS_BODY))
+ responses.add(responses.GET,
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_SPN_HIT))
+
+ resp = spn_client.crawl_resource(TARGET, wayback_client)
+
+ assert len(responses.calls) == 4
+
+ assert resp.hit == True
+ assert resp.status == "success"
+ assert resp.body == WARC_BODY
+ assert resp.cdx.sha1b32 == CDX_BEST_SHA1B32
+
+ assert type(resp.cdx) == CdxPartial
+ with pytest.raises(AttributeError):
+ print(resp.cdx.warc_path)
+
diff --git a/python/tests/test_wayback.py b/python/tests/test_wayback.py
index 7e63ec7..eeb4b37 100644
--- a/python/tests/test_wayback.py
+++ b/python/tests/test_wayback.py
@@ -35,14 +35,7 @@ CDX_MULTI_HIT = [
def cdx_client():
client = CdxApiClient(
host_url="http://dummy-cdx/cdx",
- )
- return client
-
-@pytest.fixture
-def wayback_client(cdx_client):
- client = WaybackClient(
- cdx_client=cdx_client,
- petabox_webdata_secret="dummy-petabox-secret",
+ cdx_auth_token="dummy-token",
)
return client
@@ -102,9 +95,32 @@ def test_cdx_lookup_best(cdx_client):
assert resp.sha1b32 == CDX_BEST_SHA1B32
assert resp.warc_path == CDX_SINGLE_HIT[1][-1]
+WARC_TARGET = "http://fatcat.wiki/"
+WARC_BODY = "<html>some stuff</html>"
+
+@pytest.fixture
+def wayback_client(cdx_client, mocker):
+ client = WaybackClient(
+ cdx_client=cdx_client,
+ petabox_webdata_secret="dummy-petabox-secret",
+ )
+ # mock out the wayback store with mock stuff
+ client.rstore = mocker.Mock()
+ resource = mocker.Mock()
+ client.rstore.load_resource = mocker.MagicMock(return_value=resource)
+ resource.get_status = mocker.MagicMock(return_value=[200])
+ resource.get_location = mocker.MagicMock(return_value=[WARC_TARGET])
+ body = mocker.Mock()
+ resource.open_raw_content = mocker.MagicMock(return_value=body)
+ body.read = mocker.MagicMock(return_value=WARC_BODY)
+
+ return client
+
def test_wayback_fetch(wayback_client, mocker):
- # mock something
- #mocker.patch('fatcat_tools.harvest.harvest_common.HarvestState.initialize_from_kafka')
- #blah = mocker.Mock()
- return
+ resp = wayback_client.fetch_petabox(123, 456789, "here/there.warc.gz")
+ assert resp.body == WARC_BODY
+ assert resp.location == WARC_TARGET
+
+ resp = wayback_client.fetch_petabox_body(123, 456789, "here/there.warc.gz")
+ assert resp == WARC_BODY