author    Bryan Newbold <bnewbold@archive.org>  2019-10-22 21:34:40 -0700
committer Bryan Newbold <bnewbold@archive.org>  2019-10-22 21:35:00 -0700
commit    2e3611f0e66615ae007d4e46bb5905e2220fb690 (patch)
tree      9d2fa6f8d62145a5ab31f37f26b6c293a2163acd /python/ingest_file.py
parent    b11fe8c8f444756ae246250cbbfe44e7dc62eac3 (diff)
much progress on file ingest path
Diffstat (limited to 'python/ingest_file.py')
-rwxr-xr-x  python/ingest_file.py  324
1 file changed, 4 insertions(+), 320 deletions(-)
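
The diff below removes the inline ingest implementation from python/ingest_file.py in favor of the new sandcrawler.ingest module. The module docstring being deleted describes the IngestRequest and FileIngestResult shapes; as a rough illustration only (field names follow that docstring, but every value here is a made-up placeholder and any wider schema is an assumption, not part of this commit), a request and result might look like:

    # Hypothetical example of the shapes sketched in the removed docstring.
    request = {
        "ingest_type": "pdf",                          # placeholder value
        "base_url": "https://example.com/paper.pdf",   # placeholder URL
        "release_stage": "published",
        "release_id": None,
        "ext_ids": {"doi": "10.123/abc"},              # placeholder DOI
    }

    result = {
        "request": request,
        "terminal": {"url": request["base_url"], "status_code": 200},
        "wayback": {"datetime": "20191022213440", "archive_url": "..."},
        "file_meta": {"size_bytes": 123456, "md5": "...", "sha1": "...",
                      "sha256": "...", "mimetype": "application/pdf"},
        "grobid": {"version": "...", "status_code": 200, "xml_url": "...",
                   "release_id": None},
        "status": "success",
        "hit": True,
    }
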
diff --git a/python/ingest_file.py b/python/ingest_file.py
index 4daa472..40eee4d 100755
--- a/python/ingest_file.py
+++ b/python/ingest_file.py
@@ -1,314 +1,12 @@
#!/usr/bin/env python3
-"""
-IngestRequest
- - ingest_type
- - base_url
- - release_stage
- - release_id
- - ext_ids
- - doi
- - pmcid
- - ...
- - expect_mimetypes
- - project/source (?)
- - expect_sha1
-
-FileIngestResult
- - request (object)
- - terminal
- - url
- - status_code
- - wayback
- - datetime
- - archive_url
- - file_meta
- - size_bytes
- - md5
- - sha1
- - sha256
- - mimetype
- - grobid
- - version
- - status_code
- - xml_url
- - release_id
- - status (slug)
- - hit (boolean)
-
-Simplified process, assuming totally new URL and PDF file:
-
-- crawl via SPN (including redirects, extraction)
- => terminal
- => wayback
-- calculate file metadata
- => file_meta
-- run GROBID
- => grobid
-
-Optimizations:
-
-- sandcrawler-db lookup of base_url: terminal+wayback
-- GWB CDX lookup of base_url: terminal+wayback
-- sandcrawler-db lookup of GROBID: grobid
-
-New "ingest" table?
-- base_url (indexed)
-- datetime
-- terminal_status
-- terminal_url
-- terminal_sha1
-- hit
-
-"""
-
import sys
import json
-import base64
-import hashlib
import argparse
-import datetime
-import requests
-from http.server import BaseHTTPRequestHandler, HTTPServer
-
-from grobid2json import teixml2json
-
-
-GROBID_ENDPOINT = "http://grobid.qa.fatcat.wiki"
-
-class CDXApiError(Exception):
-    pass
-
-class WaybackError(Exception):
-    pass
-
-class SavePageNowError(Exception):
-    pass
-
-class SandcrawlerDB:
-
-    def __init__(self, **kwargs):
-        self.api_uri = kwargs.get('api_url',
-            "http://aitio.us.archive.org:3030")
-
-    def get_cdx(self, url):
-        resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url))
-        resp.raise_for_status()
-        return resp.json() or None
-
-    def get_grobid(self, sha1):
-        resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
-        resp.raise_for_status()
-        resp = resp.json()
-        if resp:
-            return resp[0]
-        else:
-            return None
-
-    def get_file_meta(self, sha1):
-        resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
-        resp.raise_for_status()
-        resp = resp.json()
-        if resp:
-            return resp[0]
-        else:
-            return None
-
-def b32_hex(s):
-    s = s.strip().split()[0].lower()
-    if s.startswith("sha1:"):
-        s = s[5:]
-    if len(s) != 32:
-        return s
-    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
-
-
-def cdx_api_lookup(url):
-    """
-    Returns a CDX dict, or None if not found.
-    """
-    CDX_API_ENDPOINT = "https://web.archive.org/cdx/search/cdx"
-
-    resp = requests.get(CDX_API_ENDPOINT, params={
-        'url': url,
-        'matchType': 'exact',
-        'limit': -1,
-        'filter': 'statuscode:200',
-        'output': 'json',
-    })
-    if resp.status_code != 200:
-        raise CDXApiError(resp.text)
-    rj = resp.json()
-    if len(rj) <= 1:
-        return None
-    cdx = rj[1]
-    assert len(cdx) == 7    # JSON is short
-    cdx = dict(
-        surt=cdx[0],
-        datetime=cdx[1],
-        url=cdx[2],
-        mimetype=cdx[3],
-        status_code=int(cdx[4]),
-        sha1b32=cdx[5],
-        sha1hex=b32_hex(cdx[5]),
-    )
-    return cdx
-
-def parse_html(body):
-    raise NotImplementedError()
-def save_url_now(url):
-    """
-    Tries to "save page now"
-    """
+from http.server import HTTPServer
+from sandcrawler.ingest import IngestFileRequestHandler, FileIngester
-    SPN_ENDPOINT = "https://web.archive.org/save/"
-    resp = requests.get(SPN_ENDPOINT + url)
-    if resp.status_code != 200:
-        raise SavePageNowError("HTTP status: {}, url: {}".format(resp.status_code, url))
-    cdx = cdx_api_lookup(url)
-    body = resp.content
-    return (cdx, body)
-
-def get_cdx_and_body(url):
-    """
-    Returns a CDX dict and body as a tuple.
-
-    If there isn't an existing wayback capture, take one now. Raises an
-    exception if can't capture, or if CDX API not available.
-
-    Raises an exception if can't find/fetch.
-
-    TODO:
-    - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively
-    """
-
-    WAYBACK_ENDPOINT = "https://web.archive.org/web/"
-
-    cdx = cdx_api_lookup(url)
-    if not cdx:
-        return save_url_now(url)
-
-    resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url'])
-    if resp.status_code != 200:
-        raise WaybackError(resp.text)
-    body = resp.content
-    return (cdx, body)
-
-def file_metadata(blob):
-    """
-    Returns a dict: size_bytes, md5, sha1, sha256
-    """
-    hashes = [
-        hashlib.sha1(),
-        hashlib.sha256(),
-        hashlib.md5(),
-    ]
-    for h in hashes:
-        h.update(blob)
-    return dict(
-        size_bytes=len(blob),
-        sha1=hashes[0].hexdigest(),
-        sha256=hashes[1].hexdigest(),
-        md5=hashes[2].hexdigest(),
-    )
-
-
-def do_grobid(sha1hex, blob):
-    grobid_response = requests.post(
-        GROBID_ENDPOINT + "/api/processFulltextDocument",
-        files={'input': blob, 'consolidateHeader': '2'},
-    )
-
-    info = dict(
-        sha1hex=sha1hex,
-        status_code=grobid_response.status_code,
-    )
-    # 4 MByte XML size limit; don't record GROBID status on this path
-    if len(grobid_response.content) > 4000000:
-        info['status'] = 'oversize'
-        return info
-    if grobid_response.status_code != 200:
-        # response.text is .content decoded as utf-8
-        info['status'] = 'error'
-        info['error_msg'] = grobid_response.text[:10000]
-        dict(status='error', description=grobid_response.text)
-        return info, dict(status="error", reason="non-200 GROBID HTTP status",
-            extra=grobid_response.text)
-    else:
-        info['status'] = 'success'
-
-    metadata = teixml2json(grobid_response.text, encumbered=False)
-    year = None
-    mdate = metadata.get('date')
-    if mdate and len(mdate) >= 4:
-        year = int(mdate[0:4])
-    info['metadata'] = dict(
-        title=metadata.get('title'),
-        authors=metadata.get('authors'),
-        journal=metadata.get('journal'),
-        year=metadata.get('year'),
-        # TODO: any other biblio-glutton fields? first-page, volume
-    )
-    info['version'] = metadata.get('grobid_version')
-    info['timestamp'] = metadata.get('grobid_timestamp')
-    info['glutton_fatcat'] = metadata.get('fatcat_release')
-    # TODO: push to kafka
-    return info
-
-def ingest_file(request):
-    """
-    1. check sandcrawler-db for base_url
-        -> if found, populate terminal+wayback fields
-    2. check CDX for base_url (only 200, past year)
-        -> if found, populate terminal+wayback fields
-    3. if we have wayback, fetch that. otherwise do recursive SPN crawl
-        -> populate terminal+wayback
-    4. calculate file_meta
-        -> populate file_meta
-    5. check sandcrawler-db for GROBID XML
-    6. run GROBID if we didn't already
-        -> push results to minio+sandcrawler-db
-    7. decide if this was a hit
-
-    In all cases, print JSON status, and maybe push to sandcrawler-db
-    """
-
-    response = dict(request=request)
-    url = request['base_url']
-    while url:
-        (cdx_dict, body) = get_cdx_and_body(url)
-        sys.stderr.write("CDX hit: {}\n".format(cdx_dict))
-
-        response['cdx'] = cdx_dict
-        response['terminal'] = dict()
-        if 'html' in cdx_dict['mimetype']:
-            page_metadata = parse_html(body)
-            if page_metadata.get('pdf_url'):
-                url = page_metadata.get('pdf_url')
-                continue
-            response['terminal']['html'] = page_metadata
-            response['status'] = 'no-pdf-link'
-            return response
-        elif 'pdf' in cdx_dict['mimetype']:
-            break
-        else:
-            response['status'] = 'other-mimetype'
-            return response
-
-    # if we got here, we have a PDF
-    response['file_meta'] = file_metadata(body)
-    sha1hex = response['file_meta']['sha1']
-
-    # do GROBID
-    response['grobid'] = do_grobid(sha1hex, body)
-    sys.stderr.write("GROBID status: {}\n".format(response['grobid']['status']))
-
-    # Ok, now what?
-    sys.stderr.write("GOT TO END\n")
-    response['status'] = "success"
-    response['hit'] = True
-    return response
def run_single_ingest(args):
    request = dict(
@@ -316,7 +14,8 @@ def run_single_ingest(args):
        ext_ids=dict(doi=args.doi),
        release_id=args.release_id,
    )
-    result = ingest_file(request)
+    ingester = FileIngester()
+    result = ingester.ingest_file(request)
    print(json.dumps(result))
    return result
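
With this change, run_single_ingest delegates to the new FileIngester class from sandcrawler.ingest. A minimal sketch of calling it directly, assuming sandcrawler is importable and that ingest_file() still accepts the request dict described in the removed docstring (the URL and DOI below are placeholders, not from this commit):

    import json
    from sandcrawler.ingest import FileIngester

    request = dict(
        base_url="https://example.com/paper.pdf",   # placeholder
        ext_ids=dict(doi="10.123/abc"),             # placeholder
        release_id=None,
    )
    ingester = FileIngester()
    result = ingester.ingest_file(request)
    print(json.dumps(result))
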
@@ -326,21 +25,6 @@ def run_requests(args):
        result = ingest_file(request)
        print(json.dumps(result))
-class IngestFileRequestHandler(BaseHTTPRequestHandler):
-    def do_POST(self):
-        if self.path != "/ingest":
-            self.send_response(404)
-            self.end_headers()
-            self.wfile.write("404: Not Found")
-            return
-        length = int(self.headers.get('content-length'))
-        request = json.loads(self.rfile.read(length).decode('utf-8'))
-        print("Got request: {}".format(request))
-        result = ingest_file(request)
-        self.send_response(200)
-        self.end_headers()
-        self.wfile.write(json.dumps(result))
-
def run_api(args):
    port = 8083
    print("Listening on localhost:{}".format(port))
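
run_api() serves the same ingest logic over HTTP on localhost:8083, presumably wired to the IngestFileRequestHandler now imported from sandcrawler.ingest. Assuming that handler keeps the POST /ingest contract of the inline class removed above (JSON request body in, JSON result out), a client call might look like this sketch (URL and DOI are placeholders):

    import requests

    request = {
        "base_url": "https://example.com/paper.pdf",   # placeholder
        "ext_ids": {"doi": "10.123/abc"},              # placeholder
    }
    resp = requests.post("http://localhost:8083/ingest", json=request)
    resp.raise_for_status()
    print(resp.json())
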