From 6454cdc93424b23f484fc56e1f9f986490d05c2b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 8 Jan 2020 19:30:38 -0800 Subject: wrap up basic (locally testable) ingest refactor --- python/sandcrawler/ia.py | 42 ++--- python/sandcrawler/ingest.py | 355 ++++++++++++++++++++++++------------------- python/tests/test_wayback.py | 52 ++++++- 3 files changed, 267 insertions(+), 182 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index ba9b5b9..ac0fef8 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -13,7 +13,7 @@ from http.client import IncompleteRead from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory -from .misc import b32_hex, requests_retry_session +from .misc import b32_hex, requests_retry_session, gen_file_metadata ResourceResult = namedtuple("ResourceResult", [ @@ -106,15 +106,15 @@ class CdxApiClient: status_code=int(raw[4]), sha1b32=raw[5], sha1hex=b32_hex(raw[5]), - warc_csize=raw[8], - warc_offset=raw[9], + warc_csize=int(raw[8]), + warc_offset=int(raw[9]), warc_path=raw[10], ) assert (row.mimetype == "-") or ("-" not in row) rows.append(row) return rows - def fetch(self, url, datetime): + def fetch(self, url, datetime, filter_status_code=None): """ Fetches a single CDX row by url/datetime. Raises a KeyError if not found, because we expect to be looking up a specific full record. @@ -127,9 +127,10 @@ class CdxApiClient: 'to': datetime, 'matchType': 'exact', 'limit': -1, - 'fastLatest': True, 'output': 'json', } + if filter_status_code: + params['filter'] = "statuscode:{}".format(filter_status_code) resp = self._query_api(params) if not resp: raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime)) @@ -148,9 +149,9 @@ class CdxApiClient: 'url': url, 'matchType': 'exact', 'limit': -25, - 'fastLatest': True, 'output': 'json', 'collapse': 'timestamp:6', + 'filter': '!mimetype:warc/revisit', } if max_age_days: since = datetime.date.today() - datetime.timedelta(days=max_age_days) @@ -165,9 +166,11 @@ class CdxApiClient: 200 mimetype match - most-recent + not-liveweb + most-recent no match - most-recent + not-liveweb + most-recent 3xx most-recent 4xx @@ -178,10 +181,11 @@ class CdxApiClient: This function will create a tuple that can be used to sort in *reverse* order. """ return ( - r.status_code == 200, - 0 - r.status_code, - r.mimetype == best_mimetype, - r.datetime, + int(r.status_code == 200), + int(0 - r.status_code), + int(r.mimetype == best_mimetype), + int('/' in r.warc_path), + int(r.datetime), ) rows = sorted(rows, key=cdx_sort_key) @@ -251,7 +255,7 @@ class WaybackClient: # whole cluster is down though. status_code = gwb_record.get_status()[0] - location = gwb_record.get_location()[0] + location = (gwb_record.get_location() or [None])[0] body = None if status_code == 200: @@ -280,7 +284,7 @@ class WaybackClient: return resource.body - def fetch_replay(self, url, datetime): + def fetch_replay_body(self, url, datetime): """ Fetches an HTTP 200 record from wayback via the replay interface (web.archive.org) instead of petabox. @@ -327,8 +331,8 @@ class WaybackClient: body=None, cdx=None, ) - if cdx.status_code == 200: - body = self.fetch_petabox_body(cdx.warc_csize, cdx.warc_offset, cdx_row.warc_path) + if cdx_row.status_code == 200: + body = self.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path) return ResourceResult( start_url=start_url, hit=True, @@ -360,7 +364,7 @@ class WaybackClient: return ResourceResult( start_url=start_url, hit=False, - status="terminal-not-success", + status="terminal-bad-status", terminal_url=cdx_row.url, terminal_dt=cdx_row.datetime, terminal_status_code=cdx_row.status_code, @@ -506,7 +510,7 @@ class SavePageNowClient: status=spn_result.status, terminal_url=spn_result.terminal_url, terminal_dt=spn_result.terminal_dt, - terminal_status_code=spn_result.terminal_status_code, + terminal_status_code=None, body=None, cdx=None, ) @@ -523,7 +527,7 @@ class SavePageNowClient: hit=True, status="success", terminal_url=cdx_row.url, - terminal_dt=cdx_row.status_code, + terminal_dt=cdx_row.datetime, terminal_status_code=cdx_row.status_code, body=body, cdx=cdx_partial, diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 58d3517..f085fc5 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -4,185 +4,222 @@ import json import base64 import requests from http.server import BaseHTTPRequestHandler, HTTPServer +from collections import namedtuple -from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, SavePageNowRemoteError, CdxApiError -krom sandcrawler.grobid import GrobidClient +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError +from sandcrawler.grobid import GrobidClient from sandcrawler.misc import gen_file_metadata from sandcrawler.html import extract_fulltext_url from sandcrawler.workers import SandcrawlerWorker class IngestFileWorker(SandcrawlerWorker): + """ + High level flow is to look in history first, then go to live web if + resource not found. Following redirects is treated as "fetching a + resource". Current version fetches a single resource; if it isn't a hit + but is an HTML 200, treats it as a landing page, tries to extract + fulltext link, then fetches that resource. + + process(request) -> response + Does all the things! + + Check existing processing (short circuit): + + check_existing_ingest(base_url) -> ingest_file_result or none + process_existing(result) -> response + try fetching all the rows we want. if any don't exist, fetch the resource itself and call process_hit() + + Fetch resource: + + find_resource(url) -> ResourceResult + + Process resource: + + process_hit(ResourceResult) -> response + process_grobid(ResourceResult) + """ def __init__(self, sink=None, **kwargs): super().__init__() self.sink = sink - self.spn_client = kwargs.get('spn_client', - SavePageNowClient()) - self.wayback_client = kwargs.get('wayback_client', - WaybackClient()) - self.cdx_client = kwargs.get('cdx_client', - CdxApiClient()) - self.grobid_client = kwargs.get('grobid_client', - GrobidClient()) + self.wayback_client = kwargs.get('wayback_client') + if not self.wayback_client: + self.wayback_client = WaybackClient() + self.spn_client = kwargs.get('spn_client') + if not self.spn_client: + self.spn_client = SavePageNowClient() + self.grobid_client = kwargs.get('grobid_client') + if not self.grobid_client: + self.grobid_client = GrobidClient() + + self.attempt_existing = False + + def check_existing_ingest(self, base_url): + """ + Check in sandcrawler-db (postgres) to see if we have already ingested + this URL (ingest file result table). + Returns existing row *if* found *and* we should use it, otherwise None. - def get_cdx_and_body(self, url): - """ - Returns a CDX dict and body as a tuple. - - If there isn't an existing wayback capture, take one now. Raises an - exception if can't capture, or if CDX API not available. - - Raises an exception if can't find/fetch. - - TODO: - - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively + Looks at existing ingest results and makes a decision based on, eg, + status and timestamp. """ - - WAYBACK_ENDPOINT = "https://web.archive.org/web/" - - cdx = self.cdx_client.lookup_latest(url, follow_redirects=True) - if not cdx: - cdx_list = self.spn_client.save_url_now_v2(url) - for cdx_url in cdx_list: - if 'pdf.sciencedirectassets.com' in cdx_url and '.pdf' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'osapublishing.org' in cdx_url and 'abstract.cfm' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'pubs.acs.org' in cdx_url and '/doi/pdf/' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'ieeexplore.ieee.org' in cdx_url and '.pdf' in cdx_url and 'arnumber=' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'hrmars.com' in cdx_url and 'journals/papers' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if not cdx: - # extraction didn't work as expected; fetch whatever SPN2 got - cdx = self.cdx_client.lookup_latest(url, follow_redirects=True) - if not cdx: - print("{}".format(cdx_list), file=sys.stderr) - raise SavePageNowError("Failed to find terminal capture from SPNv2") + if not self.attempt_existing: + return None + raise NotImplementedError - try: - resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url']) - except requests.exceptions.TooManyRedirects as e: - raise WaybackError("Redirect loop fetching from wayback (dt: {}, url: {})".format(cdx['datetime'], cdx['url'])) - if resp.status_code != cdx['http_status']: - raise WaybackError("Got unexpected wayback status (expected {} from CDX, got {})".format(cdx['http_status'], resp.status_code)) - body = resp.content - return (cdx, body) + # this "return True" is just here to make pylint happy + return True - def process(self, request): + def find_resource(self, url, best_mimetype=None): + """ + Looks in wayback for a resource starting at the URL, following any + redirects. If a hit isn't found, try crawling with SPN. + """ + resource = self.wayback_client.lookup_resource(url, best_mimetype) + if not resource or not resource.hit: + resource = self.spn_client.crawl_resource(url, self.wayback_client) + print("[FETCH] {} url:{}".format( + resource.status, + url), + file=sys.stderr) + return resource + + def process_existing(self, request, result_row): + """ + If we have an existing ingest file result, do any database fetches or + additional processing necessary to return a result. """ - 1. check sandcrawler-db for base_url - -> if found, populate terminal+wayback fields - 2. check CDX for base_url (only 200, past year) - -> if found, populate terminal+wayback fields - 3. if we have wayback, fetch that. otherwise do recursive SPN crawl - -> populate terminal+wayback - 4. calculate file_meta - -> populate file_meta - 5. check sandcrawler-db for GROBID XML - 6. run GROBID if we didn't already - -> push results to minio+sandcrawler-db - 7. decide if this was a hit - - In all cases, print JSON status, and maybe push to sandcrawler-db + result = { + 'hit': result_row.hit, + 'status': result_row.status, + 'request': request, + } + # TODO: fetch file_meta + # TODO: fetch grobid + return result + + def process_hit(self, resource, file_meta): """ + Run all the necessary processing for a new/fresh ingest hit. + """ + return { + 'grobid': self.process_grobid(resource), + } + + def process_grobid(self, resource): + """ + Submits to resource body to GROBID for processing. + + TODO: By default checks sandcrawler-db for an existing row first, then + decide if we should re-process + + TODO: Code to push to Kafka might also go here? + """ + result = self.grobid_client.process_fulltext(resource.body) + if result['status'] == "success": + metadata = self.grobid_client.metadata(result) + if metadata: + result.update(metadata) + result.pop('tei_xml', None) + return result + + def process(self, request): + + # for now, only pdf ingest is implemented + assert request.get('ingest_type') == "pdf" + ingest_type = request.get('ingest_type') + base_url = request['base_url'] + + best_mimetype = None + if ingest_type == "pdf": + best_mimetype = "application/pdf" + + existing = self.check_existing_ingest(base_url) + if existing: + return self.process_existing(request, existing) + + result = dict(request=request, hit=False) + + try: + # first hop + resource = self.find_resource(base_url, best_mimetype) + print("[{}] fetch status: {} url: {}".format( + resource.status, + ingest_type, + base_url), + file=sys.stderr) + if not resource.hit: + result['status'] = resource.status + return result + file_meta = gen_file_metadata(resource.body) + + if "html" in file_meta['mimetype']: + # got landing page, try another hop + fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) + + result['html'] = fulltext_url + if not fulltext_url or not 'pdf_url' in fulltext_url: + result['status'] = 'no-pdf-link' + return result + print("\tlanding page URL extracted ({}): {}".format( + fulltext_url.get('technique'), + fulltext_url['pdf_url'], + ), + file=sys.stderr) + resource = self.find_resource(fulltext_url['pdf_url'], best_mimetype) + if not resource.hit: + result['status'] = resource.status + return result + file_meta = gen_file_metadata(resource.body) + except SavePageNowError as e: + result['status'] = 'spn-error' + result['error_message'] = str(e) + return result + except PetaboxError as e: + result['status'] = 'petabox-error' + result['error_message'] = str(e) + return result + except CdxApiError as e: + result['status'] = 'cdx-error' + result['error_message'] = str(e) + return result + except WaybackError as e: + result['status'] = 'wayback-error' + result['error_message'] = str(e) + return result + + if resource.terminal_dt: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + } + + # must be a hit if we got this far + assert resource.hit == True + assert resource.terminal_status_code == 200 + + result['file_meta'] = file_meta + + # other failure cases + if not resource.body or file_meta['size_bytes'] == 0: + result['status'] = 'null-body' + return result + + if not (resource.hit and file_meta['mimetype'] == "application/pdf"): + result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + return result + + info = self.process_hit(resource, file_meta) + result.update(info) + + result['status'] = "success" + result['hit'] = True + return result - response = dict(request=request, hit=False) - url = request['base_url'] - while url: - try: - (cdx_dict, body) = self.get_cdx_and_body(url) - except SavePageNowRemoteError as e: - response['status'] = 'spn-remote-error' - response['error_message'] = str(e) - return response - except SavePageNowError as e: - response['status'] = 'spn-error' - response['error_message'] = str(e) - return response - except CdxApiError as e: - response['status'] = 'cdx-error' - response['error_message'] = str(e) - return response - except WaybackError as e: - response['status'] = 'wayback-error' - response['error_message'] = str(e) - return response - print("CDX hit: {}".format(cdx_dict), file=sys.stderr) - - response['cdx'] = cdx_dict - # TODO: populate terminal - response['terminal'] = dict(url=cdx_dict['url'], http_status=cdx_dict['http_status']) - if not body: - response['status'] = 'null-body' - return response - if cdx_dict['http_status'] != 200: - response['status'] = 'terminal-bad-status' - return response - file_meta = gen_file_metadata(body) - mimetype = cdx_dict['mimetype'] - if mimetype in ('warc/revisit', 'binary/octet-stream', 'application/octet-stream', 'application/x-download', 'application/force-download'): - mimetype = file_meta['mimetype'] - response['file_meta'] = file_meta - if 'html' in mimetype: - page_metadata = extract_fulltext_url(response['cdx']['url'], body) - if page_metadata and page_metadata.get('pdf_url'): - next_url = page_metadata.get('pdf_url') - if next_url == url: - response['status'] = 'link-loop' - return response - url = next_url - continue - elif page_metadata and page_metadata.get('next_url'): - next_url = page_metadata.get('next_url') - if next_url == url: - response['status'] = 'link-loop' - return response - url = next_url - continue - else: - response['terminal']['html'] = page_metadata - response['status'] = 'no-pdf-link' - return response - elif 'pdf' in mimetype: - break - else: - response['status'] = 'other-mimetype' - return response - - response['file_meta'] = file_meta - - # if we got here, we have a PDF - sha1hex = response['file_meta']['sha1hex'] - - # do GROBID - response['grobid'] = self.grobid_client.process_fulltext(body) - #print("GROBID status: {}".format(response['grobid']['status']), file=sys.stderr) - - # TODO: optionally publish to Kafka here, but continue on failure (but - # send a sentry exception?) - - # parse metadata, but drop fulltext from ingest response - if response['grobid']['status'] == 'success': - grobid_metadata = self.grobid_client.metadata(response['grobid']) - if grobid_metadata: - response['grobid'].update(grobid_metadata) - response['grobid'].pop('tei_xml') - - # Ok, now what? - #print("GOT TO END", file=sys.stderr) - response['status'] = "success" - response['hit'] = True - return response class IngestFileRequestHandler(BaseHTTPRequestHandler): def do_POST(self): diff --git a/python/tests/test_wayback.py b/python/tests/test_wayback.py index 8d15d70..2aafe7c 100644 --- a/python/tests/test_wayback.py +++ b/python/tests/test_wayback.py @@ -54,8 +54,8 @@ def test_cdx_fetch(cdx_client): assert resp.datetime == CDX_DT assert resp.url == CDX_TARGET assert resp.sha1b32 == "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR" - assert resp.warc_csize == "8445" - assert resp.warc_offset == "108062304" + assert resp.warc_csize == 8445 + assert resp.warc_offset == 108062304 assert resp.warc_path == "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" @responses.activate @@ -96,7 +96,17 @@ def test_cdx_lookup_best(cdx_client): assert resp.warc_path == CDX_SINGLE_HIT[1][-1] WARC_TARGET = "http://fatcat.wiki/" -WARC_BODY = b"some stuff" +WARC_BODY = b""" + + + + + +

my big article here

+ blah + + +""" @pytest.fixture def wayback_client(cdx_client, mocker): @@ -116,7 +126,30 @@ def wayback_client(cdx_client, mocker): return client -def test_wayback_fetch(wayback_client, mocker): +@pytest.fixture +def wayback_client_pdf(cdx_client, mocker): + + with open('tests/files/dummy.pdf', 'rb') as f: + pdf_bytes = f.read() + + client = WaybackClient( + cdx_client=cdx_client, + petabox_webdata_secret="dummy-petabox-secret", + ) + # mock out the wayback store with mock stuff + client.rstore = mocker.Mock() + resource = mocker.Mock() + client.rstore.load_resource = mocker.MagicMock(return_value=resource) + resource.get_status = mocker.MagicMock(return_value=[200]) + resource.get_location = mocker.MagicMock(return_value=[WARC_TARGET]) + body = mocker.Mock() + resource.open_raw_content = mocker.MagicMock(return_value=body) + body.read = mocker.MagicMock(return_value=pdf_bytes) + + return client + +@responses.activate +def test_wayback_fetch(wayback_client): resp = wayback_client.fetch_petabox(123, 456789, "here/there.warc.gz") assert resp.body == WARC_BODY assert resp.location == WARC_TARGET @@ -124,3 +157,14 @@ def test_wayback_fetch(wayback_client, mocker): resp = wayback_client.fetch_petabox_body(123, 456789, "here/there.warc.gz") assert resp == WARC_BODY +@responses.activate +def test_lookup_resource_success(wayback_client): + + responses.add(responses.GET, + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_MULTI_HIT)) + + resp = wayback_client.lookup_resource(CDX_TARGET) + + assert resp.hit == True -- cgit v1.2.3