Diffstat (limited to 'python')
-rwxr-xr-x   python/ingest_file.py   386
1 file changed, 386 insertions, 0 deletions
diff --git a/python/ingest_file.py b/python/ingest_file.py
new file mode 100755
index 0000000..0699a0c
--- /dev/null
+++ b/python/ingest_file.py
@@ -0,0 +1,386 @@
+#!/usr/bin/env python3
+
+"""
+IngestRequest
+  - ingest_type
+  - base_url
+  - release_stage
+  - release_id
+  - ext_ids
+    - doi
+    - pmcid
+    - ...
+  - expect_mimetypes
+  - project/source (?)
+  - expect_sha1
+
+FileIngestResult
+  - request (object)
+  - terminal
+    - url
+    - status_code
+  - wayback
+    - datetime
+    - archive_url
+  - file_meta
+    - size_bytes
+    - md5
+    - sha1
+    - sha256
+    - mimetype
+  - grobid
+    - version
+    - status_code
+    - xml_url
+    - release_id
+  - status (slug)
+  - hit (boolean)
+
+Simplified process, assuming totally new URL and PDF file:
+
+- crawl via SPN (including redirects, extraction)
+  => terminal
+  => wayback
+- calculate file metadata
+  => file_meta
+- run GROBID
+  => grobid
+
+Optimizations:
+
+- sandcrawler-db lookup of base_url: terminal+wayback
+- GWB CDX lookup of base_url: terminal+wayback
+- sandcrawler-db lookup of GROBID: grobid
+
+New "ingest" table?
+- base_url (indexed)
+- datetime
+- terminal_status
+- terminal_url
+- terminal_sha1
+- hit
+
+"""
+
+import sys
+import json
+import base64
+import hashlib
+import argparse
+import datetime
+import requests
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+from grobid2json import teixml2json
+
+
+GROBID_ENDPOINT = "http://grobid.qa.fatcat.wiki"
+
+class CDXApiError(Exception):
+    pass
+
+class WaybackError(Exception):
+    pass
+
+class SavePageNowError(Exception):
+    pass
+
+class SandcrawlerDB:
+
+    def __init__(self, **kwargs):
+        self.api_url = kwargs.get('api_url',
+            "http://aitio.us.archive.org:3030")
+
+    def get_cdx(self, url):
+        resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url))
+        resp.raise_for_status()
+        return resp.json() or None
+
+    def get_grobid(self, sha1):
+        resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
+        resp.raise_for_status()
+        resp = resp.json()
+        if resp:
+            return resp[0]
+        else:
+            return None
+
+    def get_file_meta(self, sha1):
+        resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
+        resp.raise_for_status()
+        resp = resp.json()
+        if resp:
+            return resp[0]
+        else:
+            return None
+
+def b32_hex(s):
+    s = s.strip().split()[0].lower()
+    if s.startswith("sha1:"):
+        s = s[5:]
+    if len(s) != 32:
+        return s
+    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
+
+def cdx_api_lookup(url):
+    """
+    Returns a CDX dict, or None if not found.
+ """ + CDX_API_ENDPOINT = "https://web.archive.org/cdx/search/cdx" + + resp = requests.get(CDX_API_ENDPOINT, params={ + 'url': url, + 'matchType': 'exact', + 'limit': -1, + 'filter': 'statuscode:200', + 'output': 'json', + }) + if resp.status_code != 200: + raise CDXApiError(resp.text) + rj = resp.json() + if len(rj) <= 1: + return None + cdx = rj[1] + assert len(cdx) == 7 # JSON is short + cdx = dict( + surt=cdx[0], + datetime=cdx[1], + url=cdx[2], + mimetype=cdx[3], + status_code=int(cdx[4]), + sha1b32=cdx[5], + sha1hex=b32_hex(cdx[5]), + ) + return cdx + +def parse_html(body): + raise NotImplementedError() + +def save_url_now(url): + """ + Tries to "save page now" + """ + + SPN_ENDPOINT = "https://web.archive.org/save/" + resp = requests.get(SPN_ENDPOINT + url) + if resp.status_code != 200: + raise SavePageNowError("HTTP status: {}, url: {}".format(resp.status_code, url)) + cdx = cdx_api_lookup(url) + body = resp.content + return (cdx, body) + +def get_cdx_and_body(url): + """ + Returns a CDX dict and body as a tuple. + + If there isn't an existing wayback capture, take one now. Raises an + exception if can't capture, or if CDX API not available. + + Raises an exception if can't find/fetch. + + TODO: + - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively + """ + + WAYBACK_ENDPOINT = "https://web.archive.org/web/" + + cdx = cdx_api_lookup(url) + if not cdx: + return save_url_now(url) + + resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url']) + if resp.status_code != 200: + raise WaybackError(resp.text) + body = resp.content + return (cdx, body) + +def file_metadata(blob): + """ + Returns a dict: size_bytes, md5, sha1, sha256 + """ + hashes = [ + hashlib.sha1(), + hashlib.sha256(), + hashlib.md5(), + ] + for h in hashes: + h.update(blob) + return dict( + size_bytes=len(blob), + sha1=hashes[0].hexdigest(), + sha256=hashes[1].hexdigest(), + md5=hashes[2].hexdigest(), + ) + + +def do_grobid(sha1hex, blob): + grobid_response = requests.post( + GROBID_ENDPOINT + "/api/processFulltextDocument", + files={'input': blob, 'consolidateHeader': '1'}, + ) + + info = dict( + sha1hex=sha1hex, + status_code=grobid_response.status_code, + ) + # 4 MByte XML size limit; don't record GROBID status on this path + if len(grobid_response.content) > 4000000: + info['status'] = 'oversize' + return info + if grobid_response.status_code != 200: + # response.text is .content decoded as utf-8 + info['status'] = 'error' + info['error_msg'] = grobid_response.text[:10000] + dict(status='error', description=grobid_response.text) + return info, dict(status="error", reason="non-200 GROBID HTTP status", + extra=grobid_response.text) + else: + info['status'] = 'success' + + metadata = teixml2json(grobid_response.text, encumbered=False) + year = None + mdate = metadata.get('date') + if mdate and len(mdate) >= 4: + year = int(mdate[0:4]) + info['metadata'] = dict( + title=metadata.get('title'), + authors=metadata.get('authors'), + journal=metadata.get('journal'), + year=metadata.get('year'), + # TODO: any other biblio-glutton fields? first-page, volume + ) + info['version'] = metadata.get('grobid_version') + info['timestamp'] = metadata.get('grobid_timestamp') + info['glutton_fatcat'] = metadata.get('fatcat_release') + # TODO: push to kafka + return info + +def ingest_file(request): + """ + 1. check sandcrawler-db for base_url + -> if found, populate terminal+wayback fields + 2. 
+    2. check CDX for base_url (only 200, past year)
+       -> if found, populate terminal+wayback fields
+    3. if we have wayback, fetch that. otherwise do recursive SPN crawl
+       -> populate terminal+wayback
+    4. calculate file_meta
+       -> populate file_meta
+    5. check sandcrawler-db for GROBID XML
+    6. run GROBID if we didn't already
+       -> push results to minio+sandcrawler-db
+    7. decide if this was a hit
+
+    In all cases, print JSON status, and maybe push to sandcrawler-db
+    """
+
+    response = dict(request=request)
+    url = request['base_url']
+    while url:
+        (cdx_dict, body) = get_cdx_and_body(url)
+        sys.stderr.write("CDX hit: {}\n".format(cdx_dict))
+
+        response['cdx'] = cdx_dict
+        response['terminal'] = dict()
+        if 'html' in cdx_dict['mimetype']:
+            page_metadata = parse_html(body)
+            if page_metadata.get('pdf_url'):
+                url = page_metadata.get('pdf_url')
+                continue
+            response['terminal']['html'] = page_metadata
+            response['status'] = 'no-pdf-link'
+            return response
+        elif 'pdf' in cdx_dict['mimetype']:
+            break
+        else:
+            response['status'] = 'other-mimetype'
+            return response
+
+    # if we got here, we have a PDF
+    response['file_meta'] = file_metadata(body)
+    sha1hex = response['file_meta']['sha1']
+
+    # do GROBID
+    response['grobid'] = do_grobid(sha1hex, body)
+    sys.stderr.write("GROBID status: {}\n".format(response['grobid']['status']))
+
+    # Ok, now what?
+    sys.stderr.write("GOT TO END\n")
+    response['status'] = "success"
+    response['hit'] = True
+    return response
+
+def run_single_ingest(args):
+    request = dict(
+        base_url=args.url,
+        ext_ids=dict(doi=args.doi),
+        release_id=args.release_id,
+    )
+    result = ingest_file(request)
+    print(json.dumps(result))
+    return result
+
+def run_requests(args):
+    for l in args.json_file:
+        request = json.loads(l.strip())
+        result = ingest_file(request)
+        print(json.dumps(result))
+
+class IngestFileRequestHandler(BaseHTTPRequestHandler):
+    def do_POST(self):
+        if self.path != "/ingest":
+            self.send_response(404)
+            self.end_headers()
+            self.wfile.write(b"404: Not Found")
+            return
+        length = int(self.headers.get('content-length'))
+        request = json.loads(self.rfile.read(length).decode('utf-8'))
+        print("Got request: {}".format(request))
+        result = ingest_file(request)
+        self.send_response(200)
+        self.end_headers()
+        self.wfile.write(json.dumps(result).encode('utf-8'))
+
+def run_api(args):
+    port = args.port
+    print("Listening on localhost:{}".format(port))
+    server = HTTPServer(('', port), IngestFileRequestHandler)
+    server.serve_forever()
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--api-host-url',
+        default="http://localhost:9411/v0",
+        help="fatcat API host/port to use")
+    subparsers = parser.add_subparsers()
+
+    sub_single = subparsers.add_parser('single')
+    sub_single.set_defaults(func=run_single_ingest)
+    sub_single.add_argument('--release-id',
+        help="(optional) existing release ident to match to")
+    sub_single.add_argument('--doi',
+        help="(optional) existing release DOI to match to")
+    sub_single.add_argument('url',
+        help="URL of paper to fetch")
+
+    sub_requests = subparsers.add_parser('requests')
+    sub_requests.set_defaults(func=run_requests)
+    sub_requests.add_argument('json_file',
+        help="JSON file (request per line) to import from (or stdin)",
+        default=sys.stdin, type=argparse.FileType('r'))
+
+    sub_api = subparsers.add_parser('api')
+    sub_api.set_defaults(func=run_api)
+    sub_api.add_argument('--port',
+        help="HTTP port to listen on",
+        default=8033, type=int)
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
sys.stderr.write("tell me what to do!\n") + sys.exit(-1) + + args.func(args) + +if __name__ == '__main__': + main() |