aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2019-12-26 19:21:33 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-02 18:12:58 -0800
commit0756b3901e48844b4c482ef43c409699497ec3b9 (patch)
tree77a1182861e2b0c9b3975b02e324752258c45f9e
parent64d8a0e64b0b4f5d1c5927c7a45317f5bc65a421 (diff)
downloadsandcrawler-0756b3901e48844b4c482ef43c409699497ec3b9.tar.gz
sandcrawler-0756b3901e48844b4c482ef43c409699497ec3b9.zip
improve DB helpers
- return insert/update row counts - implement ON CONFLICT ... DO UPDATE on some tables
-rw-r--r--python/sandcrawler/db.py107
1 files changed, 81 insertions, 26 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 104920b..adf6967 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -1,5 +1,7 @@
import json
+import datetime
+
import psycopg2
import psycopg2.extras
import requests
@@ -43,14 +45,19 @@ class SandcrawlerPostgresClient:
def commit(self):
return self.conn.commit()
- def insert_cdx(self, cur, batch, on_conflict="NOTHING"):
+ def insert_cdx(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
VALUES %s
- ON CONFLICT ON CONSTRAINT cdx_pkey DO {}
- RETURNING 1;
+ ON CONFLICT ON CONSTRAINT cdx_pkey DO
""".format(on_conflict)
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING 1;"
+
batch = [d for d in batch if d.get('warc_offset')]
if not batch:
return 0
@@ -62,31 +69,51 @@ class SandcrawlerPostgresClient:
d['warc_csize'],
d['warc_offset'])
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True
- #return len(res)
+ res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return len(res)
- def insert_file_meta(self, cur, batch, on_conflict="NOTHING"):
+ def insert_file_meta(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
VALUES %s
- ON CONFLICT (sha1hex) DO {};
- """.format(on_conflict)
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING 1;"
batch = [(d['sha1hex'],
d['sha256hex'],
d['md5hex'],
int(d['size_bytes']),
d['mimetype'])
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch)
+ res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return len(res)
- def insert_grobid(self, cur, batch, on_conflict="NOTHING"): # XXX
+ def insert_grobid(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
- grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata)
+ grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
VALUES %s
- ON CONFLICT (sha1hex) DO {};
- """.format(on_conflict)
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ grobid_version=EXCLUDED.grobid_version,
+ status_code=EXCLUDED.status_code,
+ status=EXCLUDED.status,
+ fatcat_release=EXCLUDED.fatcat_release,
+ updated=EXCLUDED.updated,
+ metadata=EXCLUDED.metadata
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING 1;"
for r in batch:
if r.get('metadata'):
r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
@@ -95,39 +122,66 @@ class SandcrawlerPostgresClient:
d['status_code'],
d['status'],
d.get('fatcat_release') or None,
+ d.get('updated') or datetime.datetime.now(),
d.get('metadata') or None ,
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch)
+ res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return len(res)
- def insert_ingest_request(self, cur, batch, on_conflict="NOTHING"):
+ def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
- ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request, metadata)
+ ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request)
VALUES %s
- ON CONFLICT ON CONSTRAINT ingest_request_pkey DO {};
- """.format(on_conflict)
+ ON CONFLICT ON CONSTRAINT ingest_request_pkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING 1;"
for r in batch:
- if r.get('metadata'):
- r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
+ extra = dict()
+ for k in ('ext_ids', 'fatcat_release', 'edit_extra'):
+ if r.get(k):
+ extra[k] = r[k]
+ if extra:
+ r['extra'] = json.dumps(extra, sort_keys=True)
batch = [(d['link_source'],
d['link_source_id'],
d['ingest_type'],
d['base_url'],
d.get('ingest_request_source'),
d.get('release_stage') or None,
- d.get('request') or None,
+ d.get('extra') or None,
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch)
+ res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return len(res)
- def insert_ingest_file_result(self, cur, batch, on_conflict="NOTHING"):
+ def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
VALUES %s
- ON CONFLICT DO {};
- """.format(on_conflict)
+ ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=now(),
+ hit=EXCLUDED.hit,
+ status=EXCLUDED.status,
+ terminal_url=EXCLUDED.terminal_url,
+ terminal_dt=EXCLUDED.terminal_dt,
+ terminal_status_code=EXCLUDED.terminal_status_code,
+ terminal_sha1hex=EXCLUDED.terminal_sha1hex
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING 1;"
batch = [(d['ingest_type'],
d['base_url'],
bool(d['hit']),
@@ -138,4 +192,5 @@ class SandcrawlerPostgresClient:
d.get('terminal_sha1hex'),
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch)
+ res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return len(res)