diff options
Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r-- | python/sandcrawler/db.py | 549 |
1 file changed, 395 insertions, 154 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 066e53b..f9018ec 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -1,51 +1,58 @@ - -import json import datetime -from typing import Optional +import json +from typing import Any, Dict, List, Optional, Tuple import psycopg2 import psycopg2.extras -import requests -class SandcrawlerPostgrestClient: +from .misc import requests_retry_session - def __init__(self, api_url="http://aitio.us.archive.org:3030", **kwargs): + +class SandcrawlerPostgrestClient: + def __init__(self, api_url: str = "http://wbgrp-svc506.us.archive.org:3030", **kwargs): self.api_url = api_url + self.http_session = requests_retry_session() - def get_cdx(self, url): - resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url)) + def get_cdx(self, url: str) -> Optional[dict]: + resp = self.http_session.get(self.api_url + "/cdx", params=dict(url="eq." + url)) resp.raise_for_status() return resp.json() or None - def get_grobid(self, sha1): - resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1)) + def get_grobid(self, sha1: str) -> Optional[dict]: + resp = self.http_session.get( + self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1) + ) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None - def get_pdftrio(self, sha1): - resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1)) + def get_pdftrio(self, sha1: str) -> Optional[dict]: + resp = self.http_session.get( + self.api_url + "/pdftrio", params=dict(sha1hex="eq." 
+ sha1) + ) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None - def get_pdf_meta(self, sha1): - resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1)) + def get_pdf_meta(self, sha1: str) -> Optional[dict]: + resp = self.http_session.get( + self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1) + ) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None def get_html_meta(self, sha1hex: str) -> Optional[dict]: - resp = requests.get( + resp = self.http_session.get( self.api_url + "/html_meta", params=dict(sha1hex=f"eq.{sha1hex}"), ) @@ -56,17 +63,19 @@ class SandcrawlerPostgrestClient: else: return None - def get_file_meta(self, sha1): - resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1)) + def get_file_meta(self, sha1: str) -> Optional[dict]: + resp = self.http_session.get( + self.api_url + "/file_meta", params=dict(sha1hex="eq." 
+ sha1) + ) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None def get_ingest_file_result(self, ingest_type: str, url: str) -> Optional[dict]: - resp = requests.get( + resp = self.http_session.get( self.api_url + "/ingest_file_result", params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"), ) @@ -77,27 +86,76 @@ class SandcrawlerPostgrestClient: else: return None -class SandcrawlerPostgresClient: + def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]: + resp = self.http_session.get( + self.api_url + "/ingest_fileset_platform", + params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"), + ) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None + + def get_crossref(self, doi: str) -> Optional[dict]: + resp = self.http_session.get(self.api_url + "/crossref", params=dict(doi=f"eq.{doi}")) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None + + def get_crossref_with_refs(self, doi: str) -> Optional[dict]: + resp = self.http_session.get( + self.api_url + "/crossref_with_refs", params=dict(doi=f"eq.{doi}") + ) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None - def __init__(self, db_url, **kwargs): + def get_grobid_refs(self, source: str, source_id: str) -> Optional[dict]: + resp = self.http_session.get( + self.api_url + "/grobid_refs", + params=dict(source=f"eq.{source}", source_id=f"eq.{source_id}"), + ) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None + + +class SandcrawlerPostgresClient: + def __init__(self, db_url: str, **kwargs): self.conn = psycopg2.connect(db_url) - def cursor(self): + def cursor(self) -> psycopg2.extensions.cursor: return self.conn.cursor() - def 
commit(self): - return self.conn.commit() + def commit(self) -> None: + self.conn.commit() - def _inserts_and_updates(self, resp, on_conflict): - resp = [int(r[0]) for r in resp] - inserts = len([r for r in resp if r == 0]) + def _inserts_and_updates(self, resp: List[Tuple], on_conflict: str) -> Tuple[int, int]: + resp_codes = [int(r[0]) for r in resp] + inserts = len([r for r in resp_codes if r == 0]) if on_conflict == "update": - updates = len([r for r in resp if r != 0]) + updates = len([r for r in resp_codes if r != 0]) else: updates = 0 return (inserts, updates) - def insert_cdx(self, cur, batch, on_conflict="nothing"): + def insert_cdx( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) @@ -110,26 +168,35 @@ class SandcrawlerPostgresClient: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [d for d in batch if d.get('warc_path')] + batch = [d for d in batch if d.get("warc_path")] if not batch: return (0, 0) - batch = [(d['url'], - d['datetime'], - d['sha1hex'], - d['mimetype'], - d['warc_path'], - int(d['warc_csize']), - int(d['warc_offset'])) - for d in batch] + rows = [ + ( + d["url"], + d["datetime"], + d["sha1hex"], + d["mimetype"], + d["warc_path"], + int(d["warc_csize"]), + int(d["warc_offset"]), + ) + for d in batch + ] # filter out duplicate rows by key (url, datetime) - batch_dict = dict() - for b in batch: - batch_dict[(b[0], b[1])] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_file_meta(self, cur, batch, 
on_conflict="nothing"): + def insert_file_meta( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype) @@ -148,21 +215,24 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['sha1hex'], - d['sha256hex'], - d['md5hex'], - int(d['size_bytes']), - d['mimetype']) - for d in batch] + rows = [ + (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"]) + for d in batch + ] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_grobid(self, cur, batch, on_conflict="nothing"): + def insert_grobid( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata) @@ -184,33 +254,39 @@ class SandcrawlerPostgresClient: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" for r in batch: - if r.get('metadata'): + if r.get("metadata"): # sometimes these are only in metadata; shouldn't pass through # though (to save database space) - dupe_fields = ('fatcat_release', 'grobid_version') + dupe_fields = ("fatcat_release", "grobid_version") for k in dupe_fields: - if not k in r: - r[k] = r['metadata'].get(k) - r['metadata'].pop(k, None) - r['metadata'] = json.dumps(r['metadata'], sort_keys=True) - 
batch = [(d['key'], - d.get('grobid_version') or None, - d['status_code'], - d['status'], - d.get('fatcat_release') or None, - d.get('updated') or datetime.datetime.now(), - d.get('metadata') or None , - ) - for d in batch] + if k not in r: + r[k] = r["metadata"].get(k) + r["metadata"].pop(k, None) + r["metadata"] = json.dumps(r["metadata"], sort_keys=True) + now = datetime.datetime.now() + rows = [ + ( + d["key"], + d.get("grobid_version") or None, + d["status_code"], + d["status"], + d.get("fatcat_release") or None, + d.get("updated") or now, + d.get("metadata") or None, + ) + for d in batch + ] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_pdf_meta(self, cur, batch, on_conflict="nothing"): + def insert_pdf_meta( + self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing" + ) -> Tuple[int, int]: """ batch elements are expected to have .to_sql_tuple() method """ @@ -239,16 +315,17 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [d.to_sql_tuple() for d in batch] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def 
insert_html_meta(self, cur, batch, on_conflict="nothing"): + def insert_html_meta( + self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing" + ) -> Tuple[int, int]: """ batch elements are expected to have .to_sql_tuple() method """ @@ -274,16 +351,20 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [d.to_sql_tuple() for d in batch] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_pdftrio(self, cur, batch, on_conflict="nothing"): + def insert_pdftrio( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO pdftrio (sha1hex, updated, status_code, status, pdftrio_version, @@ -309,29 +390,36 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [ + now = datetime.datetime.now() + rows = [ ( - d['key'], - d.get('updated') or datetime.datetime.now(), - d['status_code'], - d['status'], - d.get('versions', {}).get('pdftrio_version') or None, - d.get('versions', {}).get('models_date') or None, - d.get('ensemble_score'), - d.get('bert_score'), - d.get('linear_score'), - d.get('image_score'), + d["key"], + d.get("updated") or now, + d["status_code"], + d["status"], + d.get("versions", {}).get("pdftrio_version") or None, + d.get("versions", {}).get("models_date") or None, + d.get("ensemble_score"), + d.get("bert_score"), + d.get("linear_score"), + 
d.get("image_score"), ) - for d in batch] + for d in batch + ] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_request(self, cur, batch, on_conflict="nothing"): + def insert_ingest_request( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request) @@ -345,35 +433,43 @@ class SandcrawlerPostgresClient: sql += " RETURNING xmax;" for r in batch: # in case these fields were already packed into 'request' - extra = r.get('request', {}) - for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'): + extra = r.get("request", {}) + for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"): if r.get(k): extra[k] = r[k] if extra: - r['extra'] = json.dumps(extra, sort_keys=True) - batch = [(d['link_source'], - d['link_source_id'], - d['ingest_type'], - d['base_url'], - d.get('ingest_request_source'), - d.get('release_stage') or None, - d.get('extra') or None, - ) - for d in batch] + r["extra"] = json.dumps(extra, sort_keys=True) + rows = [ + ( + d["link_source"], + d["link_source_id"], + d["ingest_type"], + d["base_url"], + d.get("ingest_request_source"), + d.get("release_stage") or None, + d.get("extra") or None, + ) + for d in batch + ] # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) - batch_dict = dict() - for b in batch: - batch_dict[(b[0], b[1], b[2], b[3])] = b - batch = list(batch_dict.values()) - resp = 
psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1], b[2], b[3])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"): + def insert_ingest_file_result( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) VALUES %s - ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO + ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" @@ -390,20 +486,165 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('terminal_url'), - d.get('terminal_dt'), - d.get('terminal_status_code'), - d.get('terminal_sha1hex'), - ) - for d in batch] + rows = [ + ( + d["ingest_type"], + d["base_url"], + bool(d["hit"]), + d["status"], + d.get("terminal_url"), + d.get("terminal_dt"), + d.get("terminal_status_code"), + d.get("terminal_sha1hex"), + ) + for d in batch + ] + # filter out duplicate rows by key (ingest_type, base_url) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_ingest_fileset_platform( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: + sql = """ + INSERT INTO + ingest_fileset_platform 
(ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest) + VALUES %s + ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkeypkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=now(), + hit=EXCLUDED.hit, + status=EXCLUDED.status, + platform_name=EXCLUDED.platform_name, + platform_domain=EXCLUDED.platform_domain, + platform_id=EXCLUDED.platform_id, + ingest_strategy=EXCLUDED.ingest_strategy, + total_size=EXCLUDED.total_size, + file_count=EXCLUDED.file_count, + archiveorg_item_name=EXCLUDED.archiveorg_item_name, + archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path, + web_bundle_url=EXCLUDED.web_bundle_url, + web_bundle_dt=EXCLUDED.web_bundle_dt, + manifest=EXCLUDED.manifest + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + rows = [ + ( + d["ingest_type"], + d["base_url"], + bool(d["hit"]), + d["status"], + d.get("platform_name"), + d.get("platform_domain"), + d.get("platform_id"), + d.get("ingest_strategy"), + d.get("total_size"), + d.get("file_count"), + d.get("archiveorg_item_name"), + d.get("archiveorg_item_bundle_path"), + d.get("web_bundle_url"), + d.get("web_bundle_dt"), + d.get("manifest"), + ) + for d in batch + ] # filter out duplicate rows by key (ingest_type, base_url) - batch_dict = dict() - for b in batch: - batch_dict[(b[0], b[1])] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_crossref( + self, + cur: 
psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "update", + ) -> Tuple[int, int]: + sql = """ + INSERT INTO + crossref (doi, indexed, record) + VALUES %s + ON CONFLICT (doi) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + indexed=EXCLUDED.indexed, + record=EXCLUDED.record + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + rows = [ + ( + d["doi"], + d.get("indexed") or None, + json.dumps(d["record"], sort_keys=True), + ) + for d in batch + ] + # filter out duplicate rows by key (sha1hex) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_grobid_refs( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "update", + ) -> Tuple[int, int]: + sql = """ + INSERT INTO + grobid_refs (source, source_id, source_ts, updated, refs_json) + VALUES %s + ON CONFLICT (source, source_id) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + source_ts=EXCLUDED.source_ts, + updated=EXCLUDED.updated, + refs_json=EXCLUDED.refs_json + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + now = datetime.datetime.now() + rows = [ + ( + d["source"], + d["source_id"], + d.get("source_ts") or None, + d.get("updated") or now, + json.dumps(d["refs_json"], sort_keys=True), + ) + for d in batch + ] + # filter out duplicate rows by key (sha1hex) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, 
on_conflict) |