aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/sandcrawler/persist.py34
1 files changed, 19 insertions, 15 deletions
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 07e6c83..86a1c22 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -40,8 +40,8 @@ class PersistCdxWorker(SandcrawlerWorker):
def push_batch(self, batch):
self.counts['total'] += len(batch)
- self.db.insert_cdx(self.cur, batch)
- self.counts['insert-cdx'] += len(batch)
+ resp = self.db.insert_cdx(self.cur, batch)
+ self.counts['insert-cdx'] += resp
self.db.commit()
return []
@@ -159,22 +159,22 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
requests = [r for r in requests if r]
if requests:
- self.db.insert_ingest_request(self.cur, requests)
- self.counts['insert-requests'] += len(requests)
+ resp = self.db.insert_ingest_request(self.cur, requests)
+ self.counts['insert-requests'] += resp
if results:
- self.db.insert_ingest_file_result(self.cur, results)
- self.counts['insert-results'] += len(results)
+ resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update")
+ self.counts['insert-results'] += resp
# these schemas match, so can just pass through
# TODO: need to include warc_path etc in ingest-result
cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
if cdx_batch:
- self.db.insert_cdx(self.cur, cdx_batch)
- self.counts['insert-cdx'] += len(cdx_batch)
+ resp = self.db.insert_cdx(self.cur, cdx_batch)
+ self.counts['insert-cdx'] += resp
file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
if file_meta_batch:
- self.db.insert_file_meta(self.cur, file_meta_batch)
- self.counts['insert-file_meta'] += len(file_meta_batch)
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch)
+ self.counts['insert-file_meta'] += resp
self.db.commit()
return []
@@ -205,18 +205,22 @@ class PersistGrobidWorker(SandcrawlerWorker):
if not metadata:
continue
for k in ('fatcat_release', 'grobid_version'):
- r[k] = metadata.pop(k)
+ r[k] = metadata.pop(k, None)
if r.get('fatcat_release'):
r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
+ if metadata.get('grobid_timestamp'):
+ r['updated'] = metadata['grobid_timestamp']
r['metadata'] = metadata
grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
- self.db.insert_grobid(self.cur, batch)
+ resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
+ self.counts['insert-grobid'] += resp
- file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
- self.db.insert_file_meta(self.cur, file_meta_batch)
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch)
+ self.counts['insert-file-meta'] += resp
- # TODO: minio, grobid
+ # TODO: minio
self.db.commit()
return []