diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-01-08 19:30:38 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-09 16:31:40 -0800 |
commit | 6454cdc93424b23f484fc56e1f9f986490d05c2b (patch) | |
tree | 14e76469fdf9dfead292db1b26bd90b475feaf4e /python/sandcrawler/ingest.py | |
parent | 318bcf9dbc244a1130b74252b7842cc4eb954bfd (diff) | |
download | sandcrawler-6454cdc93424b23f484fc56e1f9f986490d05c2b.tar.gz sandcrawler-6454cdc93424b23f484fc56e1f9f986490d05c2b.zip |
wrap up basic (locally testable) ingest refactor
Diffstat (limited to 'python/sandcrawler/ingest.py')
-rw-r--r-- | python/sandcrawler/ingest.py | 355 |
1 files changed, 196 insertions, 159 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 58d3517..f085fc5 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -4,185 +4,222 @@ import json import base64 import requests from http.server import BaseHTTPRequestHandler, HTTPServer +from collections import namedtuple -from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, SavePageNowRemoteError, CdxApiError -krom sandcrawler.grobid import GrobidClient +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError +from sandcrawler.grobid import GrobidClient from sandcrawler.misc import gen_file_metadata from sandcrawler.html import extract_fulltext_url from sandcrawler.workers import SandcrawlerWorker class IngestFileWorker(SandcrawlerWorker): + """ + High level flow is to look in history first, then go to live web if + resource not found. Following redirects is treated as "fetching a + resource". Current version fetches a single resource; if it isn't a hit + but is an HTML 200, treats it as a landing page, tries to extract + fulltext link, then fetches that resource. + + process(request) -> response + Does all the things! + + Check existing processing (short circuit): + + check_existing_ingest(base_url) -> ingest_file_result or none + process_existing(result) -> response + try fetching all the rows we want. if any don't exist, fetch the resource itself and call process_hit() + + Fetch resource: + + find_resource(url) -> ResourceResult + + Process resource: + + process_hit(ResourceResult) -> response + process_grobid(ResourceResult) + """ def __init__(self, sink=None, **kwargs): super().__init__() self.sink = sink - self.spn_client = kwargs.get('spn_client', - SavePageNowClient()) - self.wayback_client = kwargs.get('wayback_client', - WaybackClient()) - self.cdx_client = kwargs.get('cdx_client', - CdxApiClient()) - self.grobid_client = kwargs.get('grobid_client', - GrobidClient()) + self.wayback_client = kwargs.get('wayback_client') + if not self.wayback_client: + self.wayback_client = WaybackClient() + self.spn_client = kwargs.get('spn_client') + if not self.spn_client: + self.spn_client = SavePageNowClient() + self.grobid_client = kwargs.get('grobid_client') + if not self.grobid_client: + self.grobid_client = GrobidClient() + + self.attempt_existing = False + + def check_existing_ingest(self, base_url): + """ + Check in sandcrawler-db (postgres) to see if we have already ingested + this URL (ingest file result table). + Returns existing row *if* found *and* we should use it, otherwise None. - def get_cdx_and_body(self, url): - """ - Returns a CDX dict and body as a tuple. - - If there isn't an existing wayback capture, take one now. Raises an - exception if can't capture, or if CDX API not available. - - Raises an exception if can't find/fetch. - - TODO: - - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively + Looks at existing ingest results and makes a decision based on, eg, + status and timestamp. """ - - WAYBACK_ENDPOINT = "https://web.archive.org/web/" - - cdx = self.cdx_client.lookup_latest(url, follow_redirects=True) - if not cdx: - cdx_list = self.spn_client.save_url_now_v2(url) - for cdx_url in cdx_list: - if 'pdf.sciencedirectassets.com' in cdx_url and '.pdf' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'osapublishing.org' in cdx_url and 'abstract.cfm' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'pubs.acs.org' in cdx_url and '/doi/pdf/' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'ieeexplore.ieee.org' in cdx_url and '.pdf' in cdx_url and 'arnumber=' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if 'hrmars.com' in cdx_url and 'journals/papers' in cdx_url: - cdx = self.cdx_client.lookup_latest(cdx_url) - break - if not cdx: - # extraction didn't work as expected; fetch whatever SPN2 got - cdx = self.cdx_client.lookup_latest(url, follow_redirects=True) - if not cdx: - print("{}".format(cdx_list), file=sys.stderr) - raise SavePageNowError("Failed to find terminal capture from SPNv2") + if not self.attempt_existing: + return None + raise NotImplementedError - try: - resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url']) - except requests.exceptions.TooManyRedirects as e: - raise WaybackError("Redirect loop fetching from wayback (dt: {}, url: {})".format(cdx['datetime'], cdx['url'])) - if resp.status_code != cdx['http_status']: - raise WaybackError("Got unexpected wayback status (expected {} from CDX, got {})".format(cdx['http_status'], resp.status_code)) - body = resp.content - return (cdx, body) + # this "return True" is just here to make pylint happy + return True - def process(self, request): + def find_resource(self, url, best_mimetype=None): + """ + Looks in wayback for a resource starting at the URL, following any + redirects. If a hit isn't found, try crawling with SPN. + """ + resource = self.wayback_client.lookup_resource(url, best_mimetype) + if not resource or not resource.hit: + resource = self.spn_client.crawl_resource(url, self.wayback_client) + print("[FETCH] {} url:{}".format( + resource.status, + url), + file=sys.stderr) + return resource + + def process_existing(self, request, result_row): + """ + If we have an existing ingest file result, do any database fetches or + additional processing necessary to return a result. """ - 1. check sandcrawler-db for base_url - -> if found, populate terminal+wayback fields - 2. check CDX for base_url (only 200, past year) - -> if found, populate terminal+wayback fields - 3. if we have wayback, fetch that. otherwise do recursive SPN crawl - -> populate terminal+wayback - 4. calculate file_meta - -> populate file_meta - 5. check sandcrawler-db for GROBID XML - 6. run GROBID if we didn't already - -> push results to minio+sandcrawler-db - 7. decide if this was a hit - - In all cases, print JSON status, and maybe push to sandcrawler-db + result = { + 'hit': result_row.hit, + 'status': result_row.status, + 'request': request, + } + # TODO: fetch file_meta + # TODO: fetch grobid + return result + + def process_hit(self, resource, file_meta): """ + Run all the necessary processing for a new/fresh ingest hit. + """ + return { + 'grobid': self.process_grobid(resource), + } + + def process_grobid(self, resource): + """ + Submits to resource body to GROBID for processing. + + TODO: By default checks sandcrawler-db for an existing row first, then + decide if we should re-process + + TODO: Code to push to Kafka might also go here? + """ + result = self.grobid_client.process_fulltext(resource.body) + if result['status'] == "success": + metadata = self.grobid_client.metadata(result) + if metadata: + result.update(metadata) + result.pop('tei_xml', None) + return result + + def process(self, request): + + # for now, only pdf ingest is implemented + assert request.get('ingest_type') == "pdf" + ingest_type = request.get('ingest_type') + base_url = request['base_url'] + + best_mimetype = None + if ingest_type == "pdf": + best_mimetype = "application/pdf" + + existing = self.check_existing_ingest(base_url) + if existing: + return self.process_existing(request, existing) + + result = dict(request=request, hit=False) + + try: + # first hop + resource = self.find_resource(base_url, best_mimetype) + print("[{}] fetch status: {} url: {}".format( + resource.status, + ingest_type, + base_url), + file=sys.stderr) + if not resource.hit: + result['status'] = resource.status + return result + file_meta = gen_file_metadata(resource.body) + + if "html" in file_meta['mimetype']: + # got landing page, try another hop + fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) + + result['html'] = fulltext_url + if not fulltext_url or not 'pdf_url' in fulltext_url: + result['status'] = 'no-pdf-link' + return result + print("\tlanding page URL extracted ({}): {}".format( + fulltext_url.get('technique'), + fulltext_url['pdf_url'], + ), + file=sys.stderr) + resource = self.find_resource(fulltext_url['pdf_url'], best_mimetype) + if not resource.hit: + result['status'] = resource.status + return result + file_meta = gen_file_metadata(resource.body) + except SavePageNowError as e: + result['status'] = 'spn-error' + result['error_message'] = str(e) + return result + except PetaboxError as e: + result['status'] = 'petabox-error' + result['error_message'] = str(e) + return result + except CdxApiError as e: + result['status'] = 'cdx-error' + result['error_message'] = str(e) + return result + except WaybackError as e: + result['status'] = 'wayback-error' + result['error_message'] = str(e) + return result + + if resource.terminal_dt: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + } + + # must be a hit if we got this far + assert resource.hit == True + assert resource.terminal_status_code == 200 + + result['file_meta'] = file_meta + + # other failure cases + if not resource.body or file_meta['size_bytes'] == 0: + result['status'] = 'null-body' + return result + + if not (resource.hit and file_meta['mimetype'] == "application/pdf"): + result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + return result + + info = self.process_hit(resource, file_meta) + result.update(info) + + result['status'] = "success" + result['hit'] = True + return result - response = dict(request=request, hit=False) - url = request['base_url'] - while url: - try: - (cdx_dict, body) = self.get_cdx_and_body(url) - except SavePageNowRemoteError as e: - response['status'] = 'spn-remote-error' - response['error_message'] = str(e) - return response - except SavePageNowError as e: - response['status'] = 'spn-error' - response['error_message'] = str(e) - return response - except CdxApiError as e: - response['status'] = 'cdx-error' - response['error_message'] = str(e) - return response - except WaybackError as e: - response['status'] = 'wayback-error' - response['error_message'] = str(e) - return response - print("CDX hit: {}".format(cdx_dict), file=sys.stderr) - - response['cdx'] = cdx_dict - # TODO: populate terminal - response['terminal'] = dict(url=cdx_dict['url'], http_status=cdx_dict['http_status']) - if not body: - response['status'] = 'null-body' - return response - if cdx_dict['http_status'] != 200: - response['status'] = 'terminal-bad-status' - return response - file_meta = gen_file_metadata(body) - mimetype = cdx_dict['mimetype'] - if mimetype in ('warc/revisit', 'binary/octet-stream', 'application/octet-stream', 'application/x-download', 'application/force-download'): - mimetype = file_meta['mimetype'] - response['file_meta'] = file_meta - if 'html' in mimetype: - page_metadata = extract_fulltext_url(response['cdx']['url'], body) - if page_metadata and page_metadata.get('pdf_url'): - next_url = page_metadata.get('pdf_url') - if next_url == url: - response['status'] = 'link-loop' - return response - url = next_url - continue - elif page_metadata and page_metadata.get('next_url'): - next_url = page_metadata.get('next_url') - if next_url == url: - response['status'] = 'link-loop' - return response - url = next_url - continue - else: - response['terminal']['html'] = page_metadata - response['status'] = 'no-pdf-link' - return response - elif 'pdf' in mimetype: - break - else: - response['status'] = 'other-mimetype' - return response - - response['file_meta'] = file_meta - - # if we got here, we have a PDF - sha1hex = response['file_meta']['sha1hex'] - - # do GROBID - response['grobid'] = self.grobid_client.process_fulltext(body) - #print("GROBID status: {}".format(response['grobid']['status']), file=sys.stderr) - - # TODO: optionally publish to Kafka here, but continue on failure (but - # send a sentry exception?) - - # parse metadata, but drop fulltext from ingest response - if response['grobid']['status'] == 'success': - grobid_metadata = self.grobid_client.metadata(response['grobid']) - if grobid_metadata: - response['grobid'].update(grobid_metadata) - response['grobid'].pop('tei_xml') - - # Ok, now what? - #print("GOT TO END", file=sys.stderr) - response['status'] = "success" - response['hit'] = True - return response class IngestFileRequestHandler(BaseHTTPRequestHandler): def do_POST(self): |