Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r-- | python/sandcrawler/db.py | 650 |
1 files changed, 650 insertions, 0 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
new file mode 100644
index 0000000..f9018ec
--- /dev/null
+++ b/python/sandcrawler/db.py
@@ -0,0 +1,650 @@
+import datetime
+import json
+from typing import Any, Dict, List, Optional, Tuple
+
+import psycopg2
+import psycopg2.extras
+
+from .misc import requests_retry_session
+
+
+class SandcrawlerPostgrestClient:
+    def __init__(self, api_url: str = "http://wbgrp-svc506.us.archive.org:3030", **kwargs):
+        self.api_url = api_url
+        self.http_session = requests_retry_session()
+
+    def get_cdx(self, url: str) -> Optional[dict]:
+        resp = self.http_session.get(self.api_url + "/cdx", params=dict(url="eq." + url))
+        resp.raise_for_status()
+        return resp.json() or None
+
+    def get_grobid(self, sha1: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1)
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_pdftrio(self, sha1: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/pdftrio", params=dict(sha1hex="eq." + sha1)
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_pdf_meta(self, sha1: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1)
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_html_meta(self, sha1hex: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/html_meta",
+            params=dict(sha1hex=f"eq.{sha1hex}"),
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_file_meta(self, sha1: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/file_meta", params=dict(sha1hex="eq." + sha1)
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_ingest_file_result(self, ingest_type: str, url: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/ingest_file_result",
+            params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/ingest_fileset_platform",
+            params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_crossref(self, doi: str) -> Optional[dict]:
+        resp = self.http_session.get(self.api_url + "/crossref", params=dict(doi=f"eq.{doi}"))
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_crossref_with_refs(self, doi: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/crossref_with_refs", params=dict(doi=f"eq.{doi}")
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+    def get_grobid_refs(self, source: str, source_id: str) -> Optional[dict]:
+        resp = self.http_session.get(
+            self.api_url + "/grobid_refs",
+            params=dict(source=f"eq.{source}", source_id=f"eq.{source_id}"),
+        )
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+
+class SandcrawlerPostgresClient:
+    def __init__(self, db_url: str, **kwargs):
+        self.conn = psycopg2.connect(db_url)
+
+    def cursor(self) -> psycopg2.extensions.cursor:
+        return self.conn.cursor()
+
+    def commit(self) -> None:
+        self.conn.commit()
+
+    def _inserts_and_updates(self, resp: List[Tuple], on_conflict: str) -> Tuple[int, int]:
+        resp_codes = [int(r[0]) for r in resp]
+        inserts = len([r for r in resp_codes if r == 0])
+        if on_conflict == "update":
+            updates = len([r for r in resp_codes if r != 0])
+        else:
+            updates = 0
+        return (inserts, updates)
+
+    def insert_cdx(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
+            VALUES %s
+            ON CONFLICT ON CONSTRAINT cdx_pkey DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+
+        batch = [d for d in batch if d.get("warc_path")]
+        if not batch:
+            return (0, 0)
+        rows = [
+            (
+                d["url"],
+                d["datetime"],
+                d["sha1hex"],
+                d["mimetype"],
+                d["warc_path"],
+                int(d["warc_csize"]),
+                int(d["warc_offset"]),
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (url, datetime)
+        row_dict = dict()
+        for b in rows:
+            row_dict[(b[0], b[1])] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_file_meta(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                sha256hex=EXCLUDED.sha256hex,
+                md5hex=EXCLUDED.md5hex,
+                size_bytes=EXCLUDED.size_bytes,
+                mimetype=EXCLUDED.mimetype
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        rows = [
+            (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"])
+            for d in batch
+        ]
+        # filter out duplicate rows by key (sha1hex)
+        row_dict = dict()
+        for b in rows:
+            row_dict[b[0]] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_grobid(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                grobid_version=EXCLUDED.grobid_version,
+                status_code=EXCLUDED.status_code,
+                status=EXCLUDED.status,
+                fatcat_release=EXCLUDED.fatcat_release,
+                updated=EXCLUDED.updated,
+                metadata=EXCLUDED.metadata
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        for r in batch:
+            if r.get("metadata"):
+                # sometimes these fields are only in metadata; they shouldn't
+                # pass through in the metadata blob (to save database space)
+                dupe_fields = ("fatcat_release", "grobid_version")
+                for k in dupe_fields:
+                    if k not in r:
+                        r[k] = r["metadata"].get(k)
+                    r["metadata"].pop(k, None)
+                r["metadata"] = json.dumps(r["metadata"], sort_keys=True)
+        now = datetime.datetime.now()
+        rows = [
+            (
+                d["key"],
+                d.get("grobid_version") or None,
+                d["status_code"],
+                d["status"],
+                d.get("fatcat_release") or None,
+                d.get("updated") or now,
+                d.get("metadata") or None,
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (sha1hex)
+        row_dict = dict()
+        for b in rows:
+            row_dict[b[0]] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_pdf_meta(
+        self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+    ) -> Tuple[int, int]:
+        """
+        rows are expected to be SQL tuples, as produced by a .to_sql_tuple()
+        method on the original batch elements
+        """
+        sql = """
+            INSERT INTO
+            pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count, word_count, page0_height, page0_width, permanent_id, pdf_created, pdf_version, metadata)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=EXCLUDED.updated,
+                status=EXCLUDED.status,
+                has_page0_thumbnail=EXCLUDED.has_page0_thumbnail,
+                page_count=EXCLUDED.page_count,
+                word_count=EXCLUDED.word_count,
+                page0_height=EXCLUDED.page0_height,
+                page0_width=EXCLUDED.page0_width,
+                permanent_id=EXCLUDED.permanent_id,
+                pdf_created=EXCLUDED.pdf_created,
+                pdf_version=EXCLUDED.pdf_version,
+                metadata=EXCLUDED.metadata
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        # filter out duplicate rows by key (sha1hex)
+        row_dict = dict()
+        for b in rows:
+            row_dict[b[0]] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_html_meta(
+        self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+    ) -> Tuple[int, int]:
+        """
+        rows are expected to be SQL tuples, as produced by a .to_sql_tuple()
+        method on the original batch elements
+        """
+        sql = """
+            INSERT INTO
+            html_meta (sha1hex, updated, status, scope, has_teixml, has_thumbnail, word_count, biblio, resources)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=EXCLUDED.updated,
+                status=EXCLUDED.status,
+                scope=EXCLUDED.scope,
+                has_teixml=EXCLUDED.has_teixml,
+                has_thumbnail=EXCLUDED.has_thumbnail,
+                word_count=EXCLUDED.word_count,
+                biblio=EXCLUDED.biblio,
+                resources=EXCLUDED.resources
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        # filter out duplicate rows by key (sha1hex)
+        row_dict = dict()
+        for b in rows:
+            row_dict[b[0]] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_pdftrio(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
+                     models_date, ensemble_score, bert_score, linear_score,
+                     image_score)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=EXCLUDED.updated,
+                status_code=EXCLUDED.status_code,
+                status=EXCLUDED.status,
+                pdftrio_version=EXCLUDED.pdftrio_version,
+                models_date=EXCLUDED.models_date,
+                ensemble_score=EXCLUDED.ensemble_score,
+                bert_score=EXCLUDED.bert_score,
+                linear_score=EXCLUDED.linear_score,
+                image_score=EXCLUDED.image_score
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        now = datetime.datetime.now()
+        rows = [
+            (
+                d["key"],
+                d.get("updated") or now,
+                d["status_code"],
+                d["status"],
+                d.get("versions", {}).get("pdftrio_version") or None,
+                d.get("versions", {}).get("models_date") or None,
+                d.get("ensemble_score"),
+                d.get("bert_score"),
+                d.get("linear_score"),
+                d.get("image_score"),
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (sha1hex)
+        row_dict = dict()
+        for b in rows:
+            row_dict[b[0]] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_ingest_request(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request)
+            VALUES %s
+            ON CONFLICT ON CONSTRAINT ingest_request_pkey DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        for r in batch:
+            # in case these fields were already packed into 'request'
+            extra = r.get("request", {})
+            for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"):
+                if r.get(k):
+                    extra[k] = r[k]
+            if extra:
+                r["extra"] = json.dumps(extra, sort_keys=True)
+        rows = [
+            (
+                d["link_source"],
+                d["link_source_id"],
+                d["ingest_type"],
+                d["base_url"],
+                d.get("ingest_request_source"),
+                d.get("release_stage") or None,
+                d.get("extra") or None,
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
+        row_dict = dict()
+        for b in rows:
+            row_dict[(b[0], b[1], b[2], b[3])] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_ingest_file_result(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
+            VALUES %s
+            ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=now(),
+                hit=EXCLUDED.hit,
+                status=EXCLUDED.status,
+                terminal_url=EXCLUDED.terminal_url,
+                terminal_dt=EXCLUDED.terminal_dt,
+                terminal_status_code=EXCLUDED.terminal_status_code,
+                terminal_sha1hex=EXCLUDED.terminal_sha1hex
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        rows = [
+            (
+                d["ingest_type"],
+                d["base_url"],
+                bool(d["hit"]),
+                d["status"],
+                d.get("terminal_url"),
+                d.get("terminal_dt"),
+                d.get("terminal_status_code"),
+                d.get("terminal_sha1hex"),
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (ingest_type, base_url)
+        row_dict = dict()
+        for b in rows:
+            row_dict[(b[0], b[1])] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_ingest_fileset_platform(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest)
+            VALUES %s
+            ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkey DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=now(),
+                hit=EXCLUDED.hit,
+                status=EXCLUDED.status,
+                platform_name=EXCLUDED.platform_name,
+                platform_domain=EXCLUDED.platform_domain,
+                platform_id=EXCLUDED.platform_id,
+                ingest_strategy=EXCLUDED.ingest_strategy,
+                total_size=EXCLUDED.total_size,
+                file_count=EXCLUDED.file_count,
+                archiveorg_item_name=EXCLUDED.archiveorg_item_name,
+                archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path,
+                web_bundle_url=EXCLUDED.web_bundle_url,
+                web_bundle_dt=EXCLUDED.web_bundle_dt,
+                manifest=EXCLUDED.manifest
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        rows = [
+            (
+                d["ingest_type"],
+                d["base_url"],
+                bool(d["hit"]),
+                d["status"],
+                d.get("platform_name"),
+                d.get("platform_domain"),
+                d.get("platform_id"),
+                d.get("ingest_strategy"),
+                d.get("total_size"),
+                d.get("file_count"),
+                d.get("archiveorg_item_name"),
+                d.get("archiveorg_item_bundle_path"),
+                d.get("web_bundle_url"),
+                d.get("web_bundle_dt"),
+                d.get("manifest"),
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (ingest_type, base_url)
+        row_dict = dict()
+        for b in rows:
+            row_dict[(b[0], b[1])] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_crossref(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "update",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            crossref (doi, indexed, record)
+            VALUES %s
+            ON CONFLICT (doi) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                indexed=EXCLUDED.indexed,
+                record=EXCLUDED.record
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        rows = [
+            (
+                d["doi"],
+                d.get("indexed") or None,
+                json.dumps(d["record"], sort_keys=True),
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (doi)
+        row_dict = dict()
+        for b in rows:
+            row_dict[b[0]] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
+    def insert_grobid_refs(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "update",
+    ) -> Tuple[int, int]:
+        sql = """
+            INSERT INTO
+            grobid_refs (source, source_id, source_ts, updated, refs_json)
+            VALUES %s
+            ON CONFLICT (source, source_id) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                source_ts=EXCLUDED.source_ts,
+                updated=EXCLUDED.updated,
+                refs_json=EXCLUDED.refs_json
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        now = datetime.datetime.now()
+        rows = [
+            (
+                d["source"],
+                d["source_id"],
+                d.get("source_ts") or None,
+                d.get("updated") or now,
+                json.dumps(d["refs_json"], sort_keys=True),
+            )
+            for d in batch
+        ]
+        # filter out duplicate rows by key (source, source_id)
+        row_dict = dict()
+        for b in rows:
+            row_dict[(b[0], b[1])] = b
+        rows = list(row_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)