From 33f6744b56a9ca7b01cb4ed7b80bdf70a972ffa8 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Dec 2019 19:22:49 -0800 Subject: implement counts properly for persist workers --- python/sandcrawler/persist.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) (limited to 'python/sandcrawler/persist.py') diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 07e6c83..86a1c22 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -40,8 +40,8 @@ class PersistCdxWorker(SandcrawlerWorker): def push_batch(self, batch): self.counts['total'] += len(batch) - self.db.insert_cdx(self.cur, batch) - self.counts['insert-cdx'] += len(batch) + resp = self.db.insert_cdx(self.cur, batch) + self.counts['insert-cdx'] += resp self.db.commit() return [] @@ -159,22 +159,22 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): requests = [r for r in requests if r] if requests: - self.db.insert_ingest_request(self.cur, requests) - self.counts['insert-requests'] += len(requests) + resp = self.db.insert_ingest_request(self.cur, requests) + self.counts['insert-requests'] += resp if results: - self.db.insert_ingest_file_result(self.cur, results) - self.counts['insert-results'] += len(results) + resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update") + self.counts['insert-results'] += resp # these schemas match, so can just pass through # TODO: need to include warc_path etc in ingest-result cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')] if cdx_batch: - self.db.insert_cdx(self.cur, cdx_batch) - self.counts['insert-cdx'] += len(cdx_batch) + resp = self.db.insert_cdx(self.cur, cdx_batch) + self.counts['insert-cdx'] += resp file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')] if file_meta_batch: - self.db.insert_file_meta(self.cur, file_meta_batch) - self.counts['insert-file_meta'] += len(file_meta_batch) + resp = self.db.insert_file_meta(self.cur, file_meta_batch) + self.counts['insert-file_meta'] += resp self.db.commit() return [] @@ -205,18 +205,22 @@ class PersistGrobidWorker(SandcrawlerWorker): if not metadata: continue for k in ('fatcat_release', 'grobid_version'): - r[k] = metadata.pop(k) + r[k] = metadata.pop(k, None) if r.get('fatcat_release'): r['fatcat_release'] = r['fatcat_release'].replace('release_', '') + if metadata.get('grobid_timestamp'): + r['updated'] = metadata['grobid_timestamp'] r['metadata'] = metadata grobid_batch = [r['grobid'] for r in batch if r.get('grobid')] - self.db.insert_grobid(self.cur, batch) + resp = self.db.insert_grobid(self.cur, batch, on_conflict="update") + self.counts['insert-grobid'] += resp - file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')] - self.db.insert_file_meta(self.cur, file_meta_batch) + file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')] + resp = self.db.insert_file_meta(self.cur, file_meta_batch) + self.counts['insert-file-meta'] += resp - # TODO: minio, grobid + # TODO: minio self.db.commit() return [] -- cgit v1.2.3