aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/persist.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-01-15 13:54:29 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-15 13:54:29 -0800
commit0c4b686cf6c536087683d7982558bec3c5696c7f (patch)
treeb689d465c8afb46dde94dc91b2c71fd5347e6ecf /python/sandcrawler/persist.py
parentd06fd45e3c86cb080ad7724f3fc7575750a9cd69 (diff)
downloadsandcrawler-0c4b686cf6c536087683d7982558bec3c5696c7f.tar.gz
sandcrawler-0c4b686cf6c536087683d7982558bec3c5696c7f.zip
persist worker: implement updated ingest result semantics
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--python/sandcrawler/persist.py27
1 files changed, 16 insertions, 11 deletions
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 7cb4f8d..2463afa 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -43,7 +43,11 @@ class PersistCdxWorker(SandcrawlerWorker):
def push_batch(self, batch):
self.counts['total'] += len(batch)
- resp = self.db.insert_cdx(self.cur, batch)
+ # filter to full CDX lines, no liveweb
+ cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ resp = self.db.insert_cdx(self.cur, cdx_batch)
+ if len(cdx_batch) < len(batch):
+ self.counts['skip'] += len(batch) - len(cdx_batch)
self.counts['insert-cdx'] += resp[0]
self.counts['update-cdx'] += resp[1]
self.db.commit()
@@ -144,14 +148,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
}
terminal = raw.get('terminal')
if terminal:
- result['terminal_url'] = terminal['url']
- if terminal.get('status_code') == None and terminal.get('http_status'):
- terminal['status_code'] = terminal['http_status']
- result['terminal_status_code'] = int(terminal['status_code'])
- if raw.get('file_meta'):
- result['terminal_sha1hex'] = raw['file_meta']['sha1hex']
- if raw.get('cdx') and raw['cdx']['url'] == terminal['url']:
- result['terminal_dt'] = raw['cdx']['datetime']
+ result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
+ result['terminal_dt'] = terminal.get('terminal_dt')
+ result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code')
+ if result['terminal_status_code']:
+ result['terminal_status_code'] = int(result['terminal_status_code'])
+ result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
return result
def push_batch(self, batch):
@@ -176,8 +178,11 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['update-results'] += resp[1]
# these schemas match, so can just pass through
- # TODO: need to include warc_path etc in ingest worker, when possible
- cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
+ cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
+ revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')]
+ cdx_batch.extend(revisit_cdx_batch)
+ # filter to full CDX lines, with full warc_paths (not liveweb)
+ cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
if cdx_batch:
resp = self.db.insert_cdx(self.cur, cdx_batch)
self.counts['insert-cdx'] += resp[0]