aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-01-02 18:01:04 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-02 18:12:59 -0800
commit5dc1a8642077b67f3af0a41cdac851bb96a435b7 (patch)
treec40a525c089db607e86a1c14256e0703531d4024
parent6093c9a0c9b65cdf790f200395e2d44d4fe6278b (diff)
downloadsandcrawler-5dc1a8642077b67f3af0a41cdac851bb96a435b7.tar.gz
sandcrawler-5dc1a8642077b67f3af0a41cdac851bb96a435b7.zip
db: move duplicate row filtering into DB insert helpers
-rw-r--r--python/sandcrawler/db.py25
-rw-r--r--python/sandcrawler/persist.py16
2 files changed, 26 insertions, 15 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 21ac82a..eb1a922 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -78,6 +78,11 @@ class SandcrawlerPostgresClient:
int(d['warc_csize']),
int(d['warc_offset']))
for d in batch]
+ # filter out duplicate rows by key (url, datetime)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1])] = b
+ batch = list(batch_dict.values())
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
@@ -106,6 +111,11 @@ class SandcrawlerPostgresClient:
int(d['size_bytes']),
d['mimetype'])
for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
@@ -142,6 +152,11 @@ class SandcrawlerPostgresClient:
d.get('metadata') or None ,
)
for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
@@ -173,6 +188,11 @@ class SandcrawlerPostgresClient:
d.get('extra') or None,
)
for d in batch]
+ # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1], b[2], b[3])] = b
+ batch = list(batch_dict.values())
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
@@ -208,5 +228,10 @@ class SandcrawlerPostgresClient:
d.get('terminal_sha1hex'),
)
for d in batch]
+ # filter out duplicate rows by key (ingest_type, base_url)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1])] = b
+ batch = list(batch_dict.values())
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index c3f6b08..71ada51 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -156,22 +156,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if not batch:
return []
- # need to ensure that we aren't trying to update same row multiple
- # times in same batch (!)
results = [self.file_result_to_row(raw) for raw in batch]
- results.reverse()
- clean_results = []
- result_keys = []
- for r in results:
- if not r:
- continue
- key = (r['ingest_type'], r['base_url'])
- if key in result_keys:
- self.counts['skip-duplicate-result'] += 1
- continue
- result_keys.append(key)
- clean_results.append(r)
- results = clean_results
+ results = [r for r in results if r]
requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')]
requests = [r for r in requests if r]