import datetime import json from typing import Any, Dict, List, Optional, Tuple import psycopg2 import psycopg2.extras import requests class SandcrawlerPostgrestClient: def __init__(self, api_url: str = "", **kwargs): self.api_url = api_url def get_cdx(self, url: str) -> Optional[dict]: resp = requests.get(self.api_url + "/cdx", params=dict(url="eq." + url)) resp.raise_for_status() return resp.json() or None def get_grobid(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_pdftrio(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_pdf_meta(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_html_meta(self, sha1hex: str) -> Optional[dict]: resp = requests.get( self.api_url + "/html_meta", params=dict(sha1hex=f"eq.{sha1hex}"), ) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_file_meta(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_ingest_file_result(self, ingest_type: str, url: str) -> Optional[dict]: resp = requests.get( self.api_url + "/ingest_file_result", params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"), ) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]: resp = requests.get( self.api_url + "/ingest_fileset_platform", params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"), ) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_crossref(self, doi: str) -> Optional[dict]: resp = requests.get(self.api_url + "/crossref", params=dict(doi=f"eq.{doi}")) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_crossref_with_refs(self, doi: str) -> Optional[dict]: resp = requests.get(self.api_url + "/crossref_with_refs", params=dict(doi=f"eq.{doi}")) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None def get_grobid_refs(self, source: str, source_id: str) -> Optional[dict]: resp = requests.get( self.api_url + "/grobid_refs", params=dict(source=f"eq.{source}", source_id=f"eq.{source_id}"), ) resp.raise_for_status() resp_json = resp.json() if resp_json: return resp_json[0] else: return None class SandcrawlerPostgresClient: def __init__(self, db_url: str, **kwargs): self.conn = psycopg2.connect(db_url) def cursor(self) -> psycopg2.extensions.cursor: return self.conn.cursor() def commit(self) -> None: self.conn.commit() def _inserts_and_updates(self, resp: List[Tuple], on_conflict: str) -> Tuple[int, int]: resp_codes = [int(r[0]) for r in resp] inserts = len([r for r in resp_codes if r == 0]) if on_conflict == "update": updates = len([r for r in resp_codes if r != 0]) else: updates = 0 return (inserts, updates) def insert_cdx( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "nothing", ) -> Tuple[int, int]: sql = """ INSERT INTO cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) VALUES %s ON CONFLICT ON CONSTRAINT cdx_pkey DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" batch = [d for d in batch if d.get("warc_path")] if not batch: return (0, 0) rows = [ ( d["url"], d["datetime"], d["sha1hex"], d["mimetype"], d["warc_path"], int(d["warc_csize"]), int(d["warc_offset"]), ) for d in batch ] # filter out duplicate rows by key (url, datetime) row_dict = dict() for b in rows: row_dict[(b[0], b[1])] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_file_meta( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "nothing", ) -> Tuple[int, int]: sql = """ INSERT INTO file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype) VALUES %s ON CONFLICT (sha1hex) DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET sha256hex=EXCLUDED.sha256hex, md5hex=EXCLUDED.md5hex, size_bytes=EXCLUDED.size_bytes, mimetype=EXCLUDED.mimetype """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" rows = [ (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"]) for d in batch ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: row_dict[b[0]] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_grobid( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "nothing", ) -> Tuple[int, int]: sql = """ INSERT INTO grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata) VALUES %s ON CONFLICT (sha1hex) DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET grobid_version=EXCLUDED.grobid_version, status_code=EXCLUDED.status_code, status=EXCLUDED.status, fatcat_release=EXCLUDED.fatcat_release, updated=EXCLUDED.updated, metadata=EXCLUDED.metadata """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" for r in batch: if r.get("metadata"): # sometimes these are only in metadata; shouldn't pass through # though (to save database space) dupe_fields = ("fatcat_release", "grobid_version") for k in dupe_fields: if k not in r: r[k] = r["metadata"].get(k) r["metadata"].pop(k, None) r["metadata"] = json.dumps(r["metadata"], sort_keys=True) now = rows = [ ( d["key"], d.get("grobid_version") or None, d["status_code"], d["status"], d.get("fatcat_release") or None, d.get("updated") or now, d.get("metadata") or None, ) for d in batch ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: row_dict[b[0]] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_pdf_meta( self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing" ) -> Tuple[int, int]: """ batch elements are expected to have .to_sql_tuple() method """ sql = """ INSERT INTO pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count, word_count, page0_height, page0_width, permanent_id, pdf_created, pdf_version, metadata) VALUES %s ON CONFLICT (sha1hex) DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET updated=EXCLUDED.updated, status=EXCLUDED.status, has_page0_thumbnail=EXCLUDED.has_page0_thumbnail, page_count=EXCLUDED.page_count, word_count=EXCLUDED.word_count, page0_height=EXCLUDED.page0_height, page0_width=EXCLUDED.page0_width, permanent_id=EXCLUDED.permanent_id, pdf_created=EXCLUDED.pdf_created, pdf_version=EXCLUDED.pdf_version, metadata=EXCLUDED.metadata """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: row_dict[b[0]] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_html_meta( self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing" ) -> Tuple[int, int]: """ batch elements are expected to have .to_sql_tuple() method """ sql = """ INSERT INTO html_meta (sha1hex, updated, status, scope, has_teixml, has_thumbnail, word_count, biblio, resources) VALUES %s ON CONFLICT (sha1hex) DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET updated=EXCLUDED.updated, status=EXCLUDED.status, scope=EXCLUDED.scope, has_teixml=EXCLUDED.has_teixml, has_thumbnail=EXCLUDED.has_thumbnail, word_count=EXCLUDED.word_count, biblio=EXCLUDED.biblio, resources=EXCLUDED.resources """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: row_dict[b[0]] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_pdftrio( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "nothing", ) -> Tuple[int, int]: sql = """ INSERT INTO pdftrio (sha1hex, updated, status_code, status, pdftrio_version, models_date, ensemble_score, bert_score, linear_score, image_score) VALUES %s ON CONFLICT (sha1hex) DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET updated=EXCLUDED.updated, status_code=EXCLUDED.status_code, status=EXCLUDED.status, pdftrio_version=EXCLUDED.pdftrio_version, models_date=EXCLUDED.models_date, ensemble_score=EXCLUDED.ensemble_score, bert_score=EXCLUDED.bert_score, linear_score=EXCLUDED.linear_score, image_score=EXCLUDED.image_score """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" now = rows = [ ( d["key"], d.get("updated") or now, d["status_code"], d["status"], d.get("versions", {}).get("pdftrio_version") or None, d.get("versions", {}).get("models_date") or None, d.get("ensemble_score"), d.get("bert_score"), d.get("linear_score"), d.get("image_score"), ) for d in batch ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: row_dict[b[0]] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_ingest_request( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "nothing", ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request) VALUES %s ON CONFLICT ON CONSTRAINT ingest_request_pkey DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" for r in batch: # in case these fields were already packed into 'request' extra = r.get("request", {}) for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"): if r.get(k): extra[k] = r[k] if extra: r["extra"] = json.dumps(extra, sort_keys=True) rows = [ ( d["link_source"], d["link_source_id"], d["ingest_type"], d["base_url"], d.get("ingest_request_source"), d.get("release_stage") or None, d.get("extra") or None, ) for d in batch ] # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) row_dict = dict() for b in rows: row_dict[(b[0], b[1], b[2], b[3])] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_ingest_file_result( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "nothing", ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) VALUES %s ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET updated=now(), hit=EXCLUDED.hit, status=EXCLUDED.status, terminal_url=EXCLUDED.terminal_url, terminal_dt=EXCLUDED.terminal_dt, terminal_status_code=EXCLUDED.terminal_status_code, terminal_sha1hex=EXCLUDED.terminal_sha1hex """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" rows = [ ( d["ingest_type"], d["base_url"], bool(d["hit"]), d["status"], d.get("terminal_url"), d.get("terminal_dt"), d.get("terminal_status_code"), d.get("terminal_sha1hex"), ) for d in batch ] # filter out duplicate rows by key (ingest_type, base_url) row_dict = dict() for b in rows: row_dict[(b[0], b[1])] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_ingest_fileset_platform( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "nothing", ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest) VALUES %s ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkeypkey DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET updated=now(), hit=EXCLUDED.hit, status=EXCLUDED.status, platform_name=EXCLUDED.platform_name, platform_domain=EXCLUDED.platform_domain, platform_id=EXCLUDED.platform_id, ingest_strategy=EXCLUDED.ingest_strategy, total_size=EXCLUDED.total_size, file_count=EXCLUDED.file_count, archiveorg_item_name=EXCLUDED.archiveorg_item_name, archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path, web_bundle_url=EXCLUDED.web_bundle_url, web_bundle_dt=EXCLUDED.web_bundle_dt, manifest=EXCLUDED.manifest """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" rows = [ ( d["ingest_type"], d["base_url"], bool(d["hit"]), d["status"], d.get("platform_name"), d.get("platform_domain"), d.get("platform_id"), d.get("ingest_strategy"), d.get("total_size"), d.get("file_count"), d.get("archiveorg_item_name"), d.get("archiveorg_item_bundle_path"), d.get("web_bundle_url"), d.get("web_bundle_dt"), d.get("manifest"), ) for d in batch ] # filter out duplicate rows by key (ingest_type, base_url) row_dict = dict() for b in rows: row_dict[(b[0], b[1])] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_crossref( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "update", ) -> Tuple[int, int]: sql = """ INSERT INTO crossref (doi, indexed, record) VALUES %s ON CONFLICT (doi) DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET indexed=EXCLUDED.indexed, record=EXCLUDED.record """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" rows = [ ( d["doi"], d.get("indexed") or None, json.dumps(d["record"], sort_keys=True), ) for d in batch ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: row_dict[b[0]] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) def insert_grobid_refs( self, cur: psycopg2.extensions.cursor, batch: List[Dict[str, Any]], on_conflict: str = "update", ) -> Tuple[int, int]: sql = """ INSERT INTO grobid_refs (source, source_id, source_ts, updated, refs_json) VALUES %s ON CONFLICT (source, source_id) DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" elif on_conflict.lower() == "update": sql += """ UPDATE SET source_ts=EXCLUDED.source_ts, updated=EXCLUDED.updated, refs_json=EXCLUDED.refs_json """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" now = rows = [ ( d["source"], d["source_id"], d.get("source_ts") or None, d.get("updated") or now, json.dumps(d["refs_json"], sort_keys=True), ) for d in batch ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: row_dict[(b[0], b[1])] = b rows = list(row_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict)