Diffstat (limited to 'python')
-rw-r--r--    python/sandcrawler/ingest.py    240
1 file changed, 118 insertions, 122 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index a39d9ea..633e856 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -5,10 +5,11 @@
 import gzip
 import time
 import base64
 import requests
+from typing import Optional, Tuple, Any, Dict
 from http.server import BaseHTTPRequestHandler, HTTPServer
 from collections import namedtuple
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding
 from sandcrawler.grobid import GrobidClient
 from sandcrawler.pdfextract import process_pdf, PdfExtractResult
 from sandcrawler.misc import gen_file_metadata, clean_url
@@ -46,7 +47,7 @@ class IngestFileWorker(SandcrawlerWorker):
 
     def __init__(self, sink=None, **kwargs):
         super().__init__()
-        
+
         self.sink = sink
         self.wayback_client = kwargs.get('wayback_client')
         if not self.wayback_client:
@@ -63,6 +64,7 @@ class IngestFileWorker(SandcrawlerWorker):
         self.grobid_sink = kwargs.get('grobid_sink')
         self.thumbnail_sink = kwargs.get('thumbnail_sink')
         self.pdftext_sink = kwargs.get('pdftext_sink')
+        self.max_hops = 6
 
         self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
         self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
@@ -137,7 +139,7 @@ class IngestFileWorker(SandcrawlerWorker):
         ]
 
-    def check_existing_ingest(self, base_url):
+    def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
         """
         Check in sandcrawler-db (postgres) to see if we have already ingested
         this URL (ingest file result table).
@@ -149,14 +151,14 @@ class IngestFileWorker(SandcrawlerWorker):
         """
         if not self.try_existing_ingest:
             return None
-        existing = self.pgrest_client.get_ingest_file_result(base_url)
+        existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url)
         # TODO: filter on more flags?
         if existing and existing['hit'] == True:
             return existing
         else:
             return None
 
-    def find_resource(self, url, best_mimetype=None, force_recrawl=False):
+    def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]:
         """
         Looks in wayback for a resource starting at the URL, following any
         redirects. If a hit isn't found, try crawling with SPN.
@@ -185,7 +187,7 @@ class IngestFileWorker(SandcrawlerWorker):
         if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
             old_failure = True
 
-        if self.try_spn2 and (resource == None or (resource.status == 'no-capture') or soft404 or old_failure):
+        if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure):
             via = "spn2"
             force_simple_get = 0
             for domain in self.spn2_simple_get_domains:
@@ -195,12 +197,12 @@ class IngestFileWorker(SandcrawlerWorker):
             resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get)
         print("[FETCH {:>6}] {} {}".format(
             via,
-            resource.status,
-            resource.terminal_url or url),
+            (resource and resource.status),
+            (resource and resource.terminal_url) or url),
             file=sys.stderr)
         return resource
 
-    def process_existing(self, request, result_row):
+    def process_existing(self, request: dict, result_row: dict) -> dict:
         """
         If we have an existing ingest file result, do any database fetches or
         additional processing necessary to return a result.
@@ -228,16 +230,22 @@ class IngestFileWorker(SandcrawlerWorker):
         }
         return result
 
-    def process_hit(self, resource, file_meta):
+    def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
         """
         Run all the necessary processing for a new/fresh ingest hit.
         """
-        return {
-            'grobid': self.process_grobid(resource, file_meta),
-            'pdf_meta': self.process_pdfextract(resource, file_meta),
-        }
+        if ingest_type == "pdf":
+            return {
+                'grobid': self.process_grobid(resource, file_meta),
+                'pdf_meta': self.process_pdfextract(resource, file_meta),
+            }
+        elif ingest_type == "xml":
+            # TODO
+            raise NotImplementedError(f"process {ingest_type} hit")
+        else:
+            raise NotImplementedError(f"process {ingest_type} hit")
 
-    def process_grobid(self, resource, file_meta):
+    def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict:
         """
         Submits to resource body to GROBID for processing.
@@ -268,7 +276,7 @@ class IngestFileWorker(SandcrawlerWorker):
         result.pop('key', None)
         return result
 
-    def process_pdfextract(self, resource, file_meta):
+    def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict:
         """
         Extracts thumbnail and pdf_meta info from PDF.
@@ -296,7 +304,7 @@ class IngestFileWorker(SandcrawlerWorker):
             result.file_meta = None
         return result.to_pdftext_dict()
 
-    def timeout_response(self, task):
+    def timeout_response(self, task: dict) -> dict:
         print("[TIMEOUT]", file=sys.stderr)
         return dict(
             request=task,
@@ -305,22 +313,20 @@ class IngestFileWorker(SandcrawlerWorker):
             error_message="ingest worker internal timeout",
         )
 
-    def want(self, request):
+    def want(self, request: dict) -> bool:
         if not request.get('ingest_type') in ('file', 'pdf'):
             return False
         return True
 
-    def process(self, request, key=None):
+    def process(self, request: dict, key: Any = None) -> dict:
 
-        # backwards compatibility
-        if request.get('ingest_type') in ('file', None):
+        # old backwards compatibility
+        if request.get('ingest_type') == 'file':
            request['ingest_type'] = 'pdf'
 
-        # for now, only pdf ingest is implemented
-        if not 'ingest_type' in request:
-            request['ingest_type'] = "pdf"
-        assert request.get('ingest_type') == "pdf"
         ingest_type = request.get('ingest_type')
+        if ingest_type not in ("pdf", "xml"):
+            raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
 
         # parse/clean URL
         # note that we pass through the original/raw URL, and that is what gets
@@ -339,17 +345,19 @@ class IngestFileWorker(SandcrawlerWorker):
         best_mimetype = None
         if ingest_type == "pdf":
             best_mimetype = "application/pdf"
+        elif ingest_type == "xml":
+            best_mimetype = "text/xml"
+        elif ingest_type == "html":
+            best_mimetype = "text/html"
 
-        existing = self.check_existing_ingest(base_url)
+        existing = self.check_existing_ingest(ingest_type, base_url)
         if existing:
             return self.process_existing(request, existing)
 
-        result = dict(request=request, hit=False)
+        result: Dict[str, Any] = dict(request=request, hit=False)
 
         next_url = base_url
         hops = [base_url]
-        self.max_hops = 6
-
         while len(hops) <= self.max_hops:
@@ -402,25 +410,9 @@ class IngestFileWorker(SandcrawlerWorker):
                 result['error_message'] = str(e)[:1600]
                 return result
 
-            if not resource.hit:
-                result['status'] = resource.status
-                if resource.terminal_url:
-                    result['terminal'] = {
-                        "terminal_url": resource.terminal_url,
-                        "terminal_dt": resource.terminal_dt,
-                        "terminal_status_code": resource.terminal_status_code,
-                    }
-                    if resource.terminal_url not in result['hops']:
-                        result['hops'].append(resource.terminal_url)
-                return result
+            assert resource
 
-            if not resource.body:
-                result['status'] = 'null-body'
-                return result
-            file_meta = gen_file_metadata(resource.body)
-
-            if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url):
-                result['status'] = 'blocked-cookie'
+            if resource.terminal_url:
                 result['terminal'] = {
                     "terminal_url": resource.terminal_url,
                     "terminal_dt": resource.terminal_dt,
@@ -428,64 +420,61 @@ class IngestFileWorker(SandcrawlerWorker):
                 }
                 if resource.terminal_url not in result['hops']:
                     result['hops'].append(resource.terminal_url)
+
+            if not resource.hit:
+                result['status'] = resource.status
                 return result
 
-            # crude handling of content-encoding; wayback fetch library usually
-            # (and should always?) handle this
-            if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
-                print("transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
-                try:
-                    inner_body = gzip.decompress(resource.body)
-                except Exception as e:
-                    result['status'] = 'bad-gzip-encoding'
-                    result['error_message'] = str(e)
-                    return result
-                if not inner_body:
-                    result['status'] = 'null-body'
-                    return result
-                resource = ResourceResult(
-                    body=inner_body,
-                    # copy all other fields
-                    start_url=resource.start_url,
-                    hit=resource.hit,
-                    status=resource.status,
-                    terminal_url=resource.terminal_url,
-                    terminal_dt=resource.terminal_dt,
-                    terminal_status_code=resource.terminal_status_code,
-                    cdx=resource.cdx,
-                    revisit_cdx=resource.revisit_cdx,
-                )
-                file_meta = gen_file_metadata(resource.body)
-
-            if "html" in file_meta['mimetype'] or "xhtml" in file_meta['mimetype'] or "application/xml" in file_meta['mimetype'] or "text/xml" in file_meta['mimetype']:
-                # Got landing page or similar. Some XHTML detected as "application/xml"
-                if resource.terminal_dt:
-                    result['terminal'] = {
-                        "terminal_url": resource.terminal_url,
-                        "terminal_dt": resource.terminal_dt,
-                        "terminal_status_code": resource.terminal_status_code,
-                    }
-                fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
-
-                result['html'] = fulltext_url
-                if not fulltext_url:
-                    result['status'] = 'no-pdf-link'
-                    return result
-                next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url')
-                assert next_url
-                next_url = clean_url(next_url)
-                print("[PARSE\t] {}\t{}".format(
-                    fulltext_url.get('technique'),
-                    next_url,
-                ),
-                    file=sys.stderr)
-                if next_url in hops:
-                    result['status'] = 'link-loop'
-                    result['error_message'] = "repeated: {}".format(next_url)
-                    return result
-                hops.append(next_url)
-                continue
-
+            if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url):
+                result['status'] = 'blocked-cookie'
+                return result
+
+            file_meta = gen_file_metadata(resource.body)
+            try:
+                file_meta, resource = fix_transfer_encoding(file_meta, resource)
+            except Exception as e:
+                result['status'] = 'bad-gzip-encoding'
+                result['error_message'] = str(e)
+                return result
+
+            if not resource.body or file_meta['size_bytes'] == 0:
+                result['status'] = 'null-body'
+                return result
+
+            # here is where we split based on ingest type
+            html_ish_resource = bool(
+                "html" in file_meta['mimetype']
+                or "xhtml" in file_meta['mimetype']
+                or "application/xml" in file_meta['mimetype']
+                or "text/xml" in file_meta['mimetype']
+            )
+            if ingest_type == "pdf":
+                if html_ish_resource:
+                    # Got landing page or similar. Some XHTML detected as "application/xml"
+                    fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
+
+                    result['html'] = fulltext_url
+                    if not fulltext_url:
+                        result['status'] = 'no-pdf-link'
+                        return result
+                    next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url')
+                    assert next_url
+                    next_url = clean_url(next_url)
+                    print("[PARSE {:>6}] {} {}".format(
+                        ingest_type,
+                        fulltext_url.get('technique'),
+                        next_url,
+                    ),
+                        file=sys.stderr)
+                    if next_url in hops:
+                        result['status'] = 'link-loop'
+                        result['error_message'] = "repeated: {}".format(next_url)
+                        return result
+                    hops.append(next_url)
+                    continue
+            else:
+                raise NotImplementedError()
+
             # default is to NOT keep hopping
             break
 
@@ -493,6 +482,11 @@ class IngestFileWorker(SandcrawlerWorker):
             result['status'] = "max-hops-exceeded"
             return result
 
+        # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+        assert resource
+        assert resource.hit == True
+        assert resource.terminal_status_code in (200, 226)
+
         if resource.terminal_url:
             result['terminal'] = {
                 "terminal_url": resource.terminal_url,
@@ -501,35 +495,37 @@ class IngestFileWorker(SandcrawlerWorker):
                 "terminal_sha1hex": file_meta['sha1hex'],
             }
 
-        # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
-        assert resource.hit == True
-        assert resource.terminal_status_code in (200, 226)
-
         result['file_meta'] = file_meta
         result['cdx'] = cdx_to_dict(resource.cdx)
         if resource.revisit_cdx:
             result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
 
-        # other failure cases
-        if not resource.body or file_meta['size_bytes'] == 0:
-            result['status'] = 'null-body'
-            return result
-
-        if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
-            result['status'] = "wrong-mimetype"  # formerly: "other-mimetype"
-            return result
+        if ingest_type == "pdf":
+            if not file_meta['mimetype'] == "application/pdf":
+                result['status'] = "wrong-mimetype"  # formerly: "other-mimetype"
+                return result
+        else:
+            raise NotImplementedError()
 
-        info = self.process_hit(resource, file_meta)
+        info = self.process_hit(ingest_type, resource, file_meta)
         result.update(info)
         result['status'] = "success"
         result['hit'] = True
-        print("[SUCCESS\t] sha1:{} grobid:{} pdfextract:{}".format(
-            result.get('file_meta', {}).get('sha1hex'),
-            result.get('grobid', {}).get('status_code'),
-            result.get('pdf_meta', {}).get('status'),
-        ),
-            file=sys.stderr)
+        if ingest_type == "pdf":
+            print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
+                ingest_type,
+                result.get('file_meta', {}).get('sha1hex'),
+                result.get('grobid', {}).get('status_code'),
+                result.get('pdf_meta', {}).get('status'),
+            ),
+            file=sys.stderr)
+        else:
+            print("[SUCCESS {:>5}] sha1:{}".format(
+                ingest_type,
+                result.get('file_meta', {}).get('sha1hex'),
+            ),
+            file=sys.stderr)
         return result
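
Note: the inline gzip handling deleted above now lives behind the new fix_transfer_encoding() helper imported from sandcrawler.ia. The helper's implementation is not part of this diff; the following is only a rough sketch of what it presumably does, reconstructed from the deleted inline logic (decompress when the fetched body is gzip-wrapped but the CDX record says the capture itself was not), so any detail beyond that is an assumption:

    import gzip
    from typing import Tuple

    from sandcrawler.ia import ResourceResult
    from sandcrawler.misc import gen_file_metadata

    def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]:
        # only act when the body looks gzip-encoded but the original capture was not gzip
        if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
            # may raise; the caller in process() maps any exception to status 'bad-gzip-encoding'
            inner_body = gzip.decompress(resource.body)
            resource = ResourceResult(
                body=inner_body,
                # copy all other fields unchanged
                start_url=resource.start_url,
                hit=resource.hit,
                status=resource.status,
                terminal_url=resource.terminal_url,
                terminal_dt=resource.terminal_dt,
                terminal_status_code=resource.terminal_status_code,
                cdx=resource.cdx,
                revisit_cdx=resource.revisit_cdx,
            )
            # regenerate file metadata from the decompressed body
            file_meta = gen_file_metadata(resource.body)
        return file_meta, resource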
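
With this change the request's ingest_type drives dispatch throughout process(): "file" is still rewritten to "pdf" for backwards compatibility, "pdf" and "xml" pass the up-front check, and anything else raises NotImplementedError immediately. A minimal usage sketch follows; the "base_url" request field and the zero-argument worker construction are assumed from the rest of sandcrawler, not shown in this diff:

    from sandcrawler.ingest import IngestFileWorker

    worker = IngestFileWorker()

    # legacy requests: ingest_type "file" is treated as "pdf"
    result = worker.process({"ingest_type": "file", "base_url": "https://example.com/paper.pdf"})

    # "xml" passes the initial type check, but the xml path is not wired up yet:
    # a successful fetch currently ends in one of the NotImplementedError branches
    result = worker.process({"ingest_type": "xml", "base_url": "https://example.com/article.xml"})

    # anything else is rejected before any fetching happens
    worker.process({"ingest_type": "html", "base_url": "https://example.com/"})  # raises NotImplementedError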