From 826c7538e091fac14d987a3cd654975da964e240 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 27 Oct 2021 18:50:17 -0700
Subject: make fmt (black 21.9b0)

---
 python/sandcrawler/db.py | 263 +++++++++++++++++++++++++++--------------------
 1 file changed, 151 insertions(+), 112 deletions(-)

diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index ee4d3bf..69d2116 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -12,12 +12,12 @@ class SandcrawlerPostgrestClient:
         self.api_url = api_url
 
     def get_cdx(self, url: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url))
+        resp = requests.get(self.api_url + "/cdx", params=dict(url="eq." + url))
         resp.raise_for_status()
         return resp.json() or None
 
     def get_grobid(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1))
         resp.raise_for_status()
         resp_json = resp.json()
         if resp_json:
@@ -26,7 +26,7 @@ class SandcrawlerPostgrestClient:
             return None
 
     def get_pdftrio(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex="eq." + sha1))
         resp.raise_for_status()
         resp_json = resp.json()
         if resp_json:
@@ -35,7 +35,7 @@ class SandcrawlerPostgrestClient:
             return None
 
     def get_pdf_meta(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1))
         resp.raise_for_status()
         resp_json = resp.json()
         if resp_json:
@@ -56,7 +56,7 @@ class SandcrawlerPostgrestClient:
             return None
 
     def get_file_meta(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex="eq." + sha1))
         resp.raise_for_status()
         resp_json = resp.json()
         if resp_json:
@@ -89,7 +89,7 @@ class SandcrawlerPostgrestClient:
             return None
 
     def get_crossref(self, doi: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi))
+        resp = requests.get(self.api_url + "/crossref", params=dict(doi="eq." + doi))
         resp.raise_for_status()
         resp_json = resp.json()
         if resp_json:
@@ -117,10 +117,12 @@ class SandcrawlerPostgresClient:
             updates = 0
         return (inserts, updates)
 
