author    Bryan Newbold <bnewbold@archive.org>  2020-01-08 19:30:38 -0800
committer Bryan Newbold <bnewbold@archive.org>  2020-01-09 16:31:40 -0800
commit    6454cdc93424b23f484fc56e1f9f986490d05c2b (patch)
tree      14e76469fdf9dfead292db1b26bd90b475feaf4e
parent    318bcf9dbc244a1130b74252b7842cc4eb954bfd (diff)
wrap up basic (locally testable) ingest refactor
-rw-r--r--  python/sandcrawler/ia.py       42
-rw-r--r--  python/sandcrawler/ingest.py  355
-rw-r--r--  python/tests/test_wayback.py   52
3 files changed, 267 insertions(+), 182 deletions(-)
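
Before diving into the diff: this commit replaces IngestFileWorker's old get_cdx_and_body() loop with a find_resource() / process_hit() flow built on WaybackClient.lookup_resource() and SavePageNowClient.crawl_resource(). A minimal sketch of driving the refactored worker locally, assuming the package is importable; the request URL is a made-up placeholder, and the no-argument client constructors mirror the defaults the worker falls back to in the diff below:

import json

from sandcrawler.grobid import GrobidClient
from sandcrawler.ia import SavePageNowClient, WaybackClient
from sandcrawler.ingest import IngestFileWorker

# Clients here match the defaults the worker would construct itself.
worker = IngestFileWorker(
    wayback_client=WaybackClient(),
    spn_client=SavePageNowClient(),
    grobid_client=GrobidClient(),
)

request = {
    "ingest_type": "pdf",                     # only PDF ingest is implemented so far
    "base_url": "https://example.com/paper",  # hypothetical landing page URL
}

result = worker.process(request)
print(json.dumps({k: result[k] for k in ("hit", "status")}, indent=2))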
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index ba9b5b9..ac0fef8 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -13,7 +13,7 @@ from http.client import IncompleteRead
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
-from .misc import b32_hex, requests_retry_session
+from .misc import b32_hex, requests_retry_session, gen_file_metadata
ResourceResult = namedtuple("ResourceResult", [
@@ -106,15 +106,15 @@ class CdxApiClient:
status_code=int(raw[4]),
sha1b32=raw[5],
sha1hex=b32_hex(raw[5]),
- warc_csize=raw[8],
- warc_offset=raw[9],
+ warc_csize=int(raw[8]),
+ warc_offset=int(raw[9]),
warc_path=raw[10],
)
assert (row.mimetype == "-") or ("-" not in row)
rows.append(row)
return rows
- def fetch(self, url, datetime):
+ def fetch(self, url, datetime, filter_status_code=None):
"""
Fetches a single CDX row by url/datetime. Raises a KeyError if not
found, because we expect to be looking up a specific full record.
@@ -127,9 +127,10 @@ class CdxApiClient:
'to': datetime,
'matchType': 'exact',
'limit': -1,
- 'fastLatest': True,
'output': 'json',
}
+ if filter_status_code:
+ params['filter'] = "statuscode:{}".format(filter_status_code)
resp = self._query_api(params)
if not resp:
raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
@@ -148,9 +149,9 @@ class CdxApiClient:
'url': url,
'matchType': 'exact',
'limit': -25,
- 'fastLatest': True,
'output': 'json',
'collapse': 'timestamp:6',
+ 'filter': '!mimetype:warc/revisit',
}
if max_age_days:
since = datetime.date.today() - datetime.timedelta(days=max_age_days)
@@ -165,9 +166,11 @@ class CdxApiClient:
200
mimetype match
- most-recent
+ not-liveweb
+ most-recent
no match
- most-recent
+ not-liveweb
+ most-recent
3xx
most-recent
4xx
@@ -178,10 +181,11 @@ class CdxApiClient:
This function will create a tuple that can be used to sort in *reverse* order.
"""
return (
- r.status_code == 200,
- 0 - r.status_code,
- r.mimetype == best_mimetype,
- r.datetime,
+ int(r.status_code == 200),
+ int(0 - r.status_code),
+ int(r.mimetype == best_mimetype),
+ int('/' in r.warc_path),
+ int(r.datetime),
)
rows = sorted(rows, key=cdx_sort_key)
@@ -251,7 +255,7 @@ class WaybackClient:
# whole cluster is down though.
status_code = gwb_record.get_status()[0]
- location = gwb_record.get_location()[0]
+ location = (gwb_record.get_location() or [None])[0]
body = None
if status_code == 200:
@@ -280,7 +284,7 @@ class WaybackClient:
return resource.body
- def fetch_replay(self, url, datetime):
+ def fetch_replay_body(self, url, datetime):
"""
Fetches an HTTP 200 record from wayback via the replay interface
(web.archive.org) instead of petabox.
@@ -327,8 +331,8 @@ class WaybackClient:
body=None,
cdx=None,
)
- if cdx.status_code == 200:
- body = self.fetch_petabox_body(cdx.warc_csize, cdx.warc_offset, cdx_row.warc_path)
+ if cdx_row.status_code == 200:
+ body = self.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
return ResourceResult(
start_url=start_url,
hit=True,
@@ -360,7 +364,7 @@ class WaybackClient:
return ResourceResult(
start_url=start_url,
hit=False,
- status="terminal-not-success",
+ status="terminal-bad-status",
terminal_url=cdx_row.url,
terminal_dt=cdx_row.datetime,
terminal_status_code=cdx_row.status_code,
@@ -506,7 +510,7 @@ class SavePageNowClient:
status=spn_result.status,
terminal_url=spn_result.terminal_url,
terminal_dt=spn_result.terminal_dt,
- terminal_status_code=spn_result.terminal_status_code,
+ terminal_status_code=None,
body=None,
cdx=None,
)
@@ -523,7 +527,7 @@ class SavePageNowClient:
hit=True,
status="success",
terminal_url=cdx_row.url,
- terminal_dt=cdx_row.status_code,
+ terminal_dt=cdx_row.datetime,
terminal_status_code=cdx_row.status_code,
body=body,
cdx=cdx_partial,
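
A note on the cdx_sort_key change above before moving to ingest.py: candidate CDX rows are now ranked by exact HTTP 200 first, then status code, then mimetype match, then whether the capture already lives in petabox (a '/' in warc_path, as opposed to a liveweb/revisit record), then recency. A self-contained sketch of that ordering with invented rows; only the key tuple itself is taken from the diff:

from collections import namedtuple

Row = namedtuple("Row", ["status_code", "mimetype", "warc_path", "datetime"])

def cdx_sort_key(r, best_mimetype="application/pdf"):
    # same tuple as in CdxApiClient above; larger tuples are "better" candidates
    return (
        int(r.status_code == 200),
        int(0 - r.status_code),
        int(r.mimetype == best_mimetype),
        int('/' in r.warc_path),   # a petabox-backed capture beats a liveweb one
        int(r.datetime),
    )

rows = [
    Row(302, "text/html", "CRAWL-A/redirect.warc.gz", "20200101000000"),
    Row(200, "application/pdf", "liveweb.warc.gz", "20200102000000"),      # newer, but liveweb
    Row(200, "application/pdf", "CRAWL-B/paper.warc.gz", "20180101000000"),
]
best = sorted(rows, key=cdx_sort_key, reverse=True)[0]
print(best.warc_path)  # CRAWL-B/paper.warc.gz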
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 58d3517..f085fc5 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -4,185 +4,222 @@ import json
import base64
import requests
from http.server import BaseHTTPRequestHandler, HTTPServer
+from collections import namedtuple
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, SavePageNowRemoteError, CdxApiError
-krom sandcrawler.grobid import GrobidClient
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError
+from sandcrawler.grobid import GrobidClient
from sandcrawler.misc import gen_file_metadata
from sandcrawler.html import extract_fulltext_url
from sandcrawler.workers import SandcrawlerWorker
class IngestFileWorker(SandcrawlerWorker):
+ """
+ High level flow is to look in history first, then go to live web if
+ resource not found. Following redirects is treated as "fetching a
+ resource". Current version fetches a single resource; if it isn't a hit
+ but is an HTML 200, treats it as a landing page, tries to extract
+ fulltext link, then fetches that resource.
+
+ process(request) -> response
+ Does all the things!
+
+ Check existing processing (short circuit):
+
+ check_existing_ingest(base_url) -> ingest_file_result or none
+ process_existing(result) -> response
+ try fetching all the rows we want. if any don't exist, fetch the resource itself and call process_hit()
+
+ Fetch resource:
+
+ find_resource(url) -> ResourceResult
+
+ Process resource:
+
+ process_hit(ResourceResult) -> response
+ process_grobid(ResourceResult)
+ """
def __init__(self, sink=None, **kwargs):
super().__init__()
self.sink = sink
- self.spn_client = kwargs.get('spn_client',
- SavePageNowClient())
- self.wayback_client = kwargs.get('wayback_client',
- WaybackClient())
- self.cdx_client = kwargs.get('cdx_client',
- CdxApiClient())
- self.grobid_client = kwargs.get('grobid_client',
- GrobidClient())
+ self.wayback_client = kwargs.get('wayback_client')
+ if not self.wayback_client:
+ self.wayback_client = WaybackClient()
+ self.spn_client = kwargs.get('spn_client')
+ if not self.spn_client:
+ self.spn_client = SavePageNowClient()
+ self.grobid_client = kwargs.get('grobid_client')
+ if not self.grobid_client:
+ self.grobid_client = GrobidClient()
+
+ self.attempt_existing = False
+
+ def check_existing_ingest(self, base_url):
+ """
+ Check in sandcrawler-db (postgres) to see if we have already ingested
+ this URL (ingest file result table).
+ Returns existing row *if* found *and* we should use it, otherwise None.
- def get_cdx_and_body(self, url):
- """
- Returns a CDX dict and body as a tuple.
-
- If there isn't an existing wayback capture, take one now. Raises an
- exception if can't capture, or if CDX API not available.
-
- Raises an exception if can't find/fetch.
-
- TODO:
- - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively
+ Looks at existing ingest results and makes a decision based on, eg,
+ status and timestamp.
"""
-
- WAYBACK_ENDPOINT = "https://web.archive.org/web/"
-
- cdx = self.cdx_client.lookup_latest(url, follow_redirects=True)
- if not cdx:
- cdx_list = self.spn_client.save_url_now_v2(url)
- for cdx_url in cdx_list:
- if 'pdf.sciencedirectassets.com' in cdx_url and '.pdf' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'osapublishing.org' in cdx_url and 'abstract.cfm' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'pubs.acs.org' in cdx_url and '/doi/pdf/' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'ieeexplore.ieee.org' in cdx_url and '.pdf' in cdx_url and 'arnumber=' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'hrmars.com' in cdx_url and 'journals/papers' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if not cdx:
- # extraction didn't work as expected; fetch whatever SPN2 got
- cdx = self.cdx_client.lookup_latest(url, follow_redirects=True)
- if not cdx:
- print("{}".format(cdx_list), file=sys.stderr)
- raise SavePageNowError("Failed to find terminal capture from SPNv2")
+ if not self.attempt_existing:
+ return None
+ raise NotImplementedError
- try:
- resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url'])
- except requests.exceptions.TooManyRedirects as e:
- raise WaybackError("Redirect loop fetching from wayback (dt: {}, url: {})".format(cdx['datetime'], cdx['url']))
- if resp.status_code != cdx['http_status']:
- raise WaybackError("Got unexpected wayback status (expected {} from CDX, got {})".format(cdx['http_status'], resp.status_code))
- body = resp.content
- return (cdx, body)
+ # this "return True" is just here to make pylint happy
+ return True
- def process(self, request):
+ def find_resource(self, url, best_mimetype=None):
+ """
+ Looks in wayback for a resource starting at the URL, following any
+ redirects. If a hit isn't found, try crawling with SPN.
+ """
+ resource = self.wayback_client.lookup_resource(url, best_mimetype)
+ if not resource or not resource.hit:
+ resource = self.spn_client.crawl_resource(url, self.wayback_client)
+ print("[FETCH] {} url:{}".format(
+ resource.status,
+ url),
+ file=sys.stderr)
+ return resource
+
+ def process_existing(self, request, result_row):
+ """
+ If we have an existing ingest file result, do any database fetches or
+ additional processing necessary to return a result.
"""
- 1. check sandcrawler-db for base_url
- -> if found, populate terminal+wayback fields
- 2. check CDX for base_url (only 200, past year)
- -> if found, populate terminal+wayback fields
- 3. if we have wayback, fetch that. otherwise do recursive SPN crawl
- -> populate terminal+wayback
- 4. calculate file_meta
- -> populate file_meta
- 5. check sandcrawler-db for GROBID XML
- 6. run GROBID if we didn't already
- -> push results to minio+sandcrawler-db
- 7. decide if this was a hit
-
- In all cases, print JSON status, and maybe push to sandcrawler-db
+ result = {
+ 'hit': result_row.hit,
+ 'status': result_row.status,
+ 'request': request,
+ }
+ # TODO: fetch file_meta
+ # TODO: fetch grobid
+ return result
+
+ def process_hit(self, resource, file_meta):
"""
+ Run all the necessary processing for a new/fresh ingest hit.
+ """
+ return {
+ 'grobid': self.process_grobid(resource),
+ }
+
+ def process_grobid(self, resource):
+ """
+ Submits the resource body to GROBID for processing.
+
+ TODO: By default checks sandcrawler-db for an existing row first, then
+ decide if we should re-process
+
+ TODO: Code to push to Kafka might also go here?
+ """
+ result = self.grobid_client.process_fulltext(resource.body)
+ if result['status'] == "success":
+ metadata = self.grobid_client.metadata(result)
+ if metadata:
+ result.update(metadata)
+ result.pop('tei_xml', None)
+ return result
+
+ def process(self, request):
+
+ # for now, only pdf ingest is implemented
+ assert request.get('ingest_type') == "pdf"
+ ingest_type = request.get('ingest_type')
+ base_url = request['base_url']
+
+ best_mimetype = None
+ if ingest_type == "pdf":
+ best_mimetype = "application/pdf"
+
+ existing = self.check_existing_ingest(base_url)
+ if existing:
+ return self.process_existing(request, existing)
+
+ result = dict(request=request, hit=False)
+
+ try:
+ # first hop
+ resource = self.find_resource(base_url, best_mimetype)
+ print("[{}] fetch status: {} url: {}".format(
+ resource.status,
+ ingest_type,
+ base_url),
+ file=sys.stderr)
+ if not resource.hit:
+ result['status'] = resource.status
+ return result
+ file_meta = gen_file_metadata(resource.body)
+
+ if "html" in file_meta['mimetype']:
+ # got landing page, try another hop
+ fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
+
+ result['html'] = fulltext_url
+ if not fulltext_url or not 'pdf_url' in fulltext_url:
+ result['status'] = 'no-pdf-link'
+ return result
+ print("\tlanding page URL extracted ({}): {}".format(
+ fulltext_url.get('technique'),
+ fulltext_url['pdf_url'],
+ ),
+ file=sys.stderr)
+ resource = self.find_resource(fulltext_url['pdf_url'], best_mimetype)
+ if not resource.hit:
+ result['status'] = resource.status
+ return result
+ file_meta = gen_file_metadata(resource.body)
+ except SavePageNowError as e:
+ result['status'] = 'spn-error'
+ result['error_message'] = str(e)
+ return result
+ except PetaboxError as e:
+ result['status'] = 'petabox-error'
+ result['error_message'] = str(e)
+ return result
+ except CdxApiError as e:
+ result['status'] = 'cdx-error'
+ result['error_message'] = str(e)
+ return result
+ except WaybackError as e:
+ result['status'] = 'wayback-error'
+ result['error_message'] = str(e)
+ return result
+
+ if resource.terminal_dt:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ }
+
+ # must be a hit if we got this far
+ assert resource.hit == True
+ assert resource.terminal_status_code == 200
+
+ result['file_meta'] = file_meta
+
+ # other failure cases
+ if not resource.body or file_meta['size_bytes'] == 0:
+ result['status'] = 'null-body'
+ return result
+
+ if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
+ result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+
+ info = self.process_hit(resource, file_meta)
+ result.update(info)
+
+ result['status'] = "success"
+ result['hit'] = True
+ return result
- response = dict(request=request, hit=False)
- url = request['base_url']
- while url:
- try:
- (cdx_dict, body) = self.get_cdx_and_body(url)
- except SavePageNowRemoteError as e:
- response['status'] = 'spn-remote-error'
- response['error_message'] = str(e)
- return response
- except SavePageNowError as e:
- response['status'] = 'spn-error'
- response['error_message'] = str(e)
- return response
- except CdxApiError as e:
- response['status'] = 'cdx-error'
- response['error_message'] = str(e)
- return response
- except WaybackError as e:
- response['status'] = 'wayback-error'
- response['error_message'] = str(e)
- return response
- print("CDX hit: {}".format(cdx_dict), file=sys.stderr)
-
- response['cdx'] = cdx_dict
- # TODO: populate terminal
- response['terminal'] = dict(url=cdx_dict['url'], http_status=cdx_dict['http_status'])
- if not body:
- response['status'] = 'null-body'
- return response
- if cdx_dict['http_status'] != 200:
- response['status'] = 'terminal-bad-status'
- return response
- file_meta = gen_file_metadata(body)
- mimetype = cdx_dict['mimetype']
- if mimetype in ('warc/revisit', 'binary/octet-stream', 'application/octet-stream', 'application/x-download', 'application/force-download'):
- mimetype = file_meta['mimetype']
- response['file_meta'] = file_meta
- if 'html' in mimetype:
- page_metadata = extract_fulltext_url(response['cdx']['url'], body)
- if page_metadata and page_metadata.get('pdf_url'):
- next_url = page_metadata.get('pdf_url')
- if next_url == url:
- response['status'] = 'link-loop'
- return response
- url = next_url
- continue
- elif page_metadata and page_metadata.get('next_url'):
- next_url = page_metadata.get('next_url')
- if next_url == url:
- response['status'] = 'link-loop'
- return response
- url = next_url
- continue
- else:
- response['terminal']['html'] = page_metadata
- response['status'] = 'no-pdf-link'
- return response
- elif 'pdf' in mimetype:
- break
- else:
- response['status'] = 'other-mimetype'
- return response
-
- response['file_meta'] = file_meta
-
- # if we got here, we have a PDF
- sha1hex = response['file_meta']['sha1hex']
-
- # do GROBID
- response['grobid'] = self.grobid_client.process_fulltext(body)
- #print("GROBID status: {}".format(response['grobid']['status']), file=sys.stderr)
-
- # TODO: optionally publish to Kafka here, but continue on failure (but
- # send a sentry exception?)
-
- # parse metadata, but drop fulltext from ingest response
- if response['grobid']['status'] == 'success':
- grobid_metadata = self.grobid_client.metadata(response['grobid'])
- if grobid_metadata:
- response['grobid'].update(grobid_metadata)
- response['grobid'].pop('tei_xml')
-
- # Ok, now what?
- #print("GOT TO END", file=sys.stderr)
- response['status'] = "success"
- response['hit'] = True
- return response
class IngestFileRequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
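
Before the test changes, a sketch of the response shape the rewritten process() above produces. The key names come from the diff; every value here is invented for illustration, and file_meta is abridged:

# Illustrative only: keys from process()/process_hit() above, values made up.
example_success = {
    "request": {"ingest_type": "pdf", "base_url": "https://example.com/paper"},
    "hit": True,
    "status": "success",
    "terminal": {
        "terminal_url": "https://example.com/paper.pdf",
        "terminal_dt": "20200108193038",
        "terminal_status_code": 200,
    },
    "file_meta": {"mimetype": "application/pdf", "size_bytes": 123456},
    "grobid": {"status": "success"},
}

# Non-hit outcomes keep 'request' and 'hit': False, and set 'status' to one of
# "no-pdf-link", "null-body", "wrong-mimetype", "spn-error", "petabox-error",
# "cdx-error", or "wayback-error" (plus 'error_message' when an exception was caught).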
diff --git a/python/tests/test_wayback.py b/python/tests/test_wayback.py
index 8d15d70..2aafe7c 100644
--- a/python/tests/test_wayback.py
+++ b/python/tests/test_wayback.py
@@ -54,8 +54,8 @@ def test_cdx_fetch(cdx_client):
assert resp.datetime == CDX_DT
assert resp.url == CDX_TARGET
assert resp.sha1b32 == "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR"
- assert resp.warc_csize == "8445"
- assert resp.warc_offset == "108062304"
+ assert resp.warc_csize == 8445
+ assert resp.warc_offset == 108062304
assert resp.warc_path == "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
@responses.activate
@@ -96,7 +96,17 @@ def test_cdx_lookup_best(cdx_client):
assert resp.warc_path == CDX_SINGLE_HIT[1][-1]
WARC_TARGET = "http://fatcat.wiki/"
-WARC_BODY = b"<html>some stuff</html>"
+WARC_BODY = b"""
+<html>
+ <head>
+ <meta name="citation_pdf_url" content="http://www.example.com/content/271/20/11761.full.pdf">
+ </head>
+ <body>
+ <h1>my big article here</h1>
+ blah
+ </body>
+</html>
+"""
@pytest.fixture
def wayback_client(cdx_client, mocker):
@@ -116,7 +126,30 @@ def wayback_client(cdx_client, mocker):
return client
-def test_wayback_fetch(wayback_client, mocker):
+@pytest.fixture
+def wayback_client_pdf(cdx_client, mocker):
+
+ with open('tests/files/dummy.pdf', 'rb') as f:
+ pdf_bytes = f.read()
+
+ client = WaybackClient(
+ cdx_client=cdx_client,
+ petabox_webdata_secret="dummy-petabox-secret",
+ )
+ # mock out the wayback store with mock stuff
+ client.rstore = mocker.Mock()
+ resource = mocker.Mock()
+ client.rstore.load_resource = mocker.MagicMock(return_value=resource)
+ resource.get_status = mocker.MagicMock(return_value=[200])
+ resource.get_location = mocker.MagicMock(return_value=[WARC_TARGET])
+ body = mocker.Mock()
+ resource.open_raw_content = mocker.MagicMock(return_value=body)
+ body.read = mocker.MagicMock(return_value=pdf_bytes)
+
+ return client
+
+@responses.activate
+def test_wayback_fetch(wayback_client):
resp = wayback_client.fetch_petabox(123, 456789, "here/there.warc.gz")
assert resp.body == WARC_BODY
assert resp.location == WARC_TARGET
@@ -124,3 +157,14 @@ def test_wayback_fetch(wayback_client, mocker):
resp = wayback_client.fetch_petabox_body(123, 456789, "here/there.warc.gz")
assert resp == WARC_BODY
+@responses.activate
+def test_lookup_resource_success(wayback_client):
+
+ responses.add(responses.GET,
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_MULTI_HIT))
+
+ resp = wayback_client.lookup_resource(CDX_TARGET)
+
+ assert resp.hit == True
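
A closing note for readers of the test changes: the CDX API returns positional JSON rows, and the CdxApiClient parsing touched in the first ia.py hunk indexes them directly: status code at index 4, SHA-1 digest at 5, compressed size at 8, offset at 9, and WARC path at 10, with size and offset now cast to int. A hedged sketch of that mapping; the size, offset, digest, and WARC path values are taken from the test assertions above, and the remaining fields are invented:

# Positional CDX row, indexed the same way as CdxApiClient in ia.py above.
raw = [
    "wiki,fatcat)/", "20180812220054", "http://fatcat.wiki/",
    "text/html", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR",
    "-", "-", "8445", "108062304",
    "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz",
]

parsed = dict(
    status_code=int(raw[4]),
    sha1b32=raw[5],
    warc_csize=int(raw[8]),    # previously kept as a string
    warc_offset=int(raw[9]),
    warc_path=raw[10],
)
assert parsed["warc_csize"] == 8445 and parsed["warc_offset"] == 108062304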