From beba257030d84af2f80c09ec695a35a733a2322d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Dec 2019 21:18:35 -0800 Subject: db: fancy insert/update separation using postgres xmax --- python/sandcrawler/persist.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) (limited to 'python/sandcrawler/persist.py') diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 9f8171c..3b9cde9 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -43,7 +43,8 @@ class PersistCdxWorker(SandcrawlerWorker): def push_batch(self, batch): self.counts['total'] += len(batch) resp = self.db.insert_cdx(self.cur, batch) - self.counts['insert-cdx'] += resp + self.counts['insert-cdx'] += resp[0] + self.counts['update-cdx'] += resp[1] self.db.commit() return [] @@ -85,7 +86,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'): if not k in raw: - self.counts['skip-fields'] += 1 + self.counts['skip-request-fields'] += 1 return None if raw['ingest_type'] not in ('pdf', 'xml'): print(raw['ingest_type']) @@ -120,10 +121,10 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): """ for k in ('request', 'hit', 'status'): if not k in raw: - self.counts['skip-fields'] += 1 + self.counts['skip-result-fields'] += 1 return None if not 'base_url' in raw['request']: - self.counts['skip-fields'] += 1 + self.counts['skip-result-fields'] += 1 return None ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': @@ -162,21 +163,26 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if requests: resp = self.db.insert_ingest_request(self.cur, requests) - self.counts['insert-requests'] += resp + self.counts['insert-requests'] += resp[0] + self.counts['update-requests'] += resp[1] if results: resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update") - self.counts['insert-results'] += resp + self.counts['insert-results'] += resp[0] + self.counts['update-results'] += resp[1] # these schemas match, so can just pass through - # TODO: need to include warc_path etc in ingest-result + # TODO: need to include warc_path etc in ingest worker, when possible cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')] if cdx_batch: resp = self.db.insert_cdx(self.cur, cdx_batch) - self.counts['insert-cdx'] += resp + self.counts['insert-cdx'] += resp[0] + self.counts['update-cdx'] += resp[1] + file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')] if file_meta_batch: resp = self.db.insert_file_meta(self.cur, file_meta_batch) - self.counts['insert-file_meta'] += resp + self.counts['insert-file_meta'] += resp[0] + self.counts['update-file_meta'] += resp[1] self.db.commit() return [] -- cgit v1.2.3