From 5dc1a8642077b67f3af0a41cdac851bb96a435b7 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 2 Jan 2020 18:01:04 -0800 Subject: db: move duplicate row filtering into DB insert helpers --- python/sandcrawler/db.py | 25 +++++++++++++++++++++++++ python/sandcrawler/persist.py | 16 +--------------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 21ac82a..eb1a922 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -78,6 +78,11 @@ class SandcrawlerPostgresClient: int(d['warc_csize']), int(d['warc_offset'])) for d in batch] + # filter out duplicate rows by key (url, datetime) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -106,6 +111,11 @@ class SandcrawlerPostgresClient: int(d['size_bytes']), d['mimetype']) for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -142,6 +152,11 @@ class SandcrawlerPostgresClient: d.get('metadata') or None , ) for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -173,6 +188,11 @@ class SandcrawlerPostgresClient: d.get('extra') or None, ) for d in batch] + # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1], b[2], b[3])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) @@ -208,5 +228,10 @@ class SandcrawlerPostgresClient: d.get('terminal_sha1hex'), ) for d in batch] + # filter out duplicate rows by key (ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index c3f6b08..71ada51 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -156,22 +156,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if not batch: return [] - # need to ensure that we aren't trying to update same row multiple - # times in same batch (!) results = [self.file_result_to_row(raw) for raw in batch] - results.reverse() - clean_results = [] - result_keys = [] - for r in results: - if not r: - continue - key = (r['ingest_type'], r['base_url']) - if key in result_keys: - self.counts['skip-duplicate-result'] += 1 - continue - result_keys.append(key) - clean_results.append(r) - results = clean_results + results = [r for r in results if r] requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')] requests = [r for r in requests if r] -- cgit v1.2.3