From 2e3611f0e66615ae007d4e46bb5905e2220fb690 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 22 Oct 2019 21:34:40 -0700 Subject: much progress on file ingest path --- python/ingest_file.py | 324 +------------------------------------------ python/sandcrawler/grobid.py | 14 ++ python/sandcrawler/html.py | 73 ++++++++++ python/sandcrawler/ia.py | 88 ++++++++++-- python/sandcrawler/ingest.py | 150 ++++++++++++++++++++ python/sandcrawler/misc.py | 24 ++++ 6 files changed, 338 insertions(+), 335 deletions(-) create mode 100644 python/sandcrawler/html.py create mode 100644 python/sandcrawler/ingest.py diff --git a/python/ingest_file.py b/python/ingest_file.py index 4daa472..40eee4d 100755 --- a/python/ingest_file.py +++ b/python/ingest_file.py @@ -1,314 +1,12 @@ #!/usr/bin/env python3 -""" -IngestRequest - - ingest_type - - base_url - - release_stage - - release_id - - ext_ids - - doi - - pmcid - - ... - - expect_mimetypes - - project/source (?) - - expect_sha1 - -FileIngestResult - - request (object) - - terminal - - url - - status_code - - wayback - - datetime - - archive_url - - file_meta - - size_bytes - - md5 - - sha1 - - sha256 - - mimetype - - grobid - - version - - status_code - - xml_url - - release_id - - status (slug) - - hit (boolean) - -Simplified process, assuming totally new URL and PDF file: - -- crawl via SPN (including redirects, extraction) - => terminal - => wayback -- calculate file metadata - => file_meta -- run GROBID - => grobid - -Optimizations: - -- sandcrawler-db lookup of base_url: terminal+wayback -- GWB CDX lookup of base_url: terminal+wayback -- sandcrawler-db lookup of GROBID: grobid - -New "ingest" table? -- base_url (indexed) -- datetime -- terminal_status -- terminal_url -- terminal_sha1 -- hit - -""" - import sys import json -import base64 -import hashlib import argparse -import datetime -import requests -from http.server import BaseHTTPRequestHandler, HTTPServer - -from grobid2json import teixml2json - - -GROBID_ENDPOINT = "http://grobid.qa.fatcat.wiki" - -class CDXApiError(Exception): - pass - -class WaybackError(Exception): - pass - -class SavePageNowError(Exception): - pass - -class SandcrawlerDB: - - def __init__(self, **kwargs): - self.api_uri = kwargs.get('api_url', - "http://aitio.us.archive.org:3030") - - def get_cdx(self, url): - resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url)) - resp.raise_for_status() - return resp.json() or None - - def get_grobid(self, sha1): - resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1)) - resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] - else: - return None - - def get_file_meta(self, sha1): - resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1)) - resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] - else: - return None - -def b32_hex(s): - s = s.strip().split()[0].lower() - if s.startswith("sha1:"): - s = s[5:] - if len(s) != 32: - return s - return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') - - -def cdx_api_lookup(url): - """ - Returns a CDX dict, or None if not found. 
- """ - CDX_API_ENDPOINT = "https://web.archive.org/cdx/search/cdx" - - resp = requests.get(CDX_API_ENDPOINT, params={ - 'url': url, - 'matchType': 'exact', - 'limit': -1, - 'filter': 'statuscode:200', - 'output': 'json', - }) - if resp.status_code != 200: - raise CDXApiError(resp.text) - rj = resp.json() - if len(rj) <= 1: - return None - cdx = rj[1] - assert len(cdx) == 7 # JSON is short - cdx = dict( - surt=cdx[0], - datetime=cdx[1], - url=cdx[2], - mimetype=cdx[3], - status_code=int(cdx[4]), - sha1b32=cdx[5], - sha1hex=b32_hex(cdx[5]), - ) - return cdx - -def parse_html(body): - raise NotImplementedError() -def save_url_now(url): - """ - Tries to "save page now" - """ +from http.server import HTTPServer +from sandcrawler.ingest import IngestFileRequestHandler, FileIngester - SPN_ENDPOINT = "https://web.archive.org/save/" - resp = requests.get(SPN_ENDPOINT + url) - if resp.status_code != 200: - raise SavePageNowError("HTTP status: {}, url: {}".format(resp.status_code, url)) - cdx = cdx_api_lookup(url) - body = resp.content - return (cdx, body) - -def get_cdx_and_body(url): - """ - Returns a CDX dict and body as a tuple. - - If there isn't an existing wayback capture, take one now. Raises an - exception if can't capture, or if CDX API not available. - - Raises an exception if can't find/fetch. - - TODO: - - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively - """ - - WAYBACK_ENDPOINT = "https://web.archive.org/web/" - - cdx = cdx_api_lookup(url) - if not cdx: - return save_url_now(url) - - resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url']) - if resp.status_code != 200: - raise WaybackError(resp.text) - body = resp.content - return (cdx, body) - -def file_metadata(blob): - """ - Returns a dict: size_bytes, md5, sha1, sha256 - """ - hashes = [ - hashlib.sha1(), - hashlib.sha256(), - hashlib.md5(), - ] - for h in hashes: - h.update(blob) - return dict( - size_bytes=len(blob), - sha1=hashes[0].hexdigest(), - sha256=hashes[1].hexdigest(), - md5=hashes[2].hexdigest(), - ) - - -def do_grobid(sha1hex, blob): - grobid_response = requests.post( - GROBID_ENDPOINT + "/api/processFulltextDocument", - files={'input': blob, 'consolidateHeader': '2'}, - ) - - info = dict( - sha1hex=sha1hex, - status_code=grobid_response.status_code, - ) - # 4 MByte XML size limit; don't record GROBID status on this path - if len(grobid_response.content) > 4000000: - info['status'] = 'oversize' - return info - if grobid_response.status_code != 200: - # response.text is .content decoded as utf-8 - info['status'] = 'error' - info['error_msg'] = grobid_response.text[:10000] - dict(status='error', description=grobid_response.text) - return info, dict(status="error", reason="non-200 GROBID HTTP status", - extra=grobid_response.text) - else: - info['status'] = 'success' - - metadata = teixml2json(grobid_response.text, encumbered=False) - year = None - mdate = metadata.get('date') - if mdate and len(mdate) >= 4: - year = int(mdate[0:4]) - info['metadata'] = dict( - title=metadata.get('title'), - authors=metadata.get('authors'), - journal=metadata.get('journal'), - year=metadata.get('year'), - # TODO: any other biblio-glutton fields? first-page, volume - ) - info['version'] = metadata.get('grobid_version') - info['timestamp'] = metadata.get('grobid_timestamp') - info['glutton_fatcat'] = metadata.get('fatcat_release') - # TODO: push to kafka - return info - -def ingest_file(request): - """ - 1. 
check sandcrawler-db for base_url - -> if found, populate terminal+wayback fields - 2. check CDX for base_url (only 200, past year) - -> if found, populate terminal+wayback fields - 3. if we have wayback, fetch that. otherwise do recursive SPN crawl - -> populate terminal+wayback - 4. calculate file_meta - -> populate file_meta - 5. check sandcrawler-db for GROBID XML - 6. run GROBID if we didn't already - -> push results to minio+sandcrawler-db - 7. decide if this was a hit - - In all cases, print JSON status, and maybe push to sandcrawler-db - """ - - response = dict(request=request) - url = request['base_url'] - while url: - (cdx_dict, body) = get_cdx_and_body(url) - sys.stderr.write("CDX hit: {}\n".format(cdx_dict)) - - response['cdx'] = cdx_dict - response['terminal'] = dict() - if 'html' in cdx_dict['mimetype']: - page_metadata = parse_html(body) - if page_metadata.get('pdf_url'): - url = page_metadata.get('pdf_url') - continue - response['terminal']['html'] = page_metadata - response['status'] = 'no-pdf-link' - return response - elif 'pdf' in cdx_dict['mimetype']: - break - else: - response['status'] = 'other-mimetype' - return response - - # if we got here, we have a PDF - response['file_meta'] = file_metadata(body) - sha1hex = response['file_meta']['sha1'] - - # do GROBID - response['grobid'] = do_grobid(sha1hex, body) - sys.stderr.write("GROBID status: {}\n".format(response['grobid']['status'])) - - # Ok, now what? - sys.stderr.write("GOT TO END\n") - response['status'] = "success" - response['hit'] = True - return response def run_single_ingest(args): request = dict( @@ -316,7 +14,8 @@ def run_single_ingest(args): ext_ids=dict(doi=args.doi), release_id=args.release_id, ) - result = ingest_file(request) + ingester = FileIngester() + result = ingester.ingest_file(request) print(json.dumps(result)) return result @@ -326,21 +25,6 @@ def run_requests(args): result = ingest_file(request) print(json.dumps(result)) -class IngestFileRequestHandler(BaseHTTPRequestHandler): - def do_POST(self): - if self.path != "/ingest": - self.send_response(404) - self.end_headers() - self.wfile.write("404: Not Found") - return - length = int(self.headers.get('content-length')) - request = json.loads(self.rfile.read(length).decode('utf-8')) - print("Got request: {}".format(request)) - result = ingest_file(request) - self.send_response(200) - self.end_headers() - self.wfile.write(json.dumps(result)) - def run_api(args): port = 8083 print("Listening on localhost:{}".format(port)) diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index f157241..d83fedc 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -2,6 +2,7 @@ import requests from collections import Counter +from grobid2json import teixml2json from .workers import SandcrawlerWorker from .misc import gen_file_metadata from .ia import WaybackClient, WaybackError @@ -49,6 +50,19 @@ class GrobidClient(object): info['error_msg'] = grobid_response.text[:10000] return info + def metadata(self, result): + if result['status'] != 'success': + return None + tei_json = teixml2json(result['tei_xml'], encumbered=False) + meta = dict() + biblio = dict() + for k in ('title', 'authors', 'journal', 'date', 'doi', ): + biblio[k] = tei_json.get(k) + meta['biblio'] = biblio + for k in ('grobid_version', 'grobid_timestamp', 'fatcat_release', 'language_code'): + meta[k] = tei_json.get(k) + return meta + class GrobidWorker(SandcrawlerWorker): def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs): diff 
--git a/python/sandcrawler/html.py b/python/sandcrawler/html.py new file mode 100644 index 0000000..3191b66 --- /dev/null +++ b/python/sandcrawler/html.py @@ -0,0 +1,73 @@ + +import re +import sys +import urllib.parse + +from bs4 import BeautifulSoup + +RESEARCHSQUARE_REGEX = re.compile(r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"') + +def extract_fulltext_url(html_url, html_body): + """ + Takes an HTML document (and URL), assumed to be a landing page, and tries + to find a fulltext PDF url. + """ + + host_prefix = '/'.join(html_url.split('/')[:3]) + soup = BeautifulSoup(html_body, 'html.parser') + + ### General Tricks ### + + # highwire-style meta tag + meta = soup.find('meta', attrs={"name":"citation_pdf_url"}) + if not meta: + meta = soup.find('meta', attrs={"name":"bepress_citation_pdf_url"}) + if meta: + url = meta['content'].strip() + if url.startswith('http'): + return dict(pdf_url=url, technique='citation_pdf_url') + else: + sys.stderr.write("malformed citation_pdf_url? {}\n".format(url)) + + # ACS (and probably others) like: + # https://pubs.acs.org/doi/10.1021/acs.estlett.9b00379 + # PDF (1 MB) + href = soup.find('a', attrs={"title":"PDF"}) + if href: + url = href['href'].strip() + if url.startswith('http'): + return dict(pdf_url=url, technique='href_title') + elif url.startswith('/'): + return dict(pdf_url=host_prefix+url, technique='href_title') + + ### Publisher/Platform Specific ### + + # eLife (elifesciences.org) + if '://elifesciences.org/articles/' in html_url: + anchor = soup.find("a", attrs={"data-download-type": "pdf-article"}) + if anchor: + url = anchor['href'].strip() + assert '.pdf' in url + return dict(pdf_url=url) + + # research square (researchsquare.com) + if 'researchsquare.com/article/' in html_url: + # JSON in body with a field like: + # "url":"https://assets.researchsquare.com/files/4a57970e-b002-4608-b507-b95967649483/v2/Manuscript.pdf" + m = RESEARCHSQUARE_REGEX.search(html_body.decode('utf-8')) + if m: + url = m.group(1) + assert len(url) < 1024 + return dict(release_stage="manuscript", pdf_url=url) + + # ehp.niehs.nih.gov + # + if '://linkinghub.elsevier.com/retrieve/pii/' in html_url: + redirect = soup.find("input", attrs={"name": "redirectURL"}) + if redirect: + url = redirect['value'].strip() + if 'sciencedirect.com' in url: + url = urllib.parse.unquote(url) + return dict(next_url=url) + + return dict() diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index 365cf82..a772bd4 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -3,7 +3,7 @@ # in `wayback` library. Means we can't run pylint. # pylint: skip-file -import os, sys +import os, sys, time import requests import wayback.exception @@ -11,6 +11,8 @@ from http.client import IncompleteRead from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory +from .misc import b32_hex, requests_retry_session + class CdxApiError(Exception): pass @@ -19,7 +21,7 @@ class CdxApiClient: def __init__(self, host_url="https://web.archive.org/cdx/search/cdx"): self.host_url = host_url - def lookup_latest(self, url): + def lookup_latest(self, url, follow_redirects=False): """ Looks up most recent HTTP 200 record for the given URL. 
@@ -28,15 +30,20 @@ class CdxApiClient: XXX: should do authorized lookup using cookie to get all fields """ - resp = requests.get(self.host_url, params={ + params = { 'url': url, 'matchType': 'exact', 'limit': -1, - 'filter': 'statuscode:200', 'output': 'json', - }) - if resp.status_code != 200: - raise CDXApiError(resp.text) + } + if not follow_redirects: + params['filter'] = 'statuscode:200' + resp = requests.get(self.host_url, params=params) + if follow_redirects: + raise NotImplementedError + else: + if resp.status_code != 200: + raise CdxApiError(resp.text) rj = resp.json() if len(rj) <= 1: return None @@ -113,23 +120,74 @@ class SavePageNowError(Exception): class SavePageNowClient: - def __init__(self, cdx_client=None, endpoint="https://web.archive.org/save/"): + def __init__(self, cdx_client=None, + v1endpoint="https://web.archive.org/save/", + v2endpoint="https://web.archive.org/save"): if cdx_client: self.cdx_client = cdx_client else: self.cdx_client = CdxApiClient() - self.endpoint = endpoint + self.ia_access_key = os.environ.get('IA_ACCESS_KEY') + self.ia_secret_key = os.environ.get('IA_SECRET_KEY') + self.v1endpoint = v1endpoint + self.v2endpoint = v2endpoint + self.http_session = requests_retry_session(retries=5, backoff_factor=3) + self.http_session.headers.update({ + 'User-Agent': 'Mozilla/5.0 sandcrawler.SavePageNowClient', + }) + self.v2_session = requests_retry_session(retries=5, backoff_factor=3) + self.v2_session.headers.update({ + 'User-Agent': 'Mozilla/5.0 sandcrawler.SavePageNowClient', + 'Accept': 'application/json', + 'Authorization': 'LOW {}:{}'.format(self.ia_access_key, self.ia_secret_key), + }) - def save_url_now(self, url): + def save_url_now_v1(self, url): """ - Returns a tuple (cdx, blob) on success, or raises an error on non-success. - - XXX: handle redirects? + Returns a tuple (cdx, blob) on success of single fetch, or raises an + error on non-success. """ - resp = requests.get(self.endpoint + url) + resp = self.http_session.get(self.v1endpoint + url) if resp.status_code != 200: raise SavePageNowError("HTTP status: {}, url: {}".format(resp.status_code, url)) + terminal_url = '/'.join(resp.url.split('/')[5:]) body = resp.content - cdx = self.cdx_client.lookup_latest(url) + cdx = self.cdx_client.lookup_latest(terminal_url) + if not cdx: + raise SavePageNowError("SPN was successful, but CDX lookup then failed") return (cdx, body) + def save_url_now_v2(self, url): + """ + Returns a list of cdx objects, or raises an error on non-success. 
+ """ + if not (self.ia_access_key and self.ia_secret_key): + raise Exception("SPNv2 requires authentication (IA_ACCESS_KEY/IA_SECRET_KEY)") + resp = self.v2_session.post( + self.v2endpoint, + data={ + 'url': url, + 'capture_all': 1, + 'if_not_archived_within': '1d', + }, + ) + if resp.status_code != 200: + raise SavePageNowError("HTTP status: {}, url: {}".format(resp.status_code, url)) + resp_json = resp.json() + assert resp_json + + # poll until complete + for i in range(90): + resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, resp_json['job_id'])) + resp.raise_for_status() + status = resp.json()['status'] + if status == 'success': + resp = resp.json() + break + elif status == 'pending': + time.sleep(1.0) + else: + raise SavePageNowError("SPN2 status:{} url:{}".format(status, url)) + + return resp['resources'] + diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py new file mode 100644 index 0000000..2469df6 --- /dev/null +++ b/python/sandcrawler/ingest.py @@ -0,0 +1,150 @@ + +import sys +import json +import base64 +import requests +from http.server import BaseHTTPRequestHandler, HTTPServer + +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient +from sandcrawler.grobid import GrobidClient +from sandcrawler.misc import gen_file_metadata +from sandcrawler.html import extract_fulltext_url + +class FileIngester: + + def __init__(self, **kwargs): + + self.spn_client = kwargs.get('spn_client', + SavePageNowClient()) + self.wayback_client = kwargs.get('wayback_client', + WaybackClient()) + self.cdx_client = kwargs.get('cdx_client', + CdxApiClient()) + self.grobid_client = kwargs.get('grobid_client', + GrobidClient()) + + + def get_cdx_and_body(self, url): + """ + Returns a CDX dict and body as a tuple. + + If there isn't an existing wayback capture, take one now. Raises an + exception if can't capture, or if CDX API not available. + + Raises an exception if can't find/fetch. + + TODO: + - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively + """ + + WAYBACK_ENDPOINT = "https://web.archive.org/web/" + + cdx = self.cdx_client.lookup_latest(url) + if not cdx: + # sciencedirect.com (Elsevier) requires browser crawling (SPNv2) + if ('sciencedirect.com' in url and '.pdf' in url): + cdx_list = self.spn_client.save_url_now_v2(url) + for cdx_url in cdx_list: + if 'pdf.sciencedirectassets.com' in cdx_url and '.pdf' in cdx_url: + cdx = self.cdx_client.lookup_latest(cdx_url) + break + if 'osapublishing.org' in cdx_url and 'abstract.cfm' in cdx_url: + cdx = self.cdx_client.lookup_latest(cdx_url) + break + if not cdx: + raise Exception("Failed to crawl sciencedirect.com PDF URL") + else: + return self.spn_client.save_url_now_v1(url) + + resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url']) + if resp.status_code != 200: + raise WaybackError(resp.text) + body = resp.content + return (cdx, body) + + def ingest_file(self, request): + """ + 1. check sandcrawler-db for base_url + -> if found, populate terminal+wayback fields + 2. check CDX for base_url (only 200, past year) + -> if found, populate terminal+wayback fields + 3. if we have wayback, fetch that. otherwise do recursive SPN crawl + -> populate terminal+wayback + 4. calculate file_meta + -> populate file_meta + 5. check sandcrawler-db for GROBID XML + 6. run GROBID if we didn't already + -> push results to minio+sandcrawler-db + 7. 
decide if this was a hit + + In all cases, print JSON status, and maybe push to sandcrawler-db + """ + + response = dict(request=request) + url = request['base_url'] + while url: + (cdx_dict, body) = self.get_cdx_and_body(url) + sys.stderr.write("CDX hit: {}\n".format(cdx_dict)) + + response['cdx'] = cdx_dict + response['terminal'] = dict() + file_meta = gen_file_metadata(body) + mimetype = cdx_dict['mimetype'] + if mimetype in ('warc/revisit', 'binary/octet-stream', 'application/octet-stream'): + mimetype = file_meta['mimetype'] + if 'html' in mimetype: + page_metadata = extract_fulltext_url(response['cdx']['url'], body) + if page_metadata and page_metadata.get('pdf_url'): + url = page_metadata.get('pdf_url') + continue + elif page_metadata and page_metadata.get('next_url'): + url = page_metadata.get('next_url') + continue + else: + response['terminal']['html'] = page_metadata + response['status'] = 'no-pdf-link' + return response + elif 'pdf' in mimetype: + response['file_meta'] = file_meta + break + else: + response['status'] = 'other-mimetype' + return response + + # if we got here, we have a PDF + sha1hex = response['file_meta']['sha1hex'] + + # do GROBID + response['grobid'] = self.grobid_client.process_fulltext(body) + sys.stderr.write("GROBID status: {}\n".format(response['grobid']['status'])) + + # TODO: optionally publish to Kafka here, but continue on failure (but + # send a sentry exception?) + + # parse metadata, but drop fulltext from ingest response + if response['grobid']['status'] == 'success': + grobid_metadata = self.grobid_client.metadata(response['grobid']) + if grobid_metadata: + response['grobid'].update(grobid_metadata) + response['grobid'].pop('tei_xml') + + # Ok, now what? + sys.stderr.write("GOT TO END\n") + response['status'] = "success" + response['hit'] = True + return response + +class IngestFileRequestHandler(BaseHTTPRequestHandler): + def do_POST(self): + if self.path != "/ingest": + self.send_response(404) + self.end_headers() + self.wfile.write("404: Not Found") + return + length = int(self.headers.get('content-length')) + request = json.loads(self.rfile.read(length).decode('utf-8')) + print("Got request: {}".format(request)) + result = ingest_file(request) + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(result)) diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py index 4ffc5d7..5713199 100644 --- a/python/sandcrawler/misc.py +++ b/python/sandcrawler/misc.py @@ -3,6 +3,10 @@ import base64 import magic import hashlib import datetime +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error + def gen_file_metadata(blob): """ @@ -131,3 +135,23 @@ def parse_cdx_datetime(dt_str): return datetime.strptime(dt_str, "%Y%m%d%H%M%S") except Exception: return None + + +def requests_retry_session(retries=10, backoff_factor=3, + status_forcelist=(500, 502, 504), session=None): + """ + From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests + """ + session = session or requests.Session() + retry = Retry( + total=retries, + read=retries, + connect=retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + -- cgit v1.2.3
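
Below is a minimal sketch of driving the new ingest path that this patch introduces, mirroring `run_single_ingest()` in `python/ingest_file.py`. It assumes the `sandcrawler` package from this commit is importable; the example URL and DOI are placeholders, not values from the patch, and the listed response fields follow the `ingest_file()` docstring above.

```python
# Sketch: drive FileIngester the same way run_single_ingest() does.
# Assumes sandcrawler (as of this commit) is on PYTHONPATH; URL/DOI below
# are hypothetical examples.
import json

from sandcrawler.ingest import FileIngester

def ingest_one(base_url, doi=None, release_id=None):
    request = dict(
        base_url=base_url,
        ext_ids=dict(doi=doi),
        release_id=release_id,
    )
    ingester = FileIngester()
    result = ingester.ingest_file(request)
    # result carries 'status' and 'hit', plus 'cdx', 'file_meta', and
    # 'grobid' sections when the crawl resolves to a PDF
    print(json.dumps(result))
    return result

if __name__ == '__main__':
    ingest_one("https://example.com/article/landing-page",  # hypothetical
               doi="10.1234/example")
```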
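
And a small sketch of exercising `sandcrawler.html.extract_fulltext_url()` on its own, following the HTML branch of the `while url:` loop in `FileIngester.ingest_file()`. Fetching the body directly with `requests` and the specific eLife article URL are assumptions for illustration; the ingester itself obtains bodies via CDX/wayback/SPN rather than live HTTP.

```python
# Sketch: check a landing page for a fulltext PDF link using the new
# extract_fulltext_url() helper. The URL and direct requests.get() fetch
# are illustrative assumptions.
import requests

from sandcrawler.html import extract_fulltext_url

landing_url = "https://elifesciences.org/articles/48498"  # hypothetical example
resp = requests.get(landing_url, timeout=30)
resp.raise_for_status()

result = extract_fulltext_url(landing_url, resp.content)
if result.get('pdf_url'):
    print("found fulltext PDF:", result['pdf_url'])
elif result.get('next_url'):
    print("follow intermediate hop:", result['next_url'])
else:
    print("no-pdf-link")
```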