aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/persist.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2019-12-26 21:18:35 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-02 18:12:58 -0800
commitbeba257030d84af2f80c09ec695a35a733a2322d (patch)
tree279cc70e9e2ab7bcbaf53c7af92a1b2ebb1140db /python/sandcrawler/persist.py
parent6a5a0b090d7f303f3332759d63ffd0ac77cdd28c (diff)
downloadsandcrawler-beba257030d84af2f80c09ec695a35a733a2322d.tar.gz
sandcrawler-beba257030d84af2f80c09ec695a35a733a2322d.zip
db: fancy insert/update separation using postgres xmax
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--python/sandcrawler/persist.py24
1 files changed, 15 insertions, 9 deletions
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 9f8171c..3b9cde9 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -43,7 +43,8 @@ class PersistCdxWorker(SandcrawlerWorker):
def push_batch(self, batch):
self.counts['total'] += len(batch)
resp = self.db.insert_cdx(self.cur, batch)
- self.counts['insert-cdx'] += resp
+ self.counts['insert-cdx'] += resp[0]
+ self.counts['update-cdx'] += resp[1]
self.db.commit()
return []
@@ -85,7 +86,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
if not k in raw:
- self.counts['skip-fields'] += 1
+ self.counts['skip-request-fields'] += 1
return None
if raw['ingest_type'] not in ('pdf', 'xml'):
print(raw['ingest_type'])
@@ -120,10 +121,10 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
"""
for k in ('request', 'hit', 'status'):
if not k in raw:
- self.counts['skip-fields'] += 1
+ self.counts['skip-result-fields'] += 1
return None
if not 'base_url' in raw['request']:
- self.counts['skip-fields'] += 1
+ self.counts['skip-result-fields'] += 1
return None
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
@@ -162,21 +163,26 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if requests:
resp = self.db.insert_ingest_request(self.cur, requests)
- self.counts['insert-requests'] += resp
+ self.counts['insert-requests'] += resp[0]
+ self.counts['update-requests'] += resp[1]
if results:
resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update")
- self.counts['insert-results'] += resp
+ self.counts['insert-results'] += resp[0]
+ self.counts['update-results'] += resp[1]
# these schemas match, so can just pass through
- # TODO: need to include warc_path etc in ingest-result
+ # TODO: need to include warc_path etc in ingest worker, when possible
cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
if cdx_batch:
resp = self.db.insert_cdx(self.cur, cdx_batch)
- self.counts['insert-cdx'] += resp
+ self.counts['insert-cdx'] += resp[0]
+ self.counts['update-cdx'] += resp[1]
+
file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
if file_meta_batch:
resp = self.db.insert_file_meta(self.cur, file_meta_batch)
- self.counts['insert-file_meta'] += resp
+ self.counts['insert-file_meta'] += resp[0]
+ self.counts['update-file_meta'] += resp[1]
self.db.commit()
return []