diff options
Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r-- | python/sandcrawler/db.py | 25 |
1 files changed, 25 insertions, 0 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 21ac82a..eb1a922 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -78,6 +78,11 @@ class SandcrawlerPostgresClient: int(d['warc_csize']), int(d['warc_offset'])) for d in batch] + # filter out duplicate rows by key (url, datetime) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -106,6 +111,11 @@ class SandcrawlerPostgresClient: int(d['size_bytes']), d['mimetype']) for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -142,6 +152,11 @@ class SandcrawlerPostgresClient: d.get('metadata') or None , ) for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -173,6 +188,11 @@ class SandcrawlerPostgresClient: d.get('extra') or None, ) for d in batch] + # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1], b[2], b[3])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -208,5 +228,10 @@ class SandcrawlerPostgresClient: d.get('terminal_sha1hex'), ) for d in batch] + # filter out duplicate rows by key (ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) |