From 0756b3901e48844b4c482ef43c409699497ec3b9 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Dec 2019 19:21:33 -0800 Subject: improve DB helpers - return insert/update row counts - implement ON CONFLICT ... DO UPDATE on some tables --- python/sandcrawler/db.py | 107 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 81 insertions(+), 26 deletions(-) diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 104920b..adf6967 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -1,5 +1,7 @@ import json +import datetime + import psycopg2 import psycopg2.extras import requests @@ -43,14 +45,19 @@ class SandcrawlerPostgresClient: def commit(self): return self.conn.commit() - def insert_cdx(self, cur, batch, on_conflict="NOTHING"): + def insert_cdx(self, cur, batch, on_conflict="nothing"): sql = """ INSERT INTO cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) VALUES %s - ON CONFLICT ON CONSTRAINT cdx_pkey DO {} - RETURNING 1; + ON CONFLICT ON CONSTRAINT cdx_pkey DO """.format(on_conflict) + if on_conflict.lower() == "nothing": + sql += " NOTHING" + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING 1;" + batch = [d for d in batch if d.get('warc_offset')] if not batch: return 0 @@ -62,31 +69,51 @@ class SandcrawlerPostgresClient: d['warc_csize'], d['warc_offset']) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True - #return len(res) + res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return len(res) - def insert_file_meta(self, cur, batch, on_conflict="NOTHING"): + def insert_file_meta(self, cur, batch, on_conflict="nothing"): sql = """ INSERT INTO file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype) VALUES %s - ON CONFLICT (sha1hex) DO {}; - """.format(on_conflict) + ON CONFLICT (sha1hex) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING 1;" batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype']) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch) + res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return len(res) - def insert_grobid(self, cur, batch, on_conflict="NOTHING"): # XXX + def insert_grobid(self, cur, batch, on_conflict="nothing"): sql = """ INSERT INTO - grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata) + grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata) VALUES %s - ON CONFLICT (sha1hex) DO {}; - """.format(on_conflict) + ON CONFLICT (sha1hex) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + grobid_version=EXCLUDED.grobid_version, + status_code=EXCLUDED.status_code, + status=EXCLUDED.status, + fatcat_release=EXCLUDED.fatcat_release, + updated=EXCLUDED.updated, + metadata=EXCLUDED.metadata + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING 1;" for r in batch: if r.get('metadata'): r['metadata'] = json.dumps(r['metadata'], sort_keys=True) @@ -95,39 +122,66 @@ class SandcrawlerPostgresClient: d['status_code'], d['status'], d.get('fatcat_release') or None, + d.get('updated') or datetime.datetime.now(), d.get('metadata') or None , ) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch) + res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return len(res) - def insert_ingest_request(self, cur, batch, on_conflict="NOTHING"): + def insert_ingest_request(self, cur, batch, on_conflict="nothing"): sql = """ INSERT INTO - ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request, metadata) + ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request) VALUES %s - ON CONFLICT ON CONSTRAINT ingest_request_pkey DO {}; - """.format(on_conflict) + ON CONFLICT ON CONSTRAINT ingest_request_pkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING 1;" for r in batch: - if r.get('metadata'): - r['metadata'] = json.dumps(r['metadata'], sort_keys=True) + extra = dict() + for k in ('ext_ids', 'fatcat_release', 'edit_extra'): + if r.get(k): + extra[k] = r[k] + if extra: + r['extra'] = json.dumps(extra, sort_keys=True) batch = [(d['link_source'], d['link_source_id'], d['ingest_type'], d['base_url'], d.get('ingest_request_source'), d.get('release_stage') or None, - d.get('request') or None, + d.get('extra') or None, ) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch) + res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return len(res) - def insert_ingest_file_result(self, cur, batch, on_conflict="NOTHING"): + def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"): sql = """ INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) VALUES %s - ON CONFLICT DO {}; - """.format(on_conflict) + ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=now(), + hit=EXCLUDED.hit, + status=EXCLUDED.status, + terminal_url=EXCLUDED.terminal_url, + terminal_dt=EXCLUDED.terminal_dt, + terminal_status_code=EXCLUDED.terminal_status_code, + terminal_sha1hex=EXCLUDED.terminal_sha1hex + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING 1;" batch = [(d['ingest_type'], d['base_url'], bool(d['hit']), @@ -138,4 +192,5 @@ class SandcrawlerPostgresClient: d.get('terminal_sha1hex'), ) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch) + res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return len(res) -- cgit v1.2.3