Diffstat (limited to 'python/sandcrawler/ingest_file.py')
-rw-r--r-- | python/sandcrawler/ingest_file.py | 925
1 file changed, 925 insertions, 0 deletions
diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py new file mode 100644 index 0000000..03277f8 --- /dev/null +++ b/python/sandcrawler/ingest_file.py @@ -0,0 +1,925 @@ +import json +import sys +import time +import xml.etree.ElementTree +from http.server import BaseHTTPRequestHandler +from typing import Any, Dict, List, Optional + +from selectolax.parser import HTMLParser + +from sandcrawler.db import SandcrawlerPostgrestClient +from sandcrawler.grobid import GrobidClient +from sandcrawler.html import extract_fulltext_url +from sandcrawler.html_metadata import ( + html_extract_biblio, + html_extract_resources, + load_adblock_rules, +) +from sandcrawler.ia import ( + CdxApiError, + NoCaptureError, + PetaboxError, + ResourceResult, + SavePageNowBackoffError, + SavePageNowClient, + SavePageNowError, + WaybackClient, + WaybackContentError, + WaybackError, + cdx_to_dict, + fix_transfer_encoding, +) +from sandcrawler.ingest_html import ( + WebResource, + fetch_html_resources, + html_extract_body_teixml, + html_guess_platform, + html_guess_scope, + quick_fetch_html_resources, +) +from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime +from sandcrawler.pdfextract import PdfExtractResult, process_pdf +from sandcrawler.workers import SandcrawlerWorker +from sandcrawler.xml import xml_reserialize + +MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024 + + +class IngestFileWorker(SandcrawlerWorker): + """ + High level flow is to look in history first, then go to live web if + resource not found. Following redirects is treated as "fetching a + resource". Current version fetches a single resource; if it isn't a hit + but is an HTML 200, treats it as a landing page, tries to extract + fulltext link, then fetches that resource. + + process(request, key=None) -> response + Does all the things! + + Check existing processing (short circuit): + + check_existing_ingest(base_url) -> ingest_file_result or none + process_existing(result) -> response + try fetching all the rows we want. 
if any don't exist, fetch the resource itself and call process_file_hit() + + Fetch resource: + + find_resource(url) -> ResourceResult + + Process resource: + + process_file_hit(ResourceResult) -> response + process_grobid(ResourceResult) + """ + + def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs): + super().__init__() + + self.sink = sink + + if kwargs.get("wayback_client"): + self.wayback_client: WaybackClient = kwargs["wayback_client"] + else: + self.wayback_client = WaybackClient() + + if kwargs.get("spn_client"): + self.spn_client: SavePageNowClient = kwargs["spn_client"] + else: + self.spn_client = SavePageNowClient( + spn_cdx_retry_sec=kwargs.get("spn_cdx_retry_sec", 9.0) + ) + + if kwargs.get("grobid_client"): + self.grobid_client: GrobidClient = kwargs["grobid_client"] + else: + self.grobid_client = GrobidClient() + + if kwargs.get("pgrest_client"): + self.pgrest_client: SandcrawlerPostgrestClient = kwargs["pgrest_client"] + else: + self.pgrest_client = SandcrawlerPostgrestClient() + + self.grobid_sink = kwargs.get("grobid_sink") + self.thumbnail_sink = kwargs.get("thumbnail_sink") + self.pdftext_sink = kwargs.get("pdftext_sink") + self.xmldoc_sink = kwargs.get("xmldoc_sink") + self.htmlteixml_sink = kwargs.get("htmlteixml_sink") + self.max_hops = 8 + + self.try_existing_ingest = kwargs.get("try_existing_ingest", False) + self.try_existing_grobid = kwargs.get("try_existing_grobid", True) + self.try_existing_pdfextract = kwargs.get("try_existing_pdfextract", True) + self.try_wayback = kwargs.get("try_wayback", True) + self.try_spn2 = kwargs.get("try_spn2", True) + self.html_quick_mode = kwargs.get("html_quick_mode", False) + self.adblock_rules = load_adblock_rules() + self.max_html_resources = 200 + + self.base_url_blocklist = [ + "://localhost/", + "://127.0.0.1/", + # robot blocking / rate-limited + "://hkvalidate.perfdrive.com/", + "://ieeexplore.ieee.org/", + # temporary, until we implement specific fetch and 'petabox' output + "://archive.org/", + "://www.archive.org/", + "://web.archive.org/web/", + # out of scope + "://openlibrary.org/", + "://www.openlibrary.org/", + "://fatcat.wiki/", + "://scholar.archive.org/", + "://orcid.org/", + # Domain squats + "://bartandjones.com", + "://ijretm.com", + "://ijrcemas.com", + "://jist.net.in", + "://croisements-revue.org", + # all stubs/previews, not full papers + "://page-one.live.cf.public.springer.com", + # large datasets-only (no PDF expected) + "plutof.ut.ee/", + "www.gbif.org/", + "doi.pangaea.de/", + "www.plate-archive.org/", + "://doi.org/10.25642/ipk/gbis/", + "://apex.ipk-gatersleben.de/", + "fao.org/glis/", + # Historical non-paper content: + "dhz.uni-passau.de/", # newspapers + "digital.ucd.ie/", # ireland national historical + # DOI prefixes + "doi.org/10.2307/", # JSTOR; slow and many redirects + "doi.org/10.18730/", # fao.org: database entry + "doi.org/10.15468/", # gbif.org: database entry + "doi.org/10.48550/", # arxiv.org: redundant with direct ingest + # deprecated domain (doesn't redirect correctly) + "://edoc.mpg.de/", + # bogus/spam PDFs + "://isiarticles.com/", + ] + + self.wall_blocklist = [ + # loginwall + "://profile.thieme.de/HTML/sso/ejournals/login.htm", + "://login.bepress.com/", + "?SAMLRequest=", + "://osapublishing.org/captcha/", + "/password-login", + "://gateway.isiknowledge.com/", + "/login?TARGET=", + "jstage.jst.go.jp/sblogin", + "://acw.elsevier.com/SSOCore", + "://acw.sciencedirect.com/SSOCore", + "/login?source=", + ] + + self.cookie_blocklist = [ + "/cookieAbsent", + 
"cookieSet=1", + "error=cookies_not_supported", + # SPNv2 seems to work (not end up here), but heritrix fails + "://secure.jbs.elsevierhealth.com/", + ] + + self.src_valid_mimetypes = [ + "text/x-tex", + "application/gzip", + "application/x-bzip", + "application/x-bzip2", + "application/zip", + "application/x-tar", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ] + + self.component_valid_mimetypes = [ + "image/jpeg", + "image/tiff", + "image/png", + "image/gif", + "audio/mpeg", + "video/mp4", + "video/mpeg", + "text/plain", + "text/csv", + "text/x-r-source", # dataverse + "text/tab-separated-values", # dataverse + "text/x-rst", # dataverse + "application/x-rlang-transport", # dataverse + "application/json", + "application/xml", + "application/pdf", + "application/gzip", + "application/x-bzip", + "application/x-bzip2", + "application/zip ", + "application/x-rar ", + "application/x-7z-compressed", + "application/x-tar", + "application/vnd.ms-powerpoint", + "application/vnd.ms-excel", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ] + + def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: + """ + Check in sandcrawler-db (postgres) to see if we have already ingested + this URL (ingest file result table). + + Returns existing row *if* found *and* we should use it, otherwise None. + + Looks at existing ingest results and makes a decision based on, eg, + status and timestamp. + """ + if not self.try_existing_ingest: + return None + existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url) + # TODO: filter on more flags? + if existing and existing["hit"] is True: + return existing + else: + return None + + def find_resource( + self, url: str, best_mimetype: Optional[str] = None, force_recrawl: bool = False + ) -> Optional[ResourceResult]: + """ + Looks in wayback for a resource starting at the URL, following any + redirects. If a hit isn't found, try crawling with SPN. + """ + via = "none" + resource = None + + if url.startswith("http://web.archive.org/web/") or url.startswith( + "https://web.archive.org/web/" + ): + raise NotImplementedError("handling direct wayback links not supported yet") + + if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"): + raise NotImplementedError("fetching from archive.org not implemented yet") + + if self.try_wayback and not force_recrawl: + via = "wayback" + resource = self.wayback_client.lookup_resource(url, best_mimetype) + + # check for "soft 404" conditions, where we should retry with live SPNv2 + soft404 = False + # NOTE: these are often not working with SPNv2 either, so disabling. 
If + # we really want to try again, should do force-recrawl + # if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'): + # soft404 = True + + old_failure = False + if ( + resource + and not resource.hit + and resource.terminal_dt + and resource.terminal_dt < "20190000000000" + ): + old_failure = True + + if self.try_spn2 and ( + resource is None + or (resource and resource.status == "no-capture") + or soft404 + or old_failure + ): + via = "spn2" + resource = self.spn_client.crawl_resource(url, self.wayback_client) + print( + "[FETCH {:>6}] {} {}".format( + via, (resource and resource.status), (resource and resource.terminal_url) or url + ), + file=sys.stderr, + ) + return resource + + def process_existing(self, request: dict, result_row: dict) -> dict: + """ + If we have an existing ingest file result, do any database fetches or + additional processing necessary to return a result. + """ + raise NotImplementedError("process_existing() not tested or safe yet") + assert result_row["hit"] + existing_file_meta = self.pgrest_client.get_file_meta(result_row["terminal_sha1hex"]) + existing_grobid = self.pgrest_client.get_grobid(result_row["terminal_sha1hex"]) + existing_cdx = self.pgrest_client.get_cdx( + result_row["terminal_url"], result_row["terminal_dt"] + ) + if not (existing_file_meta and existing_grobid and existing_cdx): + raise NotImplementedError("partially-exsiting records not implemented yet") + result = { + "hit": result_row["hit"], + "status": "existing", + "request": request, + "grobid": existing_grobid, + "file_meta": existing_file_meta, + "cdx": existing_cdx, + "terminal": { + "terminal_url": result_row["terminal_url"], + "terminal_dt": result_row["terminal_dt"], + "terminal_status_code": result_row["terminal_status_code"], + "terminal_sha1hex": result_row["terminal_sha1hex"], + }, + } + return result + + def process_file_hit( + self, ingest_type: str, resource: ResourceResult, file_meta: dict + ) -> dict: + """ + Run all the necessary processing for a new/fresh ingest hit. + """ + if ( + ingest_type in ["dataset-file", "component"] + and file_meta["mimetype"] == "application/pdf" + ): + ingest_type = "pdf" + if ingest_type == "pdf": + return { + "grobid": self.process_grobid(resource, file_meta), + "pdf_meta": self.process_pdfextract(resource, file_meta), + } + elif ingest_type == "xml": + return { + "xml_meta": self.process_xml(resource, file_meta), + } + elif ingest_type == "html": + html_info = self.process_html(resource, file_meta) + # if there is no html_biblio, don't clobber anything possibly extracted earlier + if "html_biblio" in html_info and not html_info["html_biblio"]: + html_info.pop("html_biblio") + return html_info + elif ingest_type == "src": + return {} + elif ingest_type == "component": + return {} + elif ingest_type == "dataset-file": + return {} + else: + raise NotImplementedError(f"process {ingest_type} hit") + + def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Submits to resource body to GROBID for processing. 
+ + TODO: By default checks sandcrawler-db for an existing row first, then + decide if we should re-process + """ + if self.try_existing_grobid: + existing = self.pgrest_client.get_grobid(file_meta["sha1hex"]) + if existing: + # grobid_timestamp = existing.get("grobid_timestamp") or None + # status + grobid_version = existing.get("grobid_version") or None + if grobid_version and grobid_version.startswith("0.7"): + print("found existing GROBID result", file=sys.stderr) + return existing + + # Need to actually processes + result = self.grobid_client.process_fulltext(resource.body) + if self.grobid_sink: + # extra fields for GROBID kafka messages + result["file_meta"] = file_meta + result["key"] = result["file_meta"]["sha1hex"] + self.grobid_sink.push_record(result.copy()) + if result["status"] == "success": + metadata = self.grobid_client.metadata(result) + if metadata: + result["metadata"] = metadata + result["fatcat_release"] = metadata.pop("fatcat_release", None) + result["grobid_version"] = metadata.pop("grobid_version", None) + result.pop("tei_xml", None) + result.pop("file_meta", None) + result.pop("key", None) + return result + + def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Extracts thumbnail and pdf_meta info from PDF. + + By default checks sandcrawler-db for an existing row first, then decide + if we should re-process. + + TODO: difference between Kafka schema and SQL/postgrest schema + """ + if self.try_existing_pdfextract: + existing = self.pgrest_client.get_pdf_meta(file_meta["sha1hex"]) + if existing: + print("found existing pdf_meta result", file=sys.stderr) + result = PdfExtractResult.from_pdf_meta_dict(existing) + return result.to_pdftext_dict() + + # Need to actually processes + result = process_pdf(resource.body) + assert result.sha1hex == file_meta["sha1hex"] + assert result.file_meta is not None + assert result.file_meta["sha1hex"] == file_meta["sha1hex"] + if self.thumbnail_sink and result.page0_thumbnail is not None: + self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) + if self.pdftext_sink: + self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex) + result.page0_thumbnail = None + result.text = None + result.file_meta = None + return result.to_pdftext_dict() + + def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Simply publishes to Kafka topic. + + In the future, could extract other metadata here (like body word + count), or attempting to fetch sub-resources. 
+ """ + if self.xmldoc_sink and file_meta["mimetype"] == "application/jats+xml": + try: + jats_xml = xml_reserialize(resource.body) + except xml.etree.ElementTree.ParseError: + return dict(status="xml-parse-error") + msg = dict( + sha1hex=file_meta["sha1hex"], + status="success", + jats_xml=jats_xml, + ) + self.xmldoc_sink.push_record(msg, key=file_meta["sha1hex"]) + return dict(status="success") + + def process_html(self, resource: ResourceResult, file_meta: dict) -> dict: + + assert resource.body + try: + html_doc = HTMLParser(resource.body) + except ValueError: + return dict(status="html-selectolax-error") + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + assert html_biblio + try: + html_body = html_extract_body_teixml(resource.body) + except xml.etree.ElementTree.ParseError: + return dict(status="html-teixml-error") + html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio) + html_scope = html_guess_scope( + resource.terminal_url, html_doc, html_biblio, html_body.get("word_count") + ) + html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) + + if html_scope in ("blocked-captcha", "blocked-cookie", "blocked-forbidden"): + return dict( + status=html_scope, + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + ) + elif html_scope not in ( + "article-fulltext", + "unknown", + ): + html_body.pop("tei_xml", None) + return dict( + status="wrong-scope", + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + raw_resources = html_extract_resources( + resource.terminal_url, html_doc, self.adblock_rules + ) + if len(raw_resources) > self.max_html_resources: + html_body.pop("tei_xml", None) + return dict( + status="too-many-resources", + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + if self.htmlteixml_sink and html_body["status"] == "success": + self.htmlteixml_sink.push_record(html_body, key=file_meta["sha1hex"]) + + html_body.pop("tei_xml", None) + + partial_result = dict( + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + when = parse_cdx_datetime(resource.cdx.datetime) + full_resources: List[WebResource] = [] + + try: + if self.html_quick_mode: + print(" WARN: running quick CDX-only fetches", file=sys.stderr) + full_resources = quick_fetch_html_resources( + raw_resources, self.wayback_client.cdx_client, when + ) + else: + full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) + except PetaboxError as e: + partial_result["status"] = "petabox-error" + partial_result["error_message"] = str(e)[:1600] + return partial_result + except CdxApiError as e: + partial_result["status"] = "cdx-error" + partial_result["error_message"] = str(e)[:1600] + return partial_result + except WaybackError as e: + partial_result["status"] = "wayback-error" + partial_result["error_message"] = str(e)[:1600] + return partial_result + except WaybackContentError as e: + partial_result["status"] = "wayback-content-error" + partial_result["error_message"] = str(e)[:1600] + return partial_result + except NoCaptureError as e: + partial_result["status"] = "html-resource-no-capture" + partial_result["error_message"] = str(e)[:1600] + return partial_result + + info = dict( + html_body=html_body, + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources], + 
) + if html_scope == "unknown": + info["status"] = "unknown-scope" + return info + + def timeout_response(self, task: dict) -> dict: + print("[TIMEOUT]", file=sys.stderr) + return dict( + request=task, + hit=False, + status="timeout", + error_message="ingest worker internal timeout", + ) + + def want(self, request: dict) -> bool: + if not request.get("ingest_type") in ("file", "pdf", "xml", "html", "src", "component"): + return False + return True + + def process(self, request: dict, key: Any = None) -> dict: + return self.process_file(request, key=key) + + def process_file(self, request: dict, key: Any = None) -> dict: + + # old backwards compatibility + if request.get("ingest_type") == "file": + request["ingest_type"] = "pdf" + + ingest_type = request.get("ingest_type") + if ingest_type not in ("pdf", "xml", "html", "src", "component"): + raise NotImplementedError(f"can't handle ingest_type={ingest_type}") + + # parse/clean URL + # note that we pass through the original/raw URL, and that is what gets + # persisted in database table + base_url = clean_url(request["base_url"]) + + force_recrawl = bool(request.get("force_recrawl", False)) + + for block in self.base_url_blocklist: + if block in base_url: + print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + return dict(request=request, hit=False, status="skip-url-blocklist") + + print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + + best_mimetype = None + if ingest_type == "pdf": + best_mimetype = "application/pdf" + elif ingest_type == "xml": + best_mimetype = "text/xml" + elif ingest_type == "html": + best_mimetype = "text/html" + elif ingest_type == "src": + best_mimetype = "application/gzip" + + existing = self.check_existing_ingest(ingest_type, base_url) + if existing: + return self.process_existing(request, existing) + + result: Dict[str, Any] = dict(request=request, hit=False) + + next_url = base_url + hops = [base_url] + + while len(hops) <= self.max_hops: + + result["hops"] = hops + + # check against blocklist again on each hop + for block in self.base_url_blocklist: + if block in next_url: + result["status"] = "skip-url-blocklist" + return result + + # also check against known loginwall patterns + for block in self.wall_blocklist: + if block in next_url: + # TODO: blocked-wall instead of skip-wall + result["status"] = "skip-wall" + return result + + # check for popular cookie blocking URL patterns. 
On successful SPN + # crawls, shouldn't see these redirect URLs + for pattern in self.cookie_blocklist: + if pattern in next_url: + result["status"] = "blocked-cookie" + return result + + try: + resource = self.find_resource( + next_url, best_mimetype, force_recrawl=force_recrawl + ) + except SavePageNowError as e: + result["status"] = "spn2-error" + result["error_message"] = str(e)[:1600] + return result + except SavePageNowBackoffError as e: + result["status"] = "spn2-backoff" + result["error_message"] = str(e)[:1600] + # small sleep as a slow-down + time.sleep(2.0) + return result + except PetaboxError as e: + result["status"] = "petabox-error" + result["error_message"] = str(e)[:1600] + return result + except CdxApiError as e: + result["status"] = "cdx-error" + result["error_message"] = str(e)[:1600] + # add a sleep in cdx-error path as a slow-down + time.sleep(2.0) + return result + except WaybackError as e: + result["status"] = "wayback-error" + result["error_message"] = str(e)[:1600] + return result + except WaybackContentError as e: + result["status"] = "wayback-content-error" + result["error_message"] = str(e)[:1600] + return result + except NotImplementedError as e: + result["status"] = "not-implemented" + result["error_message"] = str(e)[:1600] + return result + + assert resource + + if resource.terminal_url: + result["terminal"] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + } + if resource.terminal_url not in result["hops"]: + result["hops"].append(resource.terminal_url) + + if not resource.hit: + result["status"] = resource.status + return result + + if resource.terminal_url: + for pattern in self.base_url_blocklist: + if pattern in resource.terminal_url: + result["status"] = "skip-url-blocklist" + return result + + if resource.terminal_url: + for pattern in self.cookie_blocklist: + if pattern in resource.terminal_url: + result["status"] = "blocked-cookie" + return result + + if not resource.body: + result["status"] = "empty-blob" + return result + + if len(resource.body) > MAX_BODY_SIZE_BYTES: + result["status"] = "body-too-large" + return result + + file_meta = gen_file_metadata(resource.body) + try: + file_meta, resource = fix_transfer_encoding(file_meta, resource) + except Exception as e: + result["status"] = "bad-gzip-encoding" + result["error_message"] = str(e) + return result + + if not resource.body or file_meta["size_bytes"] == 0: + result["status"] = "empty-blob" + return result + + # here we split based on ingest type to try and extract a next hop + html_ish_resource = bool( + "html" in file_meta["mimetype"] + or "xhtml" in file_meta["mimetype"] # matches "application/xhtml+xml" + or "application/xml" in file_meta["mimetype"] + or "text/xml" in file_meta["mimetype"] + ) + html_biblio = None + html_doc = None + if html_ish_resource and resource.body: + try: + html_doc = HTMLParser(resource.body) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + if html_biblio: + if "html_biblio" not in result and html_biblio.title: + result["html_biblio"] = json.loads( + html_biblio.json(exclude_none=True) + ) + # print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) + except ValueError: + pass + + if ingest_type == "pdf" and html_ish_resource: + + # the new style of URL extraction (already computed) + if html_biblio and html_biblio.pdf_fulltext_url: + fulltext_url = dict( + pdf_url=html_biblio.pdf_fulltext_url, + technique="html_biblio", + ) + 
else: + fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) + + result["extract_next_hop"] = fulltext_url + if not fulltext_url: + # check if we hit a paywall/loginwall + for block in self.wall_blocklist: + if block in resource.terminal_url: + result["status"] = "blocked-wall" + return result + # else, just failed to find link + result["status"] = "no-pdf-link" + return result + next_url = fulltext_url.get("pdf_url") or fulltext_url.get("next_url") or "" + assert next_url + next_url = clean_url(next_url) + print( + "[PARSE {:>6}] {} {}".format( + ingest_type, + fulltext_url.get("technique"), + next_url, + ), + file=sys.stderr, + ) + if next_url in hops: + result["status"] = "link-loop" + result["error_message"] = "repeated: {}".format(next_url) + return result + hops.append(next_url) + continue + elif ( + ingest_type in ("xml", "html", "component") + and html_ish_resource + and html_biblio + ): + # NOTE: src_fulltext_url is not a thing + next_url_found = None + if ingest_type == "xml" and html_biblio.xml_fulltext_url: + next_url_found = html_biblio.xml_fulltext_url + elif ingest_type == "html" and html_biblio.html_fulltext_url: + next_url_found = html_biblio.html_fulltext_url + elif ingest_type == "component" and html_biblio.component_url: + next_url_found = html_biblio.component_url + + if next_url_found: + next_url = next_url_found + technique = "html_biblio" + print( + "[PARSE {:>6}] {} {}".format( + ingest_type, + technique, + next_url, + ), + file=sys.stderr, + ) + if next_url in hops: + if ingest_type == "html": + # for HTML ingest, we don't count this as a link-loop + break + result["status"] = "link-loop" + result["error_message"] = "repeated: {}".format(next_url) + return result + hops.append(next_url) + continue + + # default is to NOT keep hopping + break + + if len(hops) >= self.max_hops: + result["status"] = "max-hops-exceeded" + return result + + # fetch must be a hit if we got this far (though not necessarily an ingest hit!) 
+ assert resource + assert resource.hit is True + assert resource.terminal_status_code in (200, 226) + + if resource.terminal_url: + result["terminal"] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + "terminal_sha1hex": file_meta["sha1hex"], + } + + result["file_meta"] = file_meta + result["cdx"] = cdx_to_dict(resource.cdx) + if resource.revisit_cdx: + result["revisit_cdx"] = cdx_to_dict(resource.revisit_cdx) + + # check if we hit a paywall/loginwall before trying mimetype + for block in self.wall_blocklist: + if block in resource.terminal_url: + result["status"] = "blocked-wall" + return result + + if ingest_type == "pdf": + if file_meta["mimetype"] != "application/pdf": + result["status"] = "wrong-mimetype" # formerly: "other-mimetype" + return result + elif ingest_type == "xml": + if file_meta["mimetype"] not in ( + "application/xml", + "text/xml", + "application/jats+xml", + ): + result["status"] = "wrong-mimetype" + return result + elif ingest_type == "html": + if file_meta["mimetype"] not in ("text/html", "application/xhtml+xml"): + result["status"] = "wrong-mimetype" + return result + elif ingest_type == "src": + if file_meta["mimetype"] not in self.src_valid_mimetypes: + result["status"] = "wrong-mimetype" + return result + elif ingest_type == "component": + if file_meta["mimetype"] not in self.component_valid_mimetypes: + result["status"] = "wrong-mimetype" + return result + else: + raise NotImplementedError() + + info = self.process_file_hit(ingest_type, resource, file_meta) + result.update(info) + + # check if processing turned up an error + if info.get("status") not in ("success", None): + result["status"] = info["status"] + return result + + result["status"] = "success" + result["hit"] = True + if ingest_type == "pdf": + print( + "[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( + ingest_type, + result.get("file_meta", {}).get("sha1hex"), + result.get("grobid", {}).get("status_code"), + result.get("pdf_meta", {}).get("status"), + ), + file=sys.stderr, + ) + else: + print( + "[SUCCESS {:>5}] sha1:{}".format( + ingest_type, + result.get("file_meta", {}).get("sha1hex"), + ), + file=sys.stderr, + ) + return result + + +class IngestFileRequestHandler(BaseHTTPRequestHandler): + def do_POST(self) -> None: + if self.path != "/ingest": + self.send_response(404) + self.end_headers() + self.wfile.write(b"404: Not Found") + return + length = int(self.headers.get("content-length")) + request = json.loads(self.rfile.read(length).decode("utf-8")) + print("Got request: {}".format(request)) + ingester = IngestFileWorker() + result = ingester.process(request) + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(result).encode("utf8")) |
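The IngestFileRequestHandler defined at the end of the file answers POST requests to /ingest with a JSON body. A minimal sketch of serving it with the standard library's HTTPServer follows; the bind address and port are arbitrary placeholders, and a running instance still needs the wayback/CDX, SPN, GROBID, and postgrest backends that IngestFileWorker talks to.

from http.server import HTTPServer

from sandcrawler.ingest_file import IngestFileRequestHandler

# Example bind address/port only; nothing in this patch configures a port.
server = HTTPServer(("localhost", 8083), IngestFileRequestHandler)
server.serve_forever()

A client would then POST an ingest request as JSON, for example (placeholder URL):

curl -X POST localhost:8083/ingest -d '{"ingest_type": "pdf", "base_url": "https://example.com/some-paper"}'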
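The worker can also be driven directly through process(), as the Kafka-based workers do. A rough usage sketch, with a made-up base_url and the SPNv2 fallback disabled via the try_spn2 keyword parsed in __init__:

from sandcrawler.ingest_file import IngestFileWorker

# Constructing with defaults will instantiate wayback, SPN, GROBID, and
# postgrest clients; try_spn2=False skips live crawling for this sketch.
worker = IngestFileWorker(try_spn2=False)

request = {
    "ingest_type": "pdf",
    "base_url": "https://example.com/some-paper",  # placeholder URL
}
result = worker.process(request)
print(result["status"], result["hit"])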
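For reference, a successful "pdf" ingest response assembled by process_file() and process_file_hit() has roughly the following shape. Only the key names come from the code above; every value is a placeholder.

example_response = {
    "request": {"ingest_type": "pdf", "base_url": "https://example.com/some-paper"},
    "hit": True,
    "status": "success",
    "hops": ["https://example.com/some-paper"],
    "terminal": {
        "terminal_url": "https://example.com/fulltext.pdf",
        "terminal_dt": "20210101000000",
        "terminal_status_code": 200,
        "terminal_sha1hex": "0123456789abcdef0123456789abcdef01234567",
    },
    "file_meta": {
        "sha1hex": "0123456789abcdef0123456789abcdef01234567",
        "mimetype": "application/pdf",
    },
    "cdx": {},                      # cdx_to_dict(resource.cdx)
    "grobid": {"status": "success"},    # from process_grobid()
    "pdf_meta": {"status": "success"},  # from process_pdfextract()
}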