From beba257030d84af2f80c09ec695a35a733a2322d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Dec 2019 21:18:35 -0800 Subject: db: fancy insert/update separation using postgres xmax --- python/sandcrawler/db.py | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) (limited to 'python/sandcrawler/db.py') diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index adf6967..0054e07 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -45,6 +45,14 @@ class SandcrawlerPostgresClient: def commit(self): return self.conn.commit() + def _inserts_and_updates(self, resp, on_conflict): + inserts = len([r for r in resp if r == 0]) + if on_conflict == "update": + updates = len([r for r in resp if r != 0]) + else: + updates = 0 + return (inserts, updates) + def insert_cdx(self, cur, batch, on_conflict="nothing"): sql = """ INSERT INTO @@ -56,7 +64,7 @@ class SandcrawlerPostgresClient: sql += " NOTHING" else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) - sql += " RETURNING 1;" + sql += " RETURNING xmax;" batch = [d for d in batch if d.get('warc_offset')] if not batch: @@ -69,8 +77,8 @@ class SandcrawlerPostgresClient: d['warc_csize'], d['warc_offset']) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) - return len(res) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) def insert_file_meta(self, cur, batch, on_conflict="nothing"): sql = """ @@ -81,17 +89,24 @@ class SandcrawlerPostgresClient: """ if on_conflict.lower() == "nothing": sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + sha256hex=EXCLUDED.sha256hex, + md5hex=EXCLUDED.md5hex, + size_bytes=EXCLUDED.size_bytes, + mimetype=EXCLUDED.mimetype + """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) - sql += " RETURNING 1;" + sql += " RETURNING xmax;" batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype']) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) - return len(res) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) def insert_grobid(self, cur, batch, on_conflict="nothing"): sql = """ @@ -113,7 +128,7 @@ class SandcrawlerPostgresClient: """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) - sql += " RETURNING 1;" + sql += " RETURNING xmax;" for r in batch: if r.get('metadata'): r['metadata'] = json.dumps(r['metadata'], sort_keys=True) @@ -126,8 +141,8 @@ class SandcrawlerPostgresClient: d.get('metadata') or None , ) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) - return len(res) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) def insert_ingest_request(self, cur, batch, on_conflict="nothing"): sql = """ @@ -140,7 +155,7 @@ class SandcrawlerPostgresClient: sql += " NOTHING" else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) - sql += " RETURNING 1;" + sql += " RETURNING xmax;" for r in batch: extra = dict() for k in ('ext_ids', 'fatcat_release', 'edit_extra'): @@ -157,8 +172,8 @@ class SandcrawlerPostgresClient: d.get('extra') or None, ) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) - return len(res) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"): sql = """ @@ -181,7 +196,7 @@ class SandcrawlerPostgresClient: """ else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) - sql += " RETURNING 1;" + sql += " RETURNING xmax;" batch = [(d['ingest_type'], d['base_url'], bool(d['hit']), @@ -192,5 +207,5 @@ class SandcrawlerPostgresClient: d.get('terminal_sha1hex'), ) for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) - return len(res) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) -- cgit v1.2.3