diff options
Diffstat (limited to 'python/ingest_file.py')
-rwxr-xr-x | python/ingest_file.py | 324 |
1 files changed, 4 insertions, 320 deletions
diff --git a/python/ingest_file.py b/python/ingest_file.py index 4daa472..40eee4d 100755 --- a/python/ingest_file.py +++ b/python/ingest_file.py @@ -1,314 +1,12 @@ #!/usr/bin/env python3 -""" -IngestRequest - - ingest_type - - base_url - - release_stage - - release_id - - ext_ids - - doi - - pmcid - - ... - - expect_mimetypes - - project/source (?) - - expect_sha1 - -FileIngestResult - - request (object) - - terminal - - url - - status_code - - wayback - - datetime - - archive_url - - file_meta - - size_bytes - - md5 - - sha1 - - sha256 - - mimetype - - grobid - - version - - status_code - - xml_url - - release_id - - status (slug) - - hit (boolean) - -Simplified process, assuming totally new URL and PDF file: - -- crawl via SPN (including redirects, extraction) - => terminal - => wayback -- calculate file metadata - => file_meta -- run GROBID - => grobid - -Optimizations: - -- sandcrawler-db lookup of base_url: terminal+wayback -- GWB CDX lookup of base_url: terminal+wayback -- sandcrawler-db lookup of GROBID: grobid - -New "ingest" table? -- base_url (indexed) -- datetime -- terminal_status -- terminal_url -- terminal_sha1 -- hit - -""" - import sys import json -import base64 -import hashlib import argparse -import datetime -import requests -from http.server import BaseHTTPRequestHandler, HTTPServer - -from grobid2json import teixml2json - - -GROBID_ENDPOINT = "http://grobid.qa.fatcat.wiki" - -class CDXApiError(Exception): - pass - -class WaybackError(Exception): - pass - -class SavePageNowError(Exception): - pass - -class SandcrawlerDB: - - def __init__(self, **kwargs): - self.api_uri = kwargs.get('api_url', - "http://aitio.us.archive.org:3030") - - def get_cdx(self, url): - resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url)) - resp.raise_for_status() - return resp.json() or None - - def get_grobid(self, sha1): - resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1)) - resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] - else: - return None - - def get_file_meta(self, sha1): - resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1)) - resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] - else: - return None - -def b32_hex(s): - s = s.strip().split()[0].lower() - if s.startswith("sha1:"): - s = s[5:] - if len(s) != 32: - return s - return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') - - -def cdx_api_lookup(url): - """ - Returns a CDX dict, or None if not found. - """ - CDX_API_ENDPOINT = "https://web.archive.org/cdx/search/cdx" - - resp = requests.get(CDX_API_ENDPOINT, params={ - 'url': url, - 'matchType': 'exact', - 'limit': -1, - 'filter': 'statuscode:200', - 'output': 'json', - }) - if resp.status_code != 200: - raise CDXApiError(resp.text) - rj = resp.json() - if len(rj) <= 1: - return None - cdx = rj[1] - assert len(cdx) == 7 # JSON is short - cdx = dict( - surt=cdx[0], - datetime=cdx[1], - url=cdx[2], - mimetype=cdx[3], - status_code=int(cdx[4]), - sha1b32=cdx[5], - sha1hex=b32_hex(cdx[5]), - ) - return cdx - -def parse_html(body): - raise NotImplementedError() -def save_url_now(url): - """ - Tries to "save page now" - """ +from http.server import HTTPServer +from sandcrawler.ingest import IngestFileRequestHandler, FileIngester - SPN_ENDPOINT = "https://web.archive.org/save/" - resp = requests.get(SPN_ENDPOINT + url) - if resp.status_code != 200: - raise SavePageNowError("HTTP status: {}, url: {}".format(resp.status_code, url)) - cdx = cdx_api_lookup(url) - body = resp.content - return (cdx, body) - -def get_cdx_and_body(url): - """ - Returns a CDX dict and body as a tuple. - - If there isn't an existing wayback capture, take one now. Raises an - exception if can't capture, or if CDX API not available. - - Raises an exception if can't find/fetch. - - TODO: - - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively - """ - - WAYBACK_ENDPOINT = "https://web.archive.org/web/" - - cdx = cdx_api_lookup(url) - if not cdx: - return save_url_now(url) - - resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url']) - if resp.status_code != 200: - raise WaybackError(resp.text) - body = resp.content - return (cdx, body) - -def file_metadata(blob): - """ - Returns a dict: size_bytes, md5, sha1, sha256 - """ - hashes = [ - hashlib.sha1(), - hashlib.sha256(), - hashlib.md5(), - ] - for h in hashes: - h.update(blob) - return dict( - size_bytes=len(blob), - sha1=hashes[0].hexdigest(), - sha256=hashes[1].hexdigest(), - md5=hashes[2].hexdigest(), - ) - - -def do_grobid(sha1hex, blob): - grobid_response = requests.post( - GROBID_ENDPOINT + "/api/processFulltextDocument", - files={'input': blob, 'consolidateHeader': '2'}, - ) - - info = dict( - sha1hex=sha1hex, - status_code=grobid_response.status_code, - ) - # 4 MByte XML size limit; don't record GROBID status on this path - if len(grobid_response.content) > 4000000: - info['status'] = 'oversize' - return info - if grobid_response.status_code != 200: - # response.text is .content decoded as utf-8 - info['status'] = 'error' - info['error_msg'] = grobid_response.text[:10000] - dict(status='error', description=grobid_response.text) - return info, dict(status="error", reason="non-200 GROBID HTTP status", - extra=grobid_response.text) - else: - info['status'] = 'success' - - metadata = teixml2json(grobid_response.text, encumbered=False) - year = None - mdate = metadata.get('date') - if mdate and len(mdate) >= 4: - year = int(mdate[0:4]) - info['metadata'] = dict( - title=metadata.get('title'), - authors=metadata.get('authors'), - journal=metadata.get('journal'), - year=metadata.get('year'), - # TODO: any other biblio-glutton fields? first-page, volume - ) - info['version'] = metadata.get('grobid_version') - info['timestamp'] = metadata.get('grobid_timestamp') - info['glutton_fatcat'] = metadata.get('fatcat_release') - # TODO: push to kafka - return info - -def ingest_file(request): - """ - 1. check sandcrawler-db for base_url - -> if found, populate terminal+wayback fields - 2. check CDX for base_url (only 200, past year) - -> if found, populate terminal+wayback fields - 3. if we have wayback, fetch that. otherwise do recursive SPN crawl - -> populate terminal+wayback - 4. calculate file_meta - -> populate file_meta - 5. check sandcrawler-db for GROBID XML - 6. run GROBID if we didn't already - -> push results to minio+sandcrawler-db - 7. decide if this was a hit - - In all cases, print JSON status, and maybe push to sandcrawler-db - """ - - response = dict(request=request) - url = request['base_url'] - while url: - (cdx_dict, body) = get_cdx_and_body(url) - sys.stderr.write("CDX hit: {}\n".format(cdx_dict)) - - response['cdx'] = cdx_dict - response['terminal'] = dict() - if 'html' in cdx_dict['mimetype']: - page_metadata = parse_html(body) - if page_metadata.get('pdf_url'): - url = page_metadata.get('pdf_url') - continue - response['terminal']['html'] = page_metadata - response['status'] = 'no-pdf-link' - return response - elif 'pdf' in cdx_dict['mimetype']: - break - else: - response['status'] = 'other-mimetype' - return response - - # if we got here, we have a PDF - response['file_meta'] = file_metadata(body) - sha1hex = response['file_meta']['sha1'] - - # do GROBID - response['grobid'] = do_grobid(sha1hex, body) - sys.stderr.write("GROBID status: {}\n".format(response['grobid']['status'])) - - # Ok, now what? - sys.stderr.write("GOT TO END\n") - response['status'] = "success" - response['hit'] = True - return response def run_single_ingest(args): request = dict( @@ -316,7 +14,8 @@ def run_single_ingest(args): ext_ids=dict(doi=args.doi), release_id=args.release_id, ) - result = ingest_file(request) + ingester = FileIngester() + result = ingester.ingest_file(request) print(json.dumps(result)) return result @@ -326,21 +25,6 @@ def run_requests(args): result = ingest_file(request) print(json.dumps(result)) -class IngestFileRequestHandler(BaseHTTPRequestHandler): - def do_POST(self): - if self.path != "/ingest": - self.send_response(404) - self.end_headers() - self.wfile.write("404: Not Found") - return - length = int(self.headers.get('content-length')) - request = json.loads(self.rfile.read(length).decode('utf-8')) - print("Got request: {}".format(request)) - result = ingest_file(request) - self.send_response(200) - self.end_headers() - self.wfile.write(json.dumps(result)) - def run_api(args): port = 8083 print("Listening on localhost:{}".format(port)) |