-    def insert_cdx(self,
-                   cur: psycopg2.extensions.cursor,
-                   batch: List[Dict[str, Any]],
-                   on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_cdx(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
         sql = """
             INSERT INTO
             cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
@@ -133,11 +135,21 @@ class SandcrawlerPostgresClient:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
         sql += " RETURNING xmax;"
 
-        batch = [d for d in batch if d.get('warc_path')]
+        batch = [d for d in batch if d.get("warc_path")]
         if not batch:
             return (0, 0)
-        rows = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'],
-                 int(d['warc_csize']), int(d['warc_offset'])) for d in batch]
+        rows = [
+            (
+                d["url"],
+                d["datetime"],
+                d["sha1hex"],
+                d["mimetype"],
+                d["warc_path"],
+                int(d["warc_csize"]),
+                int(d["warc_offset"]),
+            )
+            for d in batch
+        ]
         # filter out duplicate rows by key (url, datetime)
         row_dict = dict()
         for b in rows:
@@ -146,10 +158,12 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
-    def insert_file_meta(self,
-                         cur: psycopg2.extensions.cursor,
-                         batch: List[Dict[str, Any]],
-                         on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_file_meta(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
         sql = """
             INSERT INTO
             file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
@@ -168,8 +182,10 @@ class SandcrawlerPostgresClient:
         else:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
         sql += " RETURNING xmax;"
-        rows = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype'])
-                for d in batch]
+        rows = [
+            (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"])
+            for d in batch
+        ]
         # filter out duplicate rows by key (sha1hex)
         row_dict = dict()
         for b in rows:
@@ -178,10 +194,12 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
-    def insert_grobid(self,
-                      cur: psycopg2.extensions.cursor,
-                      batch: List[Dict[str, Any]],
-                      on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_grobid(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
         sql = """
             INSERT INTO
             grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
@@ -203,24 +221,27 @@ class SandcrawlerPostgresClient:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
         sql += " RETURNING xmax;"
         for r in batch:
-            if r.get('metadata'):
+            if r.get("metadata"):
                 # sometimes these are only in metadata; shouldn't pass through
                 # though (to save database space)
-                dupe_fields = ('fatcat_release', 'grobid_version')
+                dupe_fields = ("fatcat_release", "grobid_version")
                 for k in dupe_fields:
                     if k not in r:
-                        r[k] = r['metadata'].get(k)
-                    r['metadata'].pop(k, None)
-                r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
-        rows = [(
-            d['key'],
-            d.get('grobid_version') or None,
-            d['status_code'],
-            d['status'],
-            d.get('fatcat_release') or None,
-            d.get('updated') or datetime.datetime.now(),
-            d.get('metadata') or None,
-        ) for d in batch]
+                        r[k] = r["metadata"].get(k)
+                    r["metadata"].pop(k, None)
+                r["metadata"] = json.dumps(r["metadata"], sort_keys=True)
+        rows = [
+            (
+                d["key"],
+                d.get("grobid_version") or None,
+                d["status_code"],
+                d["status"],
+                d.get("fatcat_release") or None,
+                d.get("updated") or datetime.datetime.now(),
+                d.get("metadata") or None,
+            )
+            for d in batch
+        ]
         # filter out duplicate rows by key (sha1hex)
         row_dict = dict()
         for b in rows:
@@ -229,10 +250,9 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
-    def insert_pdf_meta(self,
-                        cur: psycopg2.extensions.cursor,
-                        rows: List[Tuple],
-                        on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_pdf_meta(
+        self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+    ) -> Tuple[int, int]:
         """
         batch elements are expected to have .to_sql_tuple() method
         """
@@ -269,10 +289,9 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
-    def insert_html_meta(self,
-                         cur: psycopg2.extensions.cursor,
-                         rows: List[Tuple],
-                         on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_html_meta(
+        self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+    ) -> Tuple[int, int]:
         """
         batch elements are expected to have .to_sql_tuple() method
         """
@@ -306,10 +325,12 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
-    def insert_pdftrio(self,
-                       cur: psycopg2.extensions.cursor,
-                       batch: List[Dict[str, Any]],
-                       on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_pdftrio(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
         sql = """
             INSERT INTO
             pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
@@ -335,18 +356,21 @@ class SandcrawlerPostgresClient:
         else:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
         sql += " RETURNING xmax;"
-        rows = [(
-            d['key'],
-            d.get('updated') or datetime.datetime.now(),
-            d['status_code'],
-            d['status'],
-            d.get('versions', {}).get('pdftrio_version') or None,
-            d.get('versions', {}).get('models_date') or None,
-            d.get('ensemble_score'),
-            d.get('bert_score'),
-            d.get('linear_score'),
-            d.get('image_score'),
-        ) for d in batch]
+        rows = [
+            (
+                d["key"],
+                d.get("updated") or datetime.datetime.now(),
+                d["status_code"],
+                d["status"],
+                d.get("versions", {}).get("pdftrio_version") or None,
+                d.get("versions", {}).get("models_date") or None,
+                d.get("ensemble_score"),
+                d.get("bert_score"),
+                d.get("linear_score"),
+                d.get("image_score"),
+            )
+            for d in batch
+        ]
         # filter out duplicate rows by key (sha1hex)
         row_dict = dict()
         for b in rows:
@@ -355,10 +379,12 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
-    def insert_ingest_request(self,
-                              cur: psycopg2.extensions.cursor,
-                              batch: List[Dict[str, Any]],
-                              on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_ingest_request(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
sql = """ INSERT INTO ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request) @@ -372,21 +398,24 @@ class SandcrawlerPostgresClient: sql += " RETURNING xmax;" for r in batch: # in case these fields were already packed into 'request' - extra = r.get('request', {}) - for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'): + extra = r.get("request", {}) + for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"): if r.get(k): extra[k] = r[k] if extra: - r['extra'] = json.dumps(extra, sort_keys=True) - rows = [( - d['link_source'], - d['link_source_id'], - d['ingest_type'], - d['base_url'], - d.get('ingest_request_source'), - d.get('release_stage') or None, - d.get('extra') or None, - ) for d in batch] + r["extra"] = json.dumps(extra, sort_keys=True) + rows = [ + ( + d["link_source"], + d["link_source_id"], + d["ingest_type"], + d["base_url"], + d.get("ingest_request_source"), + d.get("release_stage") or None, + d.get("extra") or None, + ) + for d in batch + ] # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) row_dict = dict() for b in rows: @@ -395,10 +424,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_file_result(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_ingest_file_result( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) @@ -420,16 +451,19 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - rows = [( - d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('terminal_url'), - d.get('terminal_dt'), - d.get('terminal_status_code'), - d.get('terminal_sha1hex'), - ) for d in batch] + rows = [ + ( + d["ingest_type"], + d["base_url"], + bool(d["hit"]), + d["status"], + d.get("terminal_url"), + d.get("terminal_dt"), + d.get("terminal_status_code"), + d.get("terminal_sha1hex"), + ) + for d in batch + ] # filter out duplicate rows by key (ingest_type, base_url) row_dict = dict() for b in rows: @@ -438,10 +472,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_fileset_platform(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_ingest_fileset_platform( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest) @@ -470,23 +506,26 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - rows = [( - d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('platform_name'), - d.get('platform_domain'), 
-            d.get('platform_id'),
-            d.get('ingest_strategy'),
-            d.get('total_size'),
-            d.get('file_count'),
-            d.get('archiveorg_item_name'),
-            d.get('archiveorg_item_bundle_path'),
-            d.get('web_bundle_url'),
-            d.get('web_bundle_dt'),
-            d.get('manifest'),
-        ) for d in batch]
+        rows = [
+            (
+                d["ingest_type"],
+                d["base_url"],
+                bool(d["hit"]),
+                d["status"],
+                d.get("platform_name"),
+                d.get("platform_domain"),
+                d.get("platform_id"),
+                d.get("ingest_strategy"),
+                d.get("total_size"),
+                d.get("file_count"),
+                d.get("archiveorg_item_name"),
+                d.get("archiveorg_item_bundle_path"),
+                d.get("web_bundle_url"),
+                d.get("web_bundle_dt"),
+                d.get("manifest"),
+            )
+            for d in batch
+        ]
         # filter out duplicate rows by key (ingest_type, base_url)
         row_dict = dict()
         for b in rows:
-- 
cgit v1.2.3