From 5dc1a8642077b67f3af0a41cdac851bb96a435b7 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 2 Jan 2020 18:01:04 -0800 Subject: db: move duplicate row filtering into DB insert helpers --- python/sandcrawler/db.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'python/sandcrawler/db.py') diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 21ac82a..eb1a922 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -78,6 +78,11 @@ class SandcrawlerPostgresClient: int(d['warc_csize']), int(d['warc_offset'])) for d in batch] + # filter out duplicate rows by key (url, datetime) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -106,6 +111,11 @@ class SandcrawlerPostgresClient: int(d['size_bytes']), d['mimetype']) for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -142,6 +152,11 @@ class SandcrawlerPostgresClient: d.get('metadata') or None , ) for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -173,6 +188,11 @@ class SandcrawlerPostgresClient: d.get('extra') or None, ) for d in batch] + # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1], b[2], b[3])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -208,5 +228,10 @@ class SandcrawlerPostgresClient: d.get('terminal_sha1hex'), ) for d in batch] + # filter out duplicate rows by key (ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) -- cgit v1.2.3