diff options
author | Bryan Newbold <bnewbold@archive.org> | 2021-10-26 14:42:36 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2021-10-26 14:42:36 -0700 |
commit | 505bd30c6517f1544e0e0c5ab59515c2d3f73562 (patch) | |
tree | f0b6f55d426b68a7857326e0f1cdb317c657d72e /python | |
parent | 326bbc56cb01b87d58307785d2b2a6df09c0e3c2 (diff) | |
download | sandcrawler-505bd30c6517f1544e0e0c5ab59515c2d3f73562.tar.gz sandcrawler-505bd30c6517f1544e0e0c5ab59515c2d3f73562.zip |
start adding python type annotations to db and persist code
Diffstat (limited to 'python')
-rw-r--r-- | python/sandcrawler/db.py | 215 | ||||
-rw-r--r-- | python/sandcrawler/persist.py | 6 |
2 files changed, 124 insertions, 97 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 360add9..3ca2657 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -1,6 +1,6 @@ import datetime import json -from typing import Optional +from typing import Any, Dict, List, Optional, Tuple import psycopg2 import psycopg2.extras @@ -8,38 +8,38 @@ import requests class SandcrawlerPostgrestClient: - def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs): + def __init__(self, api_url: str = "http://wbgrp-svc506.us.archive.org:3030", **kwargs): self.api_url = api_url - def get_cdx(self, url): + def get_cdx(self, url: str) -> Optional[dict]: resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url)) resp.raise_for_status() return resp.json() or None - def get_grobid(self, sha1): + def get_grobid(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None - def get_pdftrio(self, sha1): + def get_pdftrio(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None - def get_pdf_meta(self, sha1): + def get_pdf_meta(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None @@ -55,12 +55,12 @@ class SandcrawlerPostgrestClient: else: return None - def get_file_meta(self, sha1): + def get_file_meta(self, sha1: str) -> Optional[dict]: resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None @@ -88,12 +88,12 @@ class SandcrawlerPostgrestClient: else: return None - def get_crossref(self, doi): + def get_crossref(self, doi: str) -> Optional[dict]: resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi)) resp.raise_for_status() - resp = resp.json() - if resp: - return resp[0] + resp_json = resp.json() + if resp_json: + return resp_json[0] else: return None @@ -102,22 +102,25 @@ class SandcrawlerPostgresClient: def __init__(self, db_url, **kwargs): self.conn = psycopg2.connect(db_url) - def cursor(self): + def cursor(self) -> psycopg2.extensions.cursor: return self.conn.cursor() - def commit(self): - return self.conn.commit() + def commit(self) -> None: + self.conn.commit() - def _inserts_and_updates(self, resp, on_conflict): - resp = [int(r[0]) for r in resp] - inserts = len([r for r in resp if r == 0]) + def _inserts_and_updates(self, resp: List[Tuple[Any]], on_conflict: str): + resp_codes = [int(r[0]) for r in resp] + inserts = len([r for r in resp_codes if r == 0]) if on_conflict == "update": - updates = len([r for r in resp if r != 0]) + updates = len([r for r in resp_codes if r != 0]) else: updates = 0 return (inserts, updates) - def insert_cdx(self, cur, batch, on_conflict="nothing"): + def insert_cdx(self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing"): sql = """ INSERT INTO cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) @@ -133,17 +136,20 @@ class SandcrawlerPostgresClient: batch = [d for d in batch if d.get('warc_path')] if not batch: return (0, 0) - batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'], - int(d['warc_csize']), int(d['warc_offset'])) for d in batch] + rows = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'], + int(d['warc_csize']), int(d['warc_offset'])) for d in batch] # filter out duplicate rows by key (url, datetime) - batch_dict = dict() - for b in batch: - batch_dict[(b[0], b[1])] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_file_meta(self, cur, batch, on_conflict="nothing"): + def insert_file_meta(self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing"): sql = """ INSERT INTO file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype) @@ -162,17 +168,20 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), - d['mimetype']) for d in batch] + rows = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype']) + for d in batch] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_grobid(self, cur, batch, on_conflict="nothing"): + def insert_grobid(self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing"): sql = """ INSERT INTO grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata) @@ -203,7 +212,7 @@ class SandcrawlerPostgresClient: r[k] = r['metadata'].get(k) r['metadata'].pop(k, None) r['metadata'] = json.dumps(r['metadata'], sort_keys=True) - batch = [( + rows = [( d['key'], d.get('grobid_version') or None, d['status_code'], @@ -213,14 +222,17 @@ class SandcrawlerPostgresClient: d.get('metadata') or None, ) for d in batch] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_pdf_meta(self, cur, batch, on_conflict="nothing"): + def insert_pdf_meta(self, + cur: psycopg2.extensions.cursor, + rows: List[Tuple[Any]], + on_conflict: str = "nothing"): """ batch elements are expected to have .to_sql_tuple() method """ @@ -249,16 +261,18 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [d.to_sql_tuple() for d in batch] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_html_meta(self, cur, batch, on_conflict="nothing"): + def insert_html_meta(self, + cur: psycopg2.extensions.cursor, + rows: List[Tuple[Any]], + on_conflict: str = "nothing"): """ batch elements are expected to have .to_sql_tuple() method """ @@ -284,16 +298,18 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [d.to_sql_tuple() for d in batch] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_pdftrio(self, cur, batch, on_conflict="nothing"): + def insert_pdftrio(self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing"): sql = """ INSERT INTO pdftrio (sha1hex, updated, status_code, status, pdftrio_version, @@ -319,7 +335,7 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [( + rows = [( d['key'], d.get('updated') or datetime.datetime.now(), d['status_code'], @@ -332,14 +348,17 @@ class SandcrawlerPostgresClient: d.get('image_score'), ) for d in batch] # filter out duplicate rows by key (sha1hex) - batch_dict = dict() - for b in batch: - batch_dict[b[0]] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[b[0]] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_request(self, cur, batch, on_conflict="nothing"): + def insert_ingest_request(self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing"): sql = """ INSERT INTO ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request) @@ -359,7 +378,7 @@ class SandcrawlerPostgresClient: extra[k] = r[k] if extra: r['extra'] = json.dumps(extra, sort_keys=True) - batch = [( + rows = [( d['link_source'], d['link_source_id'], d['ingest_type'], @@ -369,14 +388,17 @@ class SandcrawlerPostgresClient: d.get('extra') or None, ) for d in batch] # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) - batch_dict = dict() - for b in batch: - batch_dict[(b[0], b[1], b[2], b[3])] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1], b[2], b[3])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"): + def insert_ingest_file_result(self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing"): sql = """ INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) @@ -398,7 +420,7 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [( + rows = [( d['ingest_type'], d['base_url'], bool(d['hit']), @@ -409,14 +431,17 @@ class SandcrawlerPostgresClient: d.get('terminal_sha1hex'), ) for d in batch] # filter out duplicate rows by key (ingest_type, base_url) - batch_dict = dict() - for b in batch: - batch_dict[(b[0], b[1])] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_fileset_platform(self, cur, batch, on_conflict="nothing"): + def insert_ingest_fileset_platform(self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing"): sql = """ INSERT INTO ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest) @@ -445,7 +470,7 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [( + rows = [( d['ingest_type'], d['base_url'], bool(d['hit']), @@ -463,9 +488,9 @@ class SandcrawlerPostgresClient: d.get('manifest'), ) for d in batch] # filter out duplicate rows by key (ingest_type, base_url) - batch_dict = dict() - for b in batch: - batch_dict[(b[0], b[1])] = b - batch = list(batch_dict.values()) - resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + row_dict = dict() + for b in rows: + row_dict[(b[0], b[1])] = b + rows = list(row_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index b714bc7..bb76e54 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -266,7 +266,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body') ] if html_meta_batch: - resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update") + rows = [d.to_sql_tuple() for d in html_meta_batch] + resp = self.db.insert_html_meta(self.cur, rows, on_conflict="update") self.counts['insert-html_meta'] += resp[0] self.counts['update-html_meta'] += resp[1] @@ -534,7 +535,8 @@ class PersistPdfTextWorker(SandcrawlerWorker): self.counts['s3-put'] += 1 if not self.s3_only: - resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update") + rows = [r.to_sql_tuple() for r in parsed_batch] + resp = self.db.insert_pdf_meta(self.cur, rows, on_conflict="update") self.counts['insert-pdf-meta'] += resp[0] self.counts['update-pdf-meta'] += resp[1] |