aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/db.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r--python/sandcrawler/db.py45
1 files changed, 30 insertions, 15 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index adf6967..0054e07 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -45,6 +45,14 @@ class SandcrawlerPostgresClient:
def commit(self):
return self.conn.commit()
+ def _inserts_and_updates(self, resp, on_conflict):
+ inserts = len([r for r in resp if r == 0])
+ if on_conflict == "update":
+ updates = len([r for r in resp if r != 0])
+ else:
+ updates = 0
+ return (inserts, updates)
+
def insert_cdx(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
@@ -56,7 +64,7 @@ class SandcrawlerPostgresClient:
sql += " NOTHING"
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
batch = [d for d in batch if d.get('warc_offset')]
if not batch:
@@ -69,8 +77,8 @@ class SandcrawlerPostgresClient:
d['warc_csize'],
d['warc_offset'])
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_file_meta(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -81,17 +89,24 @@ class SandcrawlerPostgresClient:
"""
if on_conflict.lower() == "nothing":
sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ sha256hex=EXCLUDED.sha256hex,
+ md5hex=EXCLUDED.md5hex,
+ size_bytes=EXCLUDED.size_bytes,
+ mimetype=EXCLUDED.mimetype
+ """
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
batch = [(d['sha1hex'],
d['sha256hex'],
d['md5hex'],
int(d['size_bytes']),
d['mimetype'])
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_grobid(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -113,7 +128,7 @@ class SandcrawlerPostgresClient:
"""
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
for r in batch:
if r.get('metadata'):
r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
@@ -126,8 +141,8 @@ class SandcrawlerPostgresClient:
d.get('metadata') or None ,
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -140,7 +155,7 @@ class SandcrawlerPostgresClient:
sql += " NOTHING"
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
for r in batch:
extra = dict()
for k in ('ext_ids', 'fatcat_release', 'edit_extra'):
@@ -157,8 +172,8 @@ class SandcrawlerPostgresClient:
d.get('extra') or None,
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -181,7 +196,7 @@ class SandcrawlerPostgresClient:
"""
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
batch = [(d['ingest_type'],
d['base_url'],
bool(d['hit']),
@@ -192,5 +207,5 @@ class SandcrawlerPostgresClient:
d.get('terminal_sha1hex'),
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)