author    | Bryan Newbold <bnewbold@archive.org> | 2020-01-02 18:01:04 -0800
committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-02 18:12:59 -0800
commit    | 5dc1a8642077b67f3af0a41cdac851bb96a435b7 (patch)
tree      | c40a525c089db607e86a1c14256e0703531d4024 /python
parent    | 6093c9a0c9b65cdf790f200395e2d44d4fe6278b (diff)
db: move duplicate row filtering into DB insert helpers
Diffstat (limited to 'python')
-rw-r--r-- | python/sandcrawler/db.py      | 25
-rw-r--r-- | python/sandcrawler/persist.py | 16
2 files changed, 26 insertions, 15 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 21ac82a..eb1a922 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -78,6 +78,11 @@ class SandcrawlerPostgresClient:
                 int(d['warc_csize']),
                 int(d['warc_offset']))
             for d in batch]
+        # filter out duplicate rows by key (url, datetime)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[(b[0], b[1])] = b
+        batch = list(batch_dict.values())
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
@@ -106,6 +111,11 @@ class SandcrawlerPostgresClient:
                 int(d['size_bytes']),
                 d['mimetype'])
             for d in batch]
+        # filter out duplicate rows by key (sha1hex)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[b[0]] = b
+        batch = list(batch_dict.values())
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
@@ -142,6 +152,11 @@ class SandcrawlerPostgresClient:
                 d.get('metadata') or None,
                 )
             for d in batch]
+        # filter out duplicate rows by key (sha1hex)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[b[0]] = b
+        batch = list(batch_dict.values())
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
@@ -173,6 +188,11 @@ class SandcrawlerPostgresClient:
                 d.get('extra') or None,
                 )
             for d in batch]
+        # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[(b[0], b[1], b[2], b[3])] = b
+        batch = list(batch_dict.values())
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
@@ -208,5 +228,10 @@ class SandcrawlerPostgresClient:
                 d.get('terminal_sha1hex'),
                 )
             for d in batch]
+        # filter out duplicate rows by key (ingest_type, base_url)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[(b[0], b[1])] = b
+        batch = list(batch_dict.values())
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index c3f6b08..71ada51 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -156,22 +156,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         if not batch:
             return []
 
-        # need to ensure that we aren't trying to update same row multiple
-        # times in same batch (!)
         results = [self.file_result_to_row(raw) for raw in batch]
-        results.reverse()
-        clean_results = []
-        result_keys = []
-        for r in results:
-            if not r:
-                continue
-            key = (r['ingest_type'], r['base_url'])
-            if key in result_keys:
-                self.counts['skip-duplicate-result'] += 1
-                continue
-            result_keys.append(key)
-            clean_results.append(r)
-        results = clean_results
+        results = [r for r in results if r]
 
         requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')]
         requests = [r for r in requests if r]
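Context for the change: each insert helper sends its whole batch as a single INSERT ... ON CONFLICT statement via psycopg2.extras.execute_values(), and PostgreSQL rejects such a statement when two rows in the same batch share a conflict key ("ON CONFLICT DO UPDATE command cannot affect row a second time"). The commit moves the dedup next to the statement that requires it, and swaps the old reverse-and-scan loop for a dict keyed on the conflict columns: Python dicts preserve insertion order and reassignment keeps the later value, so the last duplicate in the batch wins, matching the old persist.py behavior (reverse, then keep first occurrence). Below is a minimal, self-contained sketch of that pattern; the dedupe_rows helper and sample rows are illustrative, not part of the sandcrawler codebase.

# Hypothetical standalone version of the dedup-by-key pattern used in
# each insert helper above (not sandcrawler's actual API).
from typing import Any, List, Sequence, Tuple

def dedupe_rows(batch: List[Tuple[Any, ...]], key_cols: Sequence[int]) -> List[Tuple[Any, ...]]:
    # Dict assignment overwrites earlier rows with the same key while
    # keeping insertion order, so the last occurrence of each key wins.
    batch_dict = dict()
    for row in batch:
        batch_dict[tuple(row[i] for i in key_cols)] = row
    return list(batch_dict.values())

# Example: cdx-style rows keyed on (url, datetime), i.e. columns 0 and 1.
rows = [
    ("http://example.com/a.pdf", "20200102000000", "sha1-old"),
    ("http://example.com/a.pdf", "20200102000000", "sha1-new"),  # duplicate key
    ("http://example.com/b.pdf", "20200102000000", "sha1-b"),
]
assert dedupe_rows(rows, key_cols=(0, 1)) == [
    ("http://example.com/a.pdf", "20200102000000", "sha1-new"),
    ("http://example.com/b.pdf", "20200102000000", "sha1-b"),
]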