Diffstat (limited to 'python/sandcrawler/ingest.py')
-rw-r--r--  python/sandcrawler/ingest.py  355
1 file changed, 196 insertions, 159 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 58d3517..f085fc5 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -4,185 +4,222 @@ import json
import base64
import requests
from http.server import BaseHTTPRequestHandler, HTTPServer
+from collections import namedtuple
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, SavePageNowRemoteError, CdxApiError
-from sandcrawler.grobid import GrobidClient
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError
+from sandcrawler.grobid import GrobidClient
from sandcrawler.misc import gen_file_metadata
from sandcrawler.html import extract_fulltext_url
from sandcrawler.workers import SandcrawlerWorker
class IngestFileWorker(SandcrawlerWorker):
+ """
+ High-level flow is to look in history first, then go to the live web if
+ the resource is not found. Following redirects is treated as "fetching a
+ resource". The current version fetches a single resource; if it isn't a
+ hit but is an HTML 200, it is treated as a landing page: try to extract a
+ fulltext link, then fetch that resource.
+
+ process(request) -> response
+ Does all the things!
+
+ Check existing processing (short circuit):
+
+ check_existing_ingest(base_url) -> ingest_file_result or None
+ process_existing(result) -> response
+ Try fetching all the rows we want. If any don't exist, fetch the resource itself and call process_hit()
+
+ Fetch resource:
+
+ find_resource(url) -> ResourceResult
+
+ Process resource:
+
+ process_hit(ResourceResult) -> response
+ process_grobid(ResourceResult)
+ """
def __init__(self, sink=None, **kwargs):
super().__init__()
self.sink = sink
- self.spn_client = kwargs.get('spn_client',
- SavePageNowClient())
- self.wayback_client = kwargs.get('wayback_client',
- WaybackClient())
- self.cdx_client = kwargs.get('cdx_client',
- CdxApiClient())
- self.grobid_client = kwargs.get('grobid_client',
- GrobidClient())
+ self.wayback_client = kwargs.get('wayback_client')
+ if not self.wayback_client:
+ self.wayback_client = WaybackClient()
+ self.spn_client = kwargs.get('spn_client')
+ if not self.spn_client:
+ self.spn_client = SavePageNowClient()
+ self.grobid_client = kwargs.get('grobid_client')
+ if not self.grobid_client:
+ self.grobid_client = GrobidClient()
+
+ self.attempt_existing = False
+
+ def check_existing_ingest(self, base_url):
+ """
+ Check in sandcrawler-db (postgres) to see if we have already ingested
+ this URL (ingest file result table).
+ Returns existing row *if* found *and* we should use it, otherwise None.
- def get_cdx_and_body(self, url):
- """
- Returns a CDX dict and body as a tuple.
-
- If there isn't an existing wayback capture, take one now. Raises an
- exception if can't capture, or if CDX API not available.
-
- Raises an exception if can't find/fetch.
-
- TODO:
- - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively
+ Looks at existing ingest results and makes a decision based on, eg,
+ status and timestamp.
"""
-
- WAYBACK_ENDPOINT = "https://web.archive.org/web/"
-
- cdx = self.cdx_client.lookup_latest(url, follow_redirects=True)
- if not cdx:
- cdx_list = self.spn_client.save_url_now_v2(url)
- for cdx_url in cdx_list:
- if 'pdf.sciencedirectassets.com' in cdx_url and '.pdf' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'osapublishing.org' in cdx_url and 'abstract.cfm' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'pubs.acs.org' in cdx_url and '/doi/pdf/' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'ieeexplore.ieee.org' in cdx_url and '.pdf' in cdx_url and 'arnumber=' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if 'hrmars.com' in cdx_url and 'journals/papers' in cdx_url:
- cdx = self.cdx_client.lookup_latest(cdx_url)
- break
- if not cdx:
- # extraction didn't work as expected; fetch whatever SPN2 got
- cdx = self.cdx_client.lookup_latest(url, follow_redirects=True)
- if not cdx:
- print("{}".format(cdx_list), file=sys.stderr)
- raise SavePageNowError("Failed to find terminal capture from SPNv2")
+ if not self.attempt_existing:
+ return None
+ raise NotImplementedError
- try:
- resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url'])
- except requests.exceptions.TooManyRedirects as e:
- raise WaybackError("Redirect loop fetching from wayback (dt: {}, url: {})".format(cdx['datetime'], cdx['url']))
- if resp.status_code != cdx['http_status']:
- raise WaybackError("Got unexpected wayback status (expected {} from CDX, got {})".format(cdx['http_status'], resp.status_code))
- body = resp.content
- return (cdx, body)
+ # this "return True" is just here to make pylint happy
+ return True
- def process(self, request):
+ def find_resource(self, url, best_mimetype=None):
+ """
+ Looks in wayback for a resource starting at the URL, following any
+ redirects. If a hit isn't found, try crawling with SPN.
+ """
+ resource = self.wayback_client.lookup_resource(url, best_mimetype)
+ if not resource or not resource.hit:
+ resource = self.spn_client.crawl_resource(url, self.wayback_client)
+ print("[FETCH] {} url:{}".format(
+ resource.status,
+ url),
+ file=sys.stderr)
+ return resource
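The ResourceResult returned here is only consumed, never defined, in this file; its authoritative definition lives in sandcrawler.ia. A sketch of the fields assumed by the code in this diff:

    from collections import namedtuple

    # Assumed shape, inferred from attribute accesses in ingest.py only:
    ResourceResult = namedtuple("ResourceResult", [
        "hit",                   # bool: did we end up with a usable capture?
        "status",                # str: "success" or an error/skip reason
        "terminal_url",          # final URL after following redirects
        "terminal_dt",           # wayback datetime string of the capture
        "terminal_status_code",  # HTTP status of the terminal capture
        "body",                  # raw bytes of the fetched resource
    ])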
+
+ def process_existing(self, request, result_row):
+ """
+ If we have an existing ingest file result, do any database fetches or
+ additional processing necessary to return a result.
"""
- 1. check sandcrawler-db for base_url
- -> if found, populate terminal+wayback fields
- 2. check CDX for base_url (only 200, past year)
- -> if found, populate terminal+wayback fields
- 3. if we have wayback, fetch that. otherwise do recursive SPN crawl
- -> populate terminal+wayback
- 4. calculate file_meta
- -> populate file_meta
- 5. check sandcrawler-db for GROBID XML
- 6. run GROBID if we didn't already
- -> push results to minio+sandcrawler-db
- 7. decide if this was a hit
-
- In all cases, print JSON status, and maybe push to sandcrawler-db
+ result = {
+ 'hit': result_row.hit,
+ 'status': result_row.status,
+ 'request': request,
+ }
+ # TODO: fetch file_meta
+ # TODO: fetch grobid
+ return result
+
+ def process_hit(self, resource, file_meta):
"""
+ Run all the necessary processing for a new/fresh ingest hit.
+ """
+ return {
+ 'grobid': self.process_grobid(resource),
+ }
+
+ def process_grobid(self, resource):
+ """
+ Submits the resource body to GROBID for processing.
+
+ TODO: by default, check sandcrawler-db for an existing row first, then
+ decide whether we should re-process
+
+ TODO: Code to push to Kafka might also go here?
+ """
+ result = self.grobid_client.process_fulltext(resource.body)
+ if result['status'] == "success":
+ metadata = self.grobid_client.metadata(result)
+ if metadata:
+ result.update(metadata)
+ result.pop('tei_xml', None)
+ return result
+
+ def process(self, request):
+
+ # for now, only pdf ingest is implemented
+ assert request.get('ingest_type') == "pdf"
+ ingest_type = request.get('ingest_type')
+ base_url = request['base_url']
+
+ best_mimetype = None
+ if ingest_type == "pdf":
+ best_mimetype = "application/pdf"
+
+ existing = self.check_existing_ingest(base_url)
+ if existing:
+ return self.process_existing(request, existing)
+
+ result = dict(request=request, hit=False)
+
+ try:
+ # first hop
+ resource = self.find_resource(base_url, best_mimetype)
+ print("[{}] fetch status: {} url: {}".format(
+ resource.status,
+ ingest_type,
+ base_url),
+ file=sys.stderr)
+ if not resource.hit:
+ result['status'] = resource.status
+ return result
+ file_meta = gen_file_metadata(resource.body)
+
+ if "html" in file_meta['mimetype']:
+ # got landing page, try another hop
+ fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
+
+ result['html'] = fulltext_url
+ if not fulltext_url or 'pdf_url' not in fulltext_url:
+ result['status'] = 'no-pdf-link'
+ return result
+ print("\tlanding page URL extracted ({}): {}".format(
+ fulltext_url.get('technique'),
+ fulltext_url['pdf_url'],
+ ),
+ file=sys.stderr)
+ resource = self.find_resource(fulltext_url['pdf_url'], best_mimetype)
+ if not resource.hit:
+ result['status'] = resource.status
+ return result
+ file_meta = gen_file_metadata(resource.body)
+ except SavePageNowError as e:
+ result['status'] = 'spn-error'
+ result['error_message'] = str(e)
+ return result
+ except PetaboxError as e:
+ result['status'] = 'petabox-error'
+ result['error_message'] = str(e)
+ return result
+ except CdxApiError as e:
+ result['status'] = 'cdx-error'
+ result['error_message'] = str(e)
+ return result
+ except WaybackError as e:
+ result['status'] = 'wayback-error'
+ result['error_message'] = str(e)
+ return result
+
+ if resource.terminal_dt:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ }
+
+ # must be a hit if we got this far
+ assert resource.hit
+ assert resource.terminal_status_code == 200
+
+ result['file_meta'] = file_meta
+
+ # other failure cases
+ if not resource.body or file_meta['size_bytes'] == 0:
+ result['status'] = 'null-body'
+ return result
+
+ if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
+ result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+
+ info = self.process_hit(resource, file_meta)
+ result.update(info)
+
+ result['status'] = "success"
+ result['hit'] = True
+ return result
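For reference, a successful response from this new process() path has roughly the following shape; the keys come from the code above, and every value is an invented example rather than real output:

    example_response = {
        "request": {"ingest_type": "pdf", "base_url": "https://example.com/article"},
        "hit": True,
        "status": "success",
        "terminal": {
            "terminal_url": "https://example.com/article.pdf",  # made-up value
            "terminal_dt": "20191101120000",                     # made-up value
            "terminal_status_code": 200,
        },
        "file_meta": {"mimetype": "application/pdf", "size_bytes": 123456},  # abbreviated
        "grobid": {"status": "success"},                                      # abbreviated
    }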
- response = dict(request=request, hit=False)
- url = request['base_url']
- while url:
- try:
- (cdx_dict, body) = self.get_cdx_and_body(url)
- except SavePageNowRemoteError as e:
- response['status'] = 'spn-remote-error'
- response['error_message'] = str(e)
- return response
- except SavePageNowError as e:
- response['status'] = 'spn-error'
- response['error_message'] = str(e)
- return response
- except CdxApiError as e:
- response['status'] = 'cdx-error'
- response['error_message'] = str(e)
- return response
- except WaybackError as e:
- response['status'] = 'wayback-error'
- response['error_message'] = str(e)
- return response
- print("CDX hit: {}".format(cdx_dict), file=sys.stderr)
-
- response['cdx'] = cdx_dict
- # TODO: populate terminal
- response['terminal'] = dict(url=cdx_dict['url'], http_status=cdx_dict['http_status'])
- if not body:
- response['status'] = 'null-body'
- return response
- if cdx_dict['http_status'] != 200:
- response['status'] = 'terminal-bad-status'
- return response
- file_meta = gen_file_metadata(body)
- mimetype = cdx_dict['mimetype']
- if mimetype in ('warc/revisit', 'binary/octet-stream', 'application/octet-stream', 'application/x-download', 'application/force-download'):
- mimetype = file_meta['mimetype']
- response['file_meta'] = file_meta
- if 'html' in mimetype:
- page_metadata = extract_fulltext_url(response['cdx']['url'], body)
- if page_metadata and page_metadata.get('pdf_url'):
- next_url = page_metadata.get('pdf_url')
- if next_url == url:
- response['status'] = 'link-loop'
- return response
- url = next_url
- continue
- elif page_metadata and page_metadata.get('next_url'):
- next_url = page_metadata.get('next_url')
- if next_url == url:
- response['status'] = 'link-loop'
- return response
- url = next_url
- continue
- else:
- response['terminal']['html'] = page_metadata
- response['status'] = 'no-pdf-link'
- return response
- elif 'pdf' in mimetype:
- break
- else:
- response['status'] = 'other-mimetype'
- return response
-
- response['file_meta'] = file_meta
-
- # if we got here, we have a PDF
- sha1hex = response['file_meta']['sha1hex']
-
- # do GROBID
- response['grobid'] = self.grobid_client.process_fulltext(body)
- #print("GROBID status: {}".format(response['grobid']['status']), file=sys.stderr)
-
- # TODO: optionally publish to Kafka here, but continue on failure (but
- # send a sentry exception?)
-
- # parse metadata, but drop fulltext from ingest response
- if response['grobid']['status'] == 'success':
- grobid_metadata = self.grobid_client.metadata(response['grobid'])
- if grobid_metadata:
- response['grobid'].update(grobid_metadata)
- response['grobid'].pop('tei_xml')
-
- # Ok, now what?
- #print("GOT TO END", file=sys.stderr)
- response['status'] = "success"
- response['hit'] = True
- return response
class IngestFileRequestHandler(BaseHTTPRequestHandler):
def do_POST(self):