import sys import unicodedata from typing import Any, Dict, List, Optional import requests from grobid_tei_xml import GrobidBiblio, parse_citation_list_xml, parse_document_xml from .ia import WaybackClient from .misc import gen_file_metadata from .workers import SandcrawlerFetchWorker, SandcrawlerWorker def clean_ref_str(raw: str) -> str: """ When comparing raw unstructured strings (from upstream sources) to GROBID-returned citations, we sometimes want to do exact comparisons to match up records (eg, from crossref). GROBID does some (totally reasonable) arbitrary normalizations of strings, like simplifying whitespace. This routine is to make comparisons against when GROBID returned and original strings easier. """ # TODO: dead test code # raw = unicodedata.normalize('NFKC', raw) # raw = raw.replace('\u00a0', ' ').replace('\u2013', '-').replace('\u2014', '-').strip() # raw = ' '.join(raw.split()) raw = raw.replace(" ", " ") return raw def test_clean_ref_str() -> None: raw_with_nbsp = """Qingyao Ai Keping Bi Cheng Luo Jiafeng Guo and W. Bruce Croft. 2018. Unbiased Learning to Rank with Unbiased Propensity Estimation. (2018) 385–394. Qingyao Ai Keping Bi Cheng Luo Jiafeng Guo and W. Bruce Croft. 2018. Unbiased Learning to Rank with Unbiased Propensity Estimation. (2018) 385–394.""" raw_without_nbsp = """Qingyao Ai Keping Bi Cheng Luo Jiafeng Guo and W. Bruce Croft. 2018. Unbiased Learning to Rank with Unbiased Propensity Estimation. (2018) 385-394. Qingyao Ai Keping Bi Cheng Luo Jiafeng Guo and W. Bruce Croft. 2018. Unbiased Learning to Rank with Unbiased Propensity Estimation. (2018) 385-394.""" assert clean_ref_str(raw_with_nbsp) == raw_without_nbsp class GrobidClient(object): def __init__(self, host_url: str = "https://grobid.qa.fatcat.wiki", **kwargs): self.host_url = host_url self.consolidate_mode = int(kwargs.get("consolidate_mode", 0)) def process_fulltext( self, blob: bytes, consolidate_mode: Optional[int] = None ) -> Dict[str, Any]: """ Returns dict with keys: - status_code - status (slug) - error_msg (if status == 'error') - tei_xml (if status is 200) TODO: persist connection for performance? """ assert blob if consolidate_mode is None: consolidate_mode = self.consolidate_mode assert consolidate_mode is not None try: grobid_response = requests.post( self.host_url + "/api/processFulltextDocument", files={ "input": blob, "consolidateHeader": consolidate_mode, "consolidateCitations": 0, # too expensive for now "includeRawCitations": 1, }, timeout=180.0, ) except requests.Timeout: return { "status": "error-timeout", "status_code": -4, # heritrix3 "HTTP timeout" code "error_msg": "GROBID request (HTTP POST) timeout", } info: Dict[str, Any] = dict(status_code=grobid_response.status_code) if grobid_response.status_code == 200: info["status"] = "success" info["tei_xml"] = grobid_response.text if len(info["tei_xml"]) > 12000000: # XML is larger than Kafka message size, and much larger than # an article in general; bail out info["status"] = "error" info["error_msg"] = "response XML too large: {} bytes".format( len(info["tei_xml"]) ) info.pop("tei_xml") else: # response.text is .content decoded as utf-8 info["status"] = "error" info["error_msg"] = grobid_response.text[:10000] return info def process_citation_list(self, unstructured_list: List[str]) -> List[GrobidBiblio]: if not unstructured_list: return [] if len(unstructured_list) > 5000: raise ValueError("more than 5,000 references in a batch is just too much") try: grobid_response = requests.post( self.host_url + "/api/processCitationList", data={ "citations": unstructured_list, "consolidateCitations": 0, "includeRawCitations": 1, }, timeout=30.0, ) except requests.Timeout as te: # TODO: handle somehow? raise te grobid_response.raise_for_status() return parse_citation_list_xml(grobid_response.text) def metadata(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]: if result["status"] != "success": return None tei_doc = parse_document_xml(result["tei_xml"]) tei_doc.remove_encumbered() tei_json = tei_doc.to_legacy_dict() meta = dict() biblio = dict() for k in ( "title", "authors", "journal", "date", "doi", ): if tei_json.get(k): biblio[k] = tei_json[k] meta["biblio"] = biblio for k in ("grobid_version", "grobid_timestamp", "fatcat_release", "language_code"): if tei_json.get(k): meta[k] = tei_json[k] return meta def should_parse_crossref_ref(self, ref: Dict[str, Any]) -> bool: """ Helper function to decide whether to run GROBID parsing on an crossref reference. For example, if there is already a DOI in the ref metadata, could skip. Or, if there is sufficient structured metadata, or only depending on the source of the DOI linkage. """ if ref.get("DOI"): return False if len(ref.get("unstructured", "")) <= 6: return False # TODO: what other combinations are enough to skip parsing? if ( ref.get("year") and ref.get("author") and (ref.get("article-title") or ref.get("series-title") or ref.get("volume-title")) ): return False elif ref.get("year") and ref.get("author") and ref.get("journal-title"): return False elif ref.get("journal-title") and ref.get("volume") and ref.get("first-page"): return False return True def crossref_refs(self, record: Dict[str, Any]) -> Dict[str, Any]: """ Given a complete Crossref metadata record, inspects the The returned dict is in the schema of the `grobid_refs` database table, in dict form: source: 'crossref' source_id: doi, as lower-case string source_ts: Crossref indexed timestamp, if available ('updated' is not set) refs_json: list of dicts """ # remove API wrapper around record, if necessary if "message" in record and "DOI" not in record: record = record["message"] ret = dict( source="crossref", source_id=record["DOI"].lower(), source_ts=record["indexed"]["date-time"], refs_json=[], ) all_refs = record.get("reference", []) unstructured_refs = [] for r in all_refs: if not r.get("unstructured"): continue if not self.should_parse_crossref_ref(r): continue unstructured_refs.append(r) if not unstructured_refs: return ret # some reasonable cap on length of refs per work if len(unstructured_refs) > 2000: print( f"truncatin very large reference list for doi:{record['DOI']} len:{len(unstructured_refs)}", file=sys.stderr, ) unstructured_refs = unstructured_refs[:2000] refs = self.process_citation_list([r["unstructured"] for r in unstructured_refs]) assert len(refs) == len(unstructured_refs) refs_json = [] for i in range(len(refs)): refs[i].id = unstructured_refs[i].get("key") original = unstructured_refs[i]["unstructured"] original_clean = clean_ref_str(unstructured_refs[i]["unstructured"]) assert ( refs[i].unstructured == original or refs[i].unstructured == original_clean ), f'raw citation mismatch (GROBID then original cleaned): \n{refs[i].unstructured.encode("utf-8")}\n{original_clean.encode("utf-8")}' # intentionally put "unclean" original string in, to allow later # exact byte-accurate comparisons refs[i].unstructured = original refs_json.append(refs[i].to_dict()) ret["refs_json"] = refs_json return ret class GrobidWorker(SandcrawlerFetchWorker): def __init__( self, grobid_client: GrobidClient, wayback_client: Optional[WaybackClient] = None, sink: Optional[SandcrawlerWorker] = None, **kwargs, ): super().__init__(wayback_client=wayback_client) self.grobid_client = grobid_client self.sink = sink self.consolidate_mode = 0 def timeout_response(self, task: Any) -> Any: default_key = task["sha1hex"] return dict( status="error-timeout", error_msg="internal GROBID worker timeout", source=task, key=default_key, ) def process(self, record: Any, key: Optional[str] = None) -> Any: fetch_result = self.fetch_blob(record) if fetch_result["status"] != "success": return fetch_result blob: bytes = fetch_result["blob"] assert blob and isinstance(blob, bytes) result = self.grobid_client.process_fulltext( blob, consolidate_mode=self.consolidate_mode ) result["file_meta"] = gen_file_metadata(blob) result["source"] = record result["key"] = result["file_meta"]["sha1hex"] return result class CrossrefRefsWorker(SandcrawlerWorker): def __init__( self, grobid_client: GrobidClient, sink: Optional[SandcrawlerWorker] = None, **kwargs ): super().__init__(**kwargs) self.grobid_client = grobid_client self.sink = sink def process(self, record: Any, key: Optional[str] = None) -> Any: return self.grobid_client.crossref_refs(record) class GrobidBlobWorker(SandcrawlerWorker): """ This is sort of like GrobidWorker, except it receives blobs directly, instead of fetching blobs from some remote store. """ def __init__( self, grobid_client: GrobidClient, sink: Optional[SandcrawlerWorker] = None, **kwargs ): super().__init__() self.grobid_client = grobid_client self.sink = sink self.consolidate_mode = 0 def process(self, blob: Any, key: Optional[str] = None) -> Any: if not blob: return None result = self.grobid_client.process_fulltext( blob, consolidate_mode=self.consolidate_mode ) result["file_meta"] = gen_file_metadata(blob) result["key"] = result["file_meta"]["sha1hex"] return result