aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/db.py45
-rw-r--r--python/sandcrawler/persist.py24
2 files changed, 45 insertions, 24 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index adf6967..0054e07 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -45,6 +45,14 @@ class SandcrawlerPostgresClient:
def commit(self):
return self.conn.commit()
+ def _inserts_and_updates(self, resp, on_conflict):
+ inserts = len([r for r in resp if r == 0])
+ if on_conflict == "update":
+ updates = len([r for r in resp if r != 0])
+ else:
+ updates = 0
+ return (inserts, updates)
+
def insert_cdx(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
@@ -56,7 +64,7 @@ class SandcrawlerPostgresClient:
sql += " NOTHING"
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
batch = [d for d in batch if d.get('warc_offset')]
if not batch:
@@ -69,8 +77,8 @@ class SandcrawlerPostgresClient:
d['warc_csize'],
d['warc_offset'])
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_file_meta(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -81,17 +89,24 @@ class SandcrawlerPostgresClient:
"""
if on_conflict.lower() == "nothing":
sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ sha256hex=EXCLUDED.sha256hex,
+ md5hex=EXCLUDED.md5hex,
+ size_bytes=EXCLUDED.size_bytes,
+ mimetype=EXCLUDED.mimetype
+ """
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
batch = [(d['sha1hex'],
d['sha256hex'],
d['md5hex'],
int(d['size_bytes']),
d['mimetype'])
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_grobid(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -113,7 +128,7 @@ class SandcrawlerPostgresClient:
"""
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
for r in batch:
if r.get('metadata'):
r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
@@ -126,8 +141,8 @@ class SandcrawlerPostgresClient:
d.get('metadata') or None ,
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -140,7 +155,7 @@ class SandcrawlerPostgresClient:
sql += " NOTHING"
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
for r in batch:
extra = dict()
for k in ('ext_ids', 'fatcat_release', 'edit_extra'):
@@ -157,8 +172,8 @@ class SandcrawlerPostgresClient:
d.get('extra') or None,
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"):
sql = """
@@ -181,7 +196,7 @@ class SandcrawlerPostgresClient:
"""
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
- sql += " RETURNING 1;"
+ sql += " RETURNING xmax;"
batch = [(d['ingest_type'],
d['base_url'],
bool(d['hit']),
@@ -192,5 +207,5 @@ class SandcrawlerPostgresClient:
d.get('terminal_sha1hex'),
)
for d in batch]
- res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
- return len(res)
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 9f8171c..3b9cde9 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -43,7 +43,8 @@ class PersistCdxWorker(SandcrawlerWorker):
def push_batch(self, batch):
self.counts['total'] += len(batch)
resp = self.db.insert_cdx(self.cur, batch)
- self.counts['insert-cdx'] += resp
+ self.counts['insert-cdx'] += resp[0]
+ self.counts['update-cdx'] += resp[1]
self.db.commit()
return []
@@ -85,7 +86,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
if not k in raw:
- self.counts['skip-fields'] += 1
+ self.counts['skip-request-fields'] += 1
return None
if raw['ingest_type'] not in ('pdf', 'xml'):
print(raw['ingest_type'])
@@ -120,10 +121,10 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
"""
for k in ('request', 'hit', 'status'):
if not k in raw:
- self.counts['skip-fields'] += 1
+ self.counts['skip-result-fields'] += 1
return None
if not 'base_url' in raw['request']:
- self.counts['skip-fields'] += 1
+ self.counts['skip-result-fields'] += 1
return None
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
@@ -162,21 +163,26 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if requests:
resp = self.db.insert_ingest_request(self.cur, requests)
- self.counts['insert-requests'] += resp
+ self.counts['insert-requests'] += resp[0]
+ self.counts['update-requests'] += resp[1]
if results:
resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update")
- self.counts['insert-results'] += resp
+ self.counts['insert-results'] += resp[0]
+ self.counts['update-results'] += resp[1]
# these schemas match, so can just pass through
- # TODO: need to include warc_path etc in ingest-result
+ # TODO: need to include warc_path etc in ingest worker, when possible
cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
if cdx_batch:
resp = self.db.insert_cdx(self.cur, cdx_batch)
- self.counts['insert-cdx'] += resp
+ self.counts['insert-cdx'] += resp[0]
+ self.counts['update-cdx'] += resp[1]
+
file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
if file_meta_batch:
resp = self.db.insert_file_meta(self.cur, file_meta_batch)
- self.counts['insert-file_meta'] += resp
+ self.counts['insert-file_meta'] += resp[0]
+ self.counts['update-file_meta'] += resp[1]
self.db.commit()
return